/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.dynamodb;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIOTestHelper;
import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDbClientProvider;
import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDbClientProviderMock;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

public class DynamoDBIOTest
implements Serializable {
    /** Pipeline rule; every test applies its transforms to this pipeline and then runs it. */
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    /** Log-capturing rule scoped to {@link DynamoDBIO}; used to verify the retry warnings. */
    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);
    /** Name of the table that is created before and deleted after each test. */
    private static final String tableName = "TaskA";
    /** Size of the generated data set (the reader tests generate 10 items). */
    private static final int numOfItems = 10;
    /**
     * Expected-exception rule used by the negative tests and by the retry test.
     * NOTE(review): the rules are transient, presumably so that this Serializable test class
     * does not try to serialize them when captured by lambdas/DoFns - confirm.
     */
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /**
     * Runs once before any test in this class and delegates to
     * {@link DynamoDBIOTestHelper#startServerClient()}.
     * NOTE(review): presumably boots the DynamoDB test server and its client - see the helper.
     */
    @BeforeClass
    public static void setup() {
        DynamoDBIOTestHelper.startServerClient();
    }

    /**
     * Runs once after all tests in this class and delegates to
     * {@code DynamoDBIOTestHelper.stopServerClient(tableName)}, passing the test table name.
     */
    @AfterClass
    public static void destroy() {
        DynamoDBIOTestHelper.stopServerClient(tableName);
    }

    /** Runs before every test: asks the helper to create the test table {@code tableName}. */
    @Before
    public void createTable() {
        DynamoDBIOTestHelper.createTestTable(tableName);
    }

    /** Runs after every test: asks the helper to delete the test table {@code tableName}. */
    @After
    public void cleanTable() {
        DynamoDBIOTestHelper.deleteTestTable(tableName);
    }

    /**
     * Reads the test table with a scan split into a single segment and asserts that the
     * resulting collection holds exactly one element: the list of all generated items.
     */
    @Test
    public void testReaderOneSegment() {
        List<Map<String, AttributeValue>> expected =
            DynamoDBIOTestHelper.generateTestData(tableName, numOfItems);

        PCollection<List<Map<String, AttributeValue>>> actual =
            pipeline.apply(
                DynamoDBIO.<List<Map<String, AttributeValue>>>read()
                    .withDynamoDbClientProvider(
                        DynamoDbClientProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))
                    .withScanRequestFn(
                        input ->
                            ScanRequest.builder().tableName(tableName).totalSegments(1).build())
                    .items());

        // The collection's element type is the whole item list, so the expectation is a
        // single-element iterable wrapping that list (not the items themselves).
        PAssert.that(actual).containsInAnyOrder(ImmutableList.of(expected));
        pipeline.run().waitUntilFinish();
    }

    /**
     * Reads the test table with a scan split into three segments, forwards every result list
     * unchanged through a tagged ParDo, and asserts that exactly three result lists were
     * emitted.
     */
    @Test
    public void testReaderThreeSegments() {
        TupleTag<List<Map<String, AttributeValue>>> outputTag = new TupleTag<>();

        PCollectionTuple writeOutput =
            pipeline
                .apply(
                    DynamoDBIO.<List<Map<String, AttributeValue>>>read()
                        .withDynamoDbClientProvider(
                            DynamoDbClientProviderMock.of(
                                DynamoDBIOTestHelper.getDynamoDBClient()))
                        .withScanRequestFn(
                            input ->
                                ScanRequest.builder()
                                    .tableName(tableName)
                                    .totalSegments(3)
                                    .build())
                        .items())
                .apply(
                    ParDo.of(
                            new DoFn<
                                List<Map<String, AttributeValue>>,
                                List<Map<String, AttributeValue>>>() {

                                @DoFn.ProcessElement
                                public void processElement(
                                    @DoFn.Element List<Map<String, AttributeValue>> input,
                                    DoFn.OutputReceiver<List<Map<String, AttributeValue>>> out) {
                                    // Identity: pass each result list through to the main output.
                                    out.output(input);
                                }
                            })
                        .withOutputTags(outputTag, TupleTagList.empty()));

        PCollection<Long> resultSetCount = writeOutput.get(outputTag).apply(Count.globally());

        // Only the number of result lists is checked (one is expected per segment); this test
        // generates no data, so the lists' contents are not asserted.
        PAssert.that(resultSetCount).containsInAnyOrder(ImmutableList.of(3L));
        pipeline.run().waitUntilFinish();
    }

    /**
     * Reads the test table in one segment with a scan limit smaller than the number of
     * generated items, flattens the result lists into single items, and asserts that every
     * generated item is still returned.
     * NOTE(review): presumably the limit only bounds each scan page and the reader keeps
     * paging until the table is exhausted - confirm against DynamoDBIO.
     */
    @Test
    public void testReaderWithLimit() {
        List<Map<String, AttributeValue>> expected =
            DynamoDBIOTestHelper.generateTestData(tableName, numOfItems);

        // Deliberately smaller than numOfItems.
        final int limit = 5;

        PCollection<Map<String, AttributeValue>> actual =
            pipeline
                .apply(
                    DynamoDBIO.<List<Map<String, AttributeValue>>>read()
                        .withDynamoDbClientProvider(
                            DynamoDbClientProviderMock.of(
                                DynamoDBIOTestHelper.getDynamoDBClient()))
                        .withScanRequestFn(
                            input ->
                                ScanRequest.builder()
                                    .tableName(tableName)
                                    .totalSegments(1)
                                    .limit(limit)
                                    .build())
                        .items())
                .apply(ParDo.of(new IterateListDoFn()));

        PAssert.that(actual).containsInAnyOrder(expected);
        pipeline.run().waitUntilFinish();
    }

    /**
     * A read without a scan-request function must be rejected with an
     * {@link IllegalArgumentException} carrying the message "withScanRequestFn() is required".
     *
     * <p>The {@code thrown} rule is the only verification mechanism. A surrounding try/catch
     * would swallow the exception the rule is waiting for, so none is used; the exception may
     * surface from either {@code apply} or {@code run}.
     */
    @Test
    public void testMissingScanRequestFn() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("withScanRequestFn() is required");

        pipeline.apply(
            DynamoDBIO.read()
                .withDynamoDbClientProvider(
                    DynamoDbClientProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
        pipeline.run().waitUntilFinish();
    }

    /**
     * A read without a client provider must be rejected with an
     * {@link IllegalArgumentException} carrying the message
     * "withDynamoDbClientProvider() is required".
     *
     * <p>The {@code thrown} rule is the only verification mechanism. A surrounding try/catch
     * would swallow the exception the rule is waiting for, so none is used; the exception may
     * surface from either {@code apply} or {@code run}.
     */
    @Test
    public void testMissingDynamoDbClientProvider() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("withDynamoDbClientProvider() is required");

        pipeline.apply(
            DynamoDBIO.read()
                .withScanRequestFn(
                    input ->
                        ScanRequest.builder().tableName(tableName).totalSegments(3).build()));
        pipeline.run().waitUntilFinish();
    }

    /**
     * A scan request that does not set {@code totalSegments} must be rejected with an
     * {@link IllegalArgumentException} whose message contains
     * "TotalSegments is required with withScanRequestFn()".
     *
     * <p>The {@code thrown} rule is the only verification mechanism. A surrounding try/catch
     * would swallow the exception the rule is waiting for, so none is used; the exception may
     * surface from either {@code apply} or {@code run}.
     */
    @Test
    public void testMissingTotalSegments() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("TotalSegments is required with withScanRequestFn()");

        pipeline.apply(
            DynamoDBIO.read()
                .withScanRequestFn(input -> ScanRequest.builder().tableName(tableName).build())
                .withDynamoDbClientProvider(
                    DynamoDbClientProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
        pipeline.run().waitUntilFinish();
    }

    /**
     * A scan request with a negative {@code totalSegments} must be rejected with an
     * {@link IllegalArgumentException} whose message contains
     * "TotalSegments is required with withScanRequestFn() and greater zero".
     *
     * <p>The {@code thrown} rule is the only verification mechanism. A surrounding try/catch
     * would swallow the exception the rule is waiting for, so none is used; the exception may
     * surface from either {@code apply} or {@code run}.
     */
    @Test
    public void testNegativeTotalSegments() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("TotalSegments is required with withScanRequestFn() and greater zero");

        pipeline.apply(
            DynamoDBIO.read()
                .withScanRequestFn(
                    input ->
                        ScanRequest.builder().tableName(tableName).totalSegments(-1).build())
                .withDynamoDbClientProvider(
                    DynamoDbClientProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
        pipeline.run().waitUntilFinish();
    }

    /**
     * Writes three key/value pairs through {@code DynamoDBIO.write()} against the test client,
     * asserts that the write transform emits no output elements, and then checks that the
     * table holds one item per input element.
     */
    @Test
    public void testWriteDataToDynamo() {
        List<KV<String, Integer>> items =
            ImmutableList.of(KV.of("test1", 111), KV.of("test2", 222), KV.of("test3", 333));

        PCollection<Void> output =
            pipeline
                .apply(Create.of(items))
                .apply(
                    DynamoDBIO.<KV<String, Integer>>write()
                        .withWriteRequestMapperFn(
                            entry -> {
                                // Key -> string attribute, value -> number attribute.
                                Map<String, AttributeValue> putRequest =
                                    ImmutableMap.of(
                                        "hashKey1",
                                        AttributeValue.builder().s(entry.getKey()).build(),
                                        "rangeKey2",
                                        AttributeValue.builder()
                                            .n(entry.getValue().toString())
                                            .build());
                                WriteRequest writeRequest =
                                    WriteRequest.builder()
                                        .putRequest(PutRequest.builder().item(putRequest).build())
                                        .build();
                                return KV.of(tableName, writeRequest);
                            })
                        .withRetryConfiguration(
                            DynamoDBIO.RetryConfiguration.builder()
                                .setMaxAttempts(5)
                                .setMaxDuration(Duration.standardMinutes(1))
                                .setRetryPredicate(
                                    DynamoDBIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE)
                                .build())
                        .withDynamoDbClientProvider(
                            DynamoDbClientProviderMock.of(
                                DynamoDBIOTestHelper.getDynamoDBClient())));

        // The write transform is expected to produce an empty output collection.
        PCollection<Long> publishedResultsSize = output.apply(Count.globally());
        PAssert.that(publishedResultsSize).containsInAnyOrder(0L);

        pipeline.run().waitUntilFinish();

        // Every input element must have landed in the table.
        int actualItemCount = DynamoDBIOTestHelper.readDataFromTable(tableName).size();
        Assert.assertEquals(items.size(), actualItemCount);
    }

    /**
     * Writes against a mocked client whose {@code batchWriteItem} always throws, and verifies
     * that the pipeline fails with a cause whose message contains "Error writing to DynamoDB"
     * after the retry warnings for attempts 1 to 3 have been logged.
     *
     * @throws Throwable the cause of the pipeline failure, rethrown so that the
     *     {@code thrown} rule can check its message
     */
    @Test
    public void testRetries() throws Throwable {
        thrown.expectMessage("Error writing to DynamoDB");

        List<KV<String, Integer>> items =
            ImmutableList.of(KV.of("test1", 111), KV.of("test2", 222), KV.of("test3", 333));

        // Every batch write fails, so each attempt ends in an error.
        DynamoDbClient amazonDynamoDBMock = Mockito.mock(DynamoDbClient.class);
        Mockito.when(amazonDynamoDBMock.batchWriteItem(Mockito.any(BatchWriteItemRequest.class)))
            .thenThrow(DynamoDbException.builder().message("Service unavailable").build());

        pipeline
            .apply(Create.of(items))
            .apply(
                DynamoDBIO.<KV<String, Integer>>write()
                    .withWriteRequestMapperFn(
                        entry -> {
                            Map<String, AttributeValue> putRequest =
                                ImmutableMap.of(
                                    "hashKey1",
                                    AttributeValue.builder().s(entry.getKey()).build(),
                                    "rangeKey2",
                                    AttributeValue.builder()
                                        .n(entry.getValue().toString())
                                        .build());
                            WriteRequest writeRequest =
                                WriteRequest.builder()
                                    .putRequest(PutRequest.builder().item(putRequest).build())
                                    .build();
                            return KV.of(tableName, writeRequest);
                        })
                    .withRetryConfiguration(
                        DynamoDBIO.RetryConfiguration.builder()
                            .setMaxAttempts(4)
                            .setMaxDuration(Duration.standardSeconds(10))
                            .setRetryPredicate(
                                DynamoDBIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE)
                            .build())
                    .withDynamoDbClientProvider(
                        DynamoDbClientProviderMock.of(amazonDynamoDBMock)));

        try {
            pipeline.run().waitUntilFinish();
        } catch (Pipeline.PipelineExecutionException e) {
            // NOTE(review): presumably maxAttempts = 4 means one initial try plus three
            // retries, hence warnings for attempts 1..3 - confirm against DynamoDBIO.
            for (int attempt = 1; attempt <= 3; attempt++) {
                expectedLogs.verifyWarn(
                    String.format("Error writing to DynamoDB. Retry attempt[%d]", attempt));
            }
            // Unwrap so that the thrown rule sees the original write failure.
            throw e.getCause();
        }
        Assert.fail("Pipeline is expected to fail because we were unable to write to DynamoDb.");
    }

    /**
     * Feeds the writer with repeated copies of the same ten key/value pairs, with
     * de-duplication on {@code hashKey1}/{@code rangeKey2} enabled, and verifies on a mocked
     * client that three batch writes were issued, each carrying ten requests with no
     * duplicates.
     */
    @Test
    public void testWriteDeduplication() {
        List<Integer> duplications = Arrays.asList(1, 2, 3);
        List<String> deduplicateKeys = Arrays.asList("hashKey1", "rangeKey2");

        DynamoDbClient amazonDynamoDBMock = Mockito.mock(DynamoDbClient.class);

        pipeline
            .apply(Create.of(duplications))
            .apply("duplicate", ParDo.of(new WriteDuplicateGeneratorDoFn()))
            .apply(
                DynamoDBIO.<KV<String, Integer>>write()
                    .withWriteRequestMapperFn(
                        entry -> {
                            Map<String, AttributeValue> putRequest =
                                ImmutableMap.of(
                                    "hashKey1",
                                    AttributeValue.builder().s(entry.getKey()).build(),
                                    "rangeKey2",
                                    AttributeValue.builder()
                                        .n(entry.getValue().toString())
                                        .build());
                            WriteRequest writeRequest =
                                WriteRequest.builder()
                                    .putRequest(PutRequest.builder().item(putRequest).build())
                                    .build();
                            return KV.of(tableName, writeRequest);
                        })
                    .withRetryConfiguration(
                        DynamoDBIO.RetryConfiguration.builder()
                            .setMaxAttempts(5)
                            .setMaxDuration(Duration.standardMinutes(1))
                            .setRetryPredicate(
                                DynamoDBIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE)
                            .build())
                    .withDynamoDbClientProvider(DynamoDbClientProviderMock.of(amazonDynamoDBMock))
                    .withDeduplicateKeys(deduplicateKeys));

        pipeline.run().waitUntilFinish();

        ArgumentCaptor<BatchWriteItemRequest> argumentCaptor =
            ArgumentCaptor.forClass(BatchWriteItemRequest.class);
        Mockito.verify(amazonDynamoDBMock, Mockito.times(3))
            .batchWriteItem(argumentCaptor.capture());

        for (BatchWriteItemRequest batchRequest : argumentCaptor.getAllValues()) {
            List<WriteRequest> requests = batchRequest.requestItems().get(tableName);
            // The generator emits ten distinct pairs per copy, so a de-duplicated batch
            // holds ten requests.
            Assert.assertEquals(10, requests.size());

            List<Map<String, AttributeValue>> requestKeys =
                requests.stream()
                    .map(
                        request ->
                            request.putRequest() != null
                                ? request.putRequest().item()
                                : request.deleteRequest().key())
                    .collect(Collectors.toList());
            // No duplicates: the distinct-key count equals the total count.
            Assert.assertEquals(new HashSet<>(requestKeys).size(), requestKeys.size());
        }
    }

    /** Flattens each incoming list of items by emitting its elements one at a time, in order. */
    private static class IterateListDoFn
        extends DoFn<List<Map<String, AttributeValue>>, Map<String, AttributeValue>> {

        /**
         * Emits every item of the input list to the output receiver.
         *
         * @param items the list of items making up one input element
         * @param out receiver for the individual items
         */
        @DoFn.ProcessElement
        public void processElement(
            @DoFn.Element List<Map<String, AttributeValue>> items,
            DoFn.OutputReceiver<Map<String, AttributeValue>> out) {
            items.forEach(out::output);
        }
    }

    /**
     * For an input element {@code n}, emits {@code n} copies of the same ten pairs
     * {@code ("test1", 1001)} to {@code ("test10", 1010)}, producing duplicate records.
     */
    private static class WriteDuplicateGeneratorDoFn extends DoFn<Integer, KV<String, Integer>> {

        /**
         * Emits the ten fixed pairs once per requested copy.
         *
         * @param ctx process context supplying the copy count and receiving the output
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext ctx) {
            // Typed context: element() is an Integer and output() takes KV<String, Integer>,
            // so no cast or unchecked call is needed.
            int copies = ctx.element();
            for (int copy = 0; copy < copies; copy++) {
                for (int j = 1; j <= 10; j++) {
                    ctx.output(KV.of("test" + j, 1000 + j));
                }
            }
        }
    }
}

