package org.apache.beam.sdk.io.aws.dynamodb;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.class */
public class DynamoDBIOTest implements Serializable {
    private static final String tableName = "TaskA";
    private static final int numOfItems = 10;
    private static List<Map<String, AttributeValue>> expected;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest$IterateListDoFn.class */
    private static class IterateListDoFn extends DoFn<List<Map<String, AttributeValue>>, Map<String, AttributeValue>> {
        private IterateListDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element List<Map<String, AttributeValue>> list, DoFn.OutputReceiver<Map<String, AttributeValue>> outputReceiver) {
            Iterator<Map<String, AttributeValue>> it = list.iterator();
            while (it.hasNext()) {
                outputReceiver.output(it.next());
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest$WriteDuplicateGeneratorDoFn.class */
    private static class WriteDuplicateGeneratorDoFn extends DoFn<Integer, WriteRequest> {
        private WriteDuplicateGeneratorDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, WriteRequest>.ProcessContext processContext) {
            for (int i = 0; i < ((Integer) processContext.element()).intValue(); i++) {
                List<WriteRequest> generateWriteRequests = DynamoDBIOTestHelper.generateWriteRequests(DynamoDBIOTest.numOfItems);
                Objects.requireNonNull(processContext);
                generateWriteRequests.forEach((v1) -> {
                    r1.output(v1);
                });
            }
        }
    }

    @BeforeClass
    public static void setup() {
        DynamoDBIOTestHelper.startServerClient();
        DynamoDBIOTestHelper.createTestTable(tableName);
        expected = DynamoDBIOTestHelper.generateTestData(tableName, numOfItems);
    }

    @AfterClass
    public static void destroy() {
        DynamoDBIOTestHelper.stopServerClient(tableName);
    }

    @Test
    public void testReadScanResult() {
        PAssert.that(this.pipeline.apply(DynamoDBIO.read().withAwsClientsProvider(AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())).withScanRequestFn(r4 -> {
            return new ScanRequest(tableName).withTotalSegments(1);
        }).items())).containsInAnyOrder(new List[]{expected});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testReadScanResultWithLimit() {
        PAssert.that(this.pipeline.apply(DynamoDBIO.read().withAwsClientsProvider(AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())).withScanRequestFn(r4 -> {
            return new ScanRequest(tableName).withTotalSegments(1).withLimit(5);
        }).items()).apply(ParDo.of(new IterateListDoFn()))).containsInAnyOrder(expected);
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testMissingScanRequestFn() {
        this.thrown.expectMessage("withScanRequestFn() is required");
        this.pipeline.apply(DynamoDBIO.read().withAwsClientsProvider(AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
        try {
            this.pipeline.run().waitUntilFinish();
            Assert.fail("withScanRequestFn() is required");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals("withScanRequestFn() is required", e.getMessage());
        }
    }

    @Test
    public void testMissingAwsClientsProvider() {
        this.thrown.expectMessage("withAwsClientsProvider() is required");
        this.pipeline.apply(DynamoDBIO.read().withScanRequestFn(r4 -> {
            return new ScanRequest(tableName).withTotalSegments(3);
        }));
        try {
            this.pipeline.run().waitUntilFinish();
            Assert.fail("withAwsClientsProvider() is required");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals("withAwsClientsProvider() is required", e.getMessage());
        }
    }

    @Test
    public void testMissingTotalSegments() {
        this.thrown.expectMessage("TotalSegments is required with withScanRequestFn()");
        this.pipeline.apply(DynamoDBIO.read().withScanRequestFn(r4 -> {
            return new ScanRequest(tableName);
        }).withAwsClientsProvider(AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
        try {
            this.pipeline.run().waitUntilFinish();
            Assert.fail("TotalSegments is required with withScanRequestFn()");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals("TotalSegments is required with withScanRequestFn()", e.getMessage());
        }
    }

    @Test
    public void testNegativeTotalSegments() {
        this.thrown.expectMessage("TotalSegments is required with withScanRequestFn() and greater zero");
        this.pipeline.apply(DynamoDBIO.read().withScanRequestFn(r4 -> {
            return new ScanRequest(tableName).withTotalSegments(-1);
        }).withAwsClientsProvider(AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient())));
        try {
            this.pipeline.run().waitUntilFinish();
            Assert.fail("withTotalSegments() is expected and greater than zero");
        } catch (IllegalArgumentException e) {
            Assert.assertEquals("TotalSegments is required with withScanRequestFn() and greater zero", e.getMessage());
        }
    }

    @Test
    public void testWriteDataToDynamo() {
        PAssert.that(this.pipeline.apply(Create.of(DynamoDBIOTestHelper.generateWriteRequests(numOfItems))).apply(DynamoDBIO.write().withWriteRequestMapperFn(writeRequest -> {
            return KV.of(tableName, writeRequest);
        }).withRetryConfiguration(DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1L))).withAwsClientsProvider(AwsClientsProviderMock.of(DynamoDBIOTestHelper.getDynamoDBClient()))).apply(Count.globally())).containsInAnyOrder(new Long[]{0L});
        this.pipeline.run().waitUntilFinish();
    }

    @Test
    public void testRetries() throws Throwable {
        this.thrown.expectMessage("Error writing to DynamoDB");
        List<WriteRequest> generateWriteRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
        AmazonDynamoDB amazonDynamoDB = (AmazonDynamoDB) Mockito.mock(AmazonDynamoDB.class);
        Mockito.when(amazonDynamoDB.batchWriteItem((BatchWriteItemRequest) Mockito.any(BatchWriteItemRequest.class))).thenThrow(new Throwable[]{new AmazonDynamoDBException("Service unavailable")});
        this.pipeline.apply(Create.of(generateWriteRequests)).apply(DynamoDBIO.write().withWriteRequestMapperFn(writeRequest -> {
            return KV.of(tableName, writeRequest);
        }).withRetryConfiguration(DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10L))).withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDB)));
        try {
            this.pipeline.run().waitUntilFinish();
            Assert.fail("Pipeline is expected to fail because we were unable to write to DynamoDB.");
        } catch (Pipeline.PipelineExecutionException e) {
            this.expectedLogs.verifyWarn(String.format("Error writing to DynamoDB. Retry attempt[%d]", 1));
            this.expectedLogs.verifyWarn(String.format("Error writing to DynamoDB. Retry attempt[%d]", 2));
            this.expectedLogs.verifyWarn(String.format("Error writing to DynamoDB. Retry attempt[%d]", 3));
            throw e.getCause();
        }
    }

    @Test
    public void testWriteDeduplication() {
        List asList = Arrays.asList(1, 2, 3);
        List asList2 = Arrays.asList("hashKey1", "rangeKey2");
        AmazonDynamoDB amazonDynamoDB = (AmazonDynamoDB) Mockito.mock(AmazonDynamoDB.class);
        this.pipeline.apply(Create.of(asList)).apply("duplicate", ParDo.of(new WriteDuplicateGeneratorDoFn())).apply(DynamoDBIO.write().withWriteRequestMapperFn(writeRequest -> {
            return KV.of(tableName, writeRequest);
        }).withRetryConfiguration(DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1L))).withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDB)).withDeduplicateKeys(asList2));
        this.pipeline.run().waitUntilFinish();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BatchWriteItemRequest.class);
        ((AmazonDynamoDB) Mockito.verify(amazonDynamoDB, Mockito.times(3))).batchWriteItem((BatchWriteItemRequest) forClass.capture());
        forClass.getAllValues().forEach(batchWriteItemRequest -> {
            List list = (List) batchWriteItemRequest.getRequestItems().get(tableName);
            Assert.assertEquals(10L, list.size());
            List list2 = (List) list.stream().map(writeRequest2 -> {
                return writeRequest2.getPutRequest() != null ? writeRequest2.getPutRequest().getItem() : writeRequest2.getDeleteRequest().getKey();
            }).collect(Collectors.toList());
            Assert.assertEquals(new HashSet(list2).size(), list2.size());
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1967858514:
                if (implMethodName.equals("lambda$testReadScanResultWithLimit$2c4f310b$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1434963847:
                if (implMethodName.equals("lambda$testRetries$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
            case -975503290:
                if (implMethodName.equals("lambda$testWriteDataToDynamo$2f9baff8$1")) {
                    z = 6;
                    break;
                }
                break;
            case -213960944:
                if (implMethodName.equals("lambda$testNegativeTotalSegments$43268ee4$1")) {
                    z = 7;
                    break;
                }
                break;
            case 33944467:
                if (implMethodName.equals("lambda$testWriteDeduplication$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
            case 43443997:
                if (implMethodName.equals("lambda$testMissingAwsClientsProvider$43268ee4$1")) {
                    z = 4;
                    break;
                }
                break;
            case 86550659:
                if (implMethodName.equals("lambda$testMissingTotalSegments$43268ee4$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1161968037:
                if (implMethodName.equals("lambda$testReadScanResult$2c4f310b$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/amazonaws/services/dynamodbv2/model/WriteRequest;)Lorg/apache/beam/sdk/values/KV;")) {
                    return writeRequest -> {
                        return KV.of(tableName, writeRequest);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/amazonaws/services/dynamodbv2/model/WriteRequest;)Lorg/apache/beam/sdk/values/KV;")) {
                    return writeRequest2 -> {
                        return KV.of(tableName, writeRequest2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Lcom/amazonaws/services/dynamodbv2/model/ScanRequest;")) {
                    return r4 -> {
                        return new ScanRequest(tableName).withTotalSegments(1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Lcom/amazonaws/services/dynamodbv2/model/ScanRequest;")) {
                    return r42 -> {
                        return new ScanRequest(tableName).withTotalSegments(1).withLimit(5);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Lcom/amazonaws/services/dynamodbv2/model/ScanRequest;")) {
                    return r43 -> {
                        return new ScanRequest(tableName).withTotalSegments(3);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Lcom/amazonaws/services/dynamodbv2/model/ScanRequest;")) {
                    return r44 -> {
                        return new ScanRequest(tableName);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/amazonaws/services/dynamodbv2/model/WriteRequest;)Lorg/apache/beam/sdk/values/KV;")) {
                    return writeRequest3 -> {
                        return KV.of(tableName, writeRequest3);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Lcom/amazonaws/services/dynamodbv2/model/ScanRequest;")) {
                    return r45 -> {
                        return new ScanRequest(tableName).withTotalSegments(-1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
