/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws.dynamodb;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.dynamodb.AwsClientsProvider;
import org.apache.beam.sdk.io.aws.dynamodb.AwsClientsProviderMock;
import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIOTestHelper;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/**
 * Tests for the {@link DynamoDBIO} read and write transforms.
 *
 * <p>The DynamoDB client used by most tests is started, populated and stopped through
 * {@link DynamoDBIOTestHelper}; {@link #testRetries()} uses a Mockito mock instead.
 *
 * <p>All JUnit rule fields are {@code transient} so that they are excluded should the
 * (serializable) test instance ever be Java-serialized.
 *
 * <p>NOTE(review): the raw Beam types and explicit casts below are kept from the decompiled
 * form of this class because the generic signatures of the Beam API are not visible here;
 * restore proper type arguments once they are confirmed.
 */
@Ignore("[BEAM-7794] DynamoDBIOTest is blocking forever")
public class DynamoDBIOTest
implements Serializable {
    /** Name of the table created in {@link #setup()} and targeted by every request. */
    private static final String TABLE_NAME = "TaskA";
    /** Number of items / write requests generated for the tests. */
    private static final int NUM_OF_ITEMS = 10;
    /** Highest retry attempt whose warning {@link #testRetries()} expects to find in the logs. */
    private static final int VERIFIED_RETRY_ATTEMPTS = 3;

    /** Test data returned by {@link DynamoDBIOTestHelper#generateTestData} in {@link #setup()}. */
    private static List<Map<String, AttributeValue>> expected;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);
    @Rule
    public final transient ExpectedException thrown = ExpectedException.none();

    /** Starts the helper-managed DynamoDB client, creates the test table and generates test data. */
    @BeforeClass
    public static void setup() {
        DynamoDBIOTestHelper.startServerClient();
        DynamoDBIOTestHelper.createTestTable(TABLE_NAME);
        expected = DynamoDBIOTestHelper.generateTestData(TABLE_NAME, NUM_OF_ITEMS);
    }

    /** Stops the helper-managed DynamoDB client for the test table. */
    @AfterClass
    public static void destroy() {
        DynamoDBIOTestHelper.stopServerClient(TABLE_NAME);
    }

    /** Reads the table with a single-segment scan and expects exactly the generated item list. */
    @Test
    public void testReadScanResult() {
        PTransform read = (PTransform) DynamoDBIO.read()
                .withAwsClientsProvider(localClientsProvider())
                .withScanRequestFn((SerializableFunction & Serializable) input ->
                        new ScanRequest(TABLE_NAME).withTotalSegments(1))
                .items();
        PCollection actual = (PCollection) pipeline.apply(read);

        // The explicit Object[] selects the varargs overload: the whole list is ONE expected
        // element. Passing the list directly on a raw assert could bind to an Iterable overload.
        PAssert.that(actual).containsInAnyOrder((Object[]) new List[] {expected});
        pipeline.run().waitUntilFinish();
    }

    /** A read without a scan-request function must be rejected. */
    @Test
    public void testMissingScanRequestFn() {
        expectIllegalArgument("withScanRequestFn() is required");

        pipeline.apply((PTransform) DynamoDBIO.read()
                .withAwsClientsProvider(localClientsProvider()));
        pipeline.run().waitUntilFinish();
    }

    /** A read without an AWS clients provider must be rejected. */
    @Test
    public void testMissingAwsClientsProvider() {
        expectIllegalArgument("withAwsClientsProvider() is required");

        pipeline.apply((PTransform) DynamoDBIO.read()
                .withScanRequestFn((SerializableFunction & Serializable) input ->
                        new ScanRequest(TABLE_NAME).withTotalSegments(3)));
        pipeline.run().waitUntilFinish();
    }

    /** A scan request that does not set TotalSegments must be rejected. */
    @Test
    public void testMissingTotalSegments() {
        // Matched as a substring, so a longer message starting with this text also satisfies it.
        expectIllegalArgument("TotalSegments is required with withScanRequestFn()");

        pipeline.apply((PTransform) DynamoDBIO.read()
                .withScanRequestFn((SerializableFunction & Serializable) input ->
                        new ScanRequest(TABLE_NAME))
                .withAwsClientsProvider(localClientsProvider()));
        pipeline.run().waitUntilFinish();
    }

    /** A scan request with a negative TotalSegments must be rejected. */
    @Test
    public void testNegativeTotalSegments() {
        expectIllegalArgument("TotalSegments is required with withScanRequestFn() and greater zero");

        pipeline.apply((PTransform) DynamoDBIO.read()
                .withScanRequestFn((SerializableFunction & Serializable) input ->
                        new ScanRequest(TABLE_NAME).withTotalSegments(-1))
                .withAwsClientsProvider(localClientsProvider()));
        pipeline.run().waitUntilFinish();
    }

    /** Writes the generated requests and expects the write transform to emit no elements. */
    @Test
    public void testWriteDataToDynamo() {
        List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(NUM_OF_ITEMS);

        PTransform write = (PTransform) DynamoDBIO.write()
                .withWriteRequestMapperFn((SerializableFunction & Serializable) writeRequest ->
                        KV.of(TABLE_NAME, writeRequest))
                .withRetryConfiguration(
                        DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1L)))
                .withAwsClientsProvider(localClientsProvider());

        PCollection input = (PCollection) pipeline.apply((PTransform) Create.of(writeRequests));
        PCollection output = (PCollection) input.apply(write);
        PCollection publishedResultsSize = (PCollection) output.apply(Count.globally());

        PAssert.that(publishedResultsSize).containsInAnyOrder((Object[]) new Long[] {0L});
        pipeline.run().waitUntilFinish();
    }

    /**
     * Makes every batch write fail and expects the pipeline to fail with a cause whose message
     * contains "Error writing to DynamoDB", after retry warnings 1 to 3 have been logged.
     *
     * @throws Throwable the cause of the pipeline failure, rethrown so that the
     *     {@link ExpectedException} rule can inspect it
     */
    @Test
    public void testRetries() throws Throwable {
        thrown.expectMessage("Error writing to DynamoDB");

        List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(NUM_OF_ITEMS);

        AmazonDynamoDB amazonDynamoDBMock = Mockito.mock(AmazonDynamoDB.class);
        Mockito.when(amazonDynamoDBMock.batchWriteItem(Mockito.any(BatchWriteItemRequest.class)))
                .thenThrow(new AmazonDynamoDBException("Service unavailable"));

        PTransform write = (PTransform) DynamoDBIO.write()
                .withWriteRequestMapperFn((SerializableFunction & Serializable) writeRequest ->
                        KV.of(TABLE_NAME, writeRequest))
                .withRetryConfiguration(
                        DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10L)))
                .withAwsClientsProvider((AwsClientsProvider) AwsClientsProviderMock.of(amazonDynamoDBMock));

        ((PCollection) pipeline.apply((PTransform) Create.of(writeRequests))).apply(write);

        try {
            pipeline.run().waitUntilFinish();
        }
        catch (Pipeline.PipelineExecutionException e) {
            for (int attempt = 1; attempt <= VERIFIED_RETRY_ATTEMPTS; attempt++) {
                expectedLogs.verifyWarn(
                        String.format("Error writing to DynamoDB. Retry attempt[%d]", attempt));
            }
            // Surface the underlying failure; the rule checks its message.
            throw e.getCause();
        }
        // This text intentionally does not contain the expected message, so reaching this line
        // can never be mistaken by the rule for the expected failure.
        Assert.fail("Pipeline is expected to fail because we were unable to write to DynamoDB.");
    }

    /**
     * Registers the expectation that the current test fails with an
     * {@link IllegalArgumentException} whose message contains {@code message}.
     *
     * <p>The expectation is declared only through the rule. Catching the exception inside the
     * test would hide it from the rule, and a {@code fail(...)} carrying the same text could be
     * accepted by the rule as the expected failure.
     *
     * @param message substring the exception message must contain
     */
    private void expectIllegalArgument(String message) {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(message);
    }

    /**
     * Wraps the client currently exposed by {@link DynamoDBIOTestHelper#getDynamoDBClient()} in a
     * mock clients provider.
     *
     * @return a provider handing out the helper's DynamoDB client
     */
    private static AwsClientsProvider localClientsProvider() {
        return (AwsClientsProvider) AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient());
    }
}

