package org.apache.beam.sdk.io.aws2.dynamodb;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.common.RetryConfiguration;
import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest.class */
public class DynamoDBIOWriteTest {
    /** Name of the table every write request in this test is keyed by. */
    private static final String tableName = "Test";

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    @Rule
    public final ExpectedException thrown = ExpectedException.none();

    /** Mockito-managed DynamoDB client whose {@code batchWriteItem} calls the tests stub and verify. */
    @Mock
    public DynamoDbClient client;

    /** Wraps an item's attribute map in a put {@link WriteRequest}. */
    private static Function<Item, WriteRequest> putRequest =
            item -> WriteRequest.builder()
                    .putRequest(PutRequest.builder().item(item.attributeMap()).build())
                    .build();

    /** Wraps an item's attribute map, used as the key, in a delete {@link WriteRequest}. */
    private static Function<Item, WriteRequest> deleteRequest =
            item -> WriteRequest.builder()
                    .deleteRequest(DeleteRequest.builder().key(item.attributeMap()).build())
                    .build();

    /** Serializable mapper pairing {@link #tableName} with the put request for an item. */
    private static SerializableFunction<Item, KV<String, WriteRequest>> putRequestMapper =
            item -> KV.of(tableName, putRequest.apply(item));

    /** Serializable mapper pairing {@link #tableName} with the delete request for an item. */
    private static SerializableFunction<Item, KV<String, WriteRequest>> deleteRequestMapper =
            item -> KV.of(tableName, deleteRequest.apply(item));

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest$AddDuplicatesDoFn.class */
    /**
     * {@link DoFn} that, for each input item, first emits {@code duplicates} additional elements
     * and then the input item itself.
     *
     * <p>Depending on the constructor flag, the additional elements are either the unchanged item
     * or the item with an extra {@code "duplicate"} entry holding the copy index (1-based).
     */
    private static class AddDuplicatesDoFn extends DoFn<Item, Item> {
        private final int duplicates;
        private final SerializableBiFunction<Item, Integer, Item> decorator;

        /**
         * @param duplicates number of additional elements emitted per input item
         * @param decorate when {@code true}, each additional element gets a {@code "duplicate"}
         *     entry with its index; when {@code false}, the item is emitted unchanged
         */
        AddDuplicatesDoFn(int duplicates, boolean decorate) {
            this.duplicates = duplicates;
            this.decorator = decorate
                    ? (item, index) -> item.withEntry("duplicate", index.toString())
                    : (item, index) -> item;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Item, Item>.ProcessContext processContext) {
            Item item = processContext.element();
            // Copies are emitted before the original, with indices 1..duplicates.
            for (int index = 1; index <= duplicates; index++) {
                processContext.output(decorator.apply(item, index));
            }
            processContext.output(item);
        }

        /**
         * Recreates one of the two decorator lambdas from its serialized form.
         *
         * <p>NOTE(review): this method was reconstructed by a decompiler; javac normally
         * synthesizes {@code $deserializeLambda$} itself for serializable lambdas. Confirm
         * whether it should be kept in source at all.
         *
         * @throws IllegalArgumentException if the serialized lambda matches neither decorator
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            // Both decorators share the same shape; 6 is the static-invocation method handle kind.
            boolean isDecoratorLambda = serializedLambda.getImplMethodKind() == 6
                    && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction")
                    && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                    && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;")
                    && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest$AddDuplicatesDoFn")
                    && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest$Item;Ljava/lang/Integer;)Lorg/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest$Item;");
            if (isDecoratorLambda) {
                switch (serializedLambda.getImplMethodName()) {
                    case "lambda$new$7c9e1ebf$1":
                        return (SerializableBiFunction<Item, Integer, Item>)
                                (item, index) -> item.withEntry("duplicate", index.toString());
                    case "lambda$new$7c9e1ebf$2":
                        return (SerializableBiFunction<Item, Integer, Item>) (item, index) -> item;
                    default:
                        break;
                }
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest$GenerateItems.class */
    /** {@link DoFn} that expands an integer {@code n} into the items {@code Item.of(0)} .. {@code Item.of(n - 1)}. */
    private static class GenerateItems extends DoFn<Integer, Item> {
        private GenerateItems() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Integer, Item>.ProcessContext processContext) {
            int count = processContext.element();
            for (int id = 0; id < count; id++) {
                processContext.output(Item.of(id));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test element: a bag of string entries that can be converted to and from a DynamoDB
     * attribute map. Equality, hash code and string form are all derived from {@link #entries}.
     */
    @DefaultCoder(AvroCoder.class)
    public static class Item implements Serializable {
        Map<String, String> entries;

        // NOTE(review): presumably required by the Avro coder for reflective instantiation — confirm.
        private Item() {
        }

        private Item(Map<String, String> entries) {
            this.entries = entries;
        }

        /** Creates an item with a single {@code "id"} entry holding the decimal form of {@code id}. */
        static Item of(int id) {
            return new Item(ImmutableMap.of("id", String.valueOf(id)));
        }

        /** Creates an item from an attribute map, taking the string value ({@code s()}) of every attribute. */
        static Item of(Map<String, AttributeValue> attributes) {
            return new Item(ImmutableMap.copyOf(Maps.transformValues(attributes, AttributeValue::s)));
        }

        /** Returns the items with ids from {@code startInclusive} up to, but excluding, {@code endExclusive}. */
        static List<Item> range(int startInclusive, int endExclusive) {
            return IntStream.range(startInclusive, endExclusive)
                    .mapToObj(Item::of)
                    .collect(Collectors.toList());
        }

        /** Returns a new item containing this item's entries plus the given one. */
        public Item withEntry(String key, String value) {
            // The explicit type witness is required: a bare builder() in a call chain is
            // inferred as Builder<Object, Object>, which is not a Map<String, String>.
            return new Item(ImmutableMap.<String, String>builder().putAll(entries).put(key, value).build());
        }

        /** Returns a fresh {@link HashMap} mapping each entry key to a string-typed {@link AttributeValue}. */
        Map<String, AttributeValue> attributeMap() {
            Map<String, AttributeValue> view =
                    Maps.transformValues(entries, value -> AttributeValue.builder().s(value).build());
            // Copy so callers get an independent map rather than the transforming view.
            return new HashMap<>(view);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(entries, ((Item) obj).entries);
        }

        @Override
        public int hashCode() {
            return Objects.hash(entries);
        }

        @Override
        public String toString() {
            return "Item" + entries;
        }
    }

    /** Registers the mocked client for {@link DynamoDbClientBuilder} on the test pipeline before each test. */
    @Before
    public void configureClientBuilderFactory() {
        MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, client);
    }

    /** Runs the put-items scenario with the write transform left unmodified. */
    @Test
    public void testWritePutItems() {
        Function<DynamoDBIO.Write<Item>, DynamoDBIO.Write<Item>> unmodified = Function.identity();
        writePutItems(unmodified);
    }

    /** Runs the put-items scenario with the mock client supplied through a static client provider. */
    @Test
    public void testWritePutItemsWithLegacyProvider() {
        // Re-register the builder factory with a null client; the mock is then passed
        // explicitly through the provider on the write transform.
        MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, null);
        writePutItems(write -> write.withDynamoDbClientProvider(StaticDynamoDBClientProvider.of(client)));
    }

    /**
     * Writes 100 items as put requests and asserts that exactly those items reached the mocked
     * client and that the write transform produced no output elements.
     *
     * @param function customizes the write transform before it is applied to the pipeline
     */
    private void writePutItems(Function<DynamoDBIO.Write<Item>, DynamoDBIO.Write<Item>> function) {
        List<Item> items = Item.range(0, 100);
        Supplier<List<Item>> writtenItems =
                captureBatchWrites(client, request -> request.putRequest().item());

        // The <Item> witness is required: without it the chained write() is inferred as
        // Write<Object> and the Item-typed mapper is rejected.
        DynamoDBIO.Write<Item> write =
                function.apply(DynamoDBIO.<Item>write().withWriteRequestMapperFn(putRequestMapper));
        PAssert.that(pipeline.apply(Create.of(items)).apply(write)).empty();
        pipeline.run().waitUntilFinish();

        Assertions.assertThat(writtenItems.get()).containsExactlyInAnyOrderElementsOf(items);
    }

    /**
     * Feeds every item four times (three unchanged copies plus the original) into the put write
     * and asserts that no captured batch contains duplicates while all items are still written.
     */
    @Test
    public void testWritePutItemsWithDuplicates() {
        List<Item> items = Item.range(0, 100);
        Supplier<List<List<Item>>> capturedBatches =
                captureBatchWriteRequests(client, request -> request.putRequest().item());

        // The <Item> witness is required; a bare chained write() is inferred as Write<Object>.
        pipeline.apply(Create.of(items))
                .apply(ParDo.of(new AddDuplicatesDoFn(3, false)))
                .apply(DynamoDBIO.<Item>write().withWriteRequestMapperFn(putRequestMapper));
        pipeline.run().waitUntilFinish();

        List<List<Item>> batches = capturedBatches.get();
        for (List<Item> batch : batches) {
            Assertions.assertThat(batch).doesNotHaveDuplicates();
        }
        Assertions.assertThat(batches.stream().flatMap(List::stream)).containsAll(items);
    }

    /**
     * Feeds every item together with three decorated copies (same {@code "id"}, extra
     * {@code "duplicate"} entry) into a put write configured with {@code "id"} as deduplication
     * key, and asserts that no captured batch contains two items sharing the same key entries.
     */
    @Test
    public void testWritePutItemsWithDuplicatesByKey() {
        List<String> keys = ImmutableList.of("id");
        List<Item> items = Item.range(0, 100);
        Supplier<List<List<Item>>> capturedBatches =
                captureBatchWriteRequests(client, request -> request.putRequest().item());

        // The <Item> witness is required; a bare chained write() is inferred as Write<Object>.
        pipeline.apply(Create.of(items))
                .apply(ParDo.of(new AddDuplicatesDoFn(3, true)))
                .apply(DynamoDBIO.<Item>write()
                        .withWriteRequestMapperFn(putRequestMapper)
                        .withDeduplicateKeys(keys));
        pipeline.run().waitUntilFinish();

        List<List<Item>> batches = capturedBatches.get();
        for (List<Item> batch : batches) {
            // Reduce each item to its key entries so that duplicates are detected by key only.
            List<Item> keysOnly = batch.stream()
                    .map(item -> new Item(Maps.filterKeys(item.entries, keys::contains)))
                    .collect(Collectors.toList());
            Assertions.assertThat(keysOnly).doesNotHaveDuplicates();
        }
        Assertions.assertThat(batches.stream().flatMap(List::stream)).containsAll(items);
    }

    /** Runs the delete-items scenario with the write transform left unmodified. */
    @Test
    public void testWriteDeleteItems() {
        Function<DynamoDBIO.Write<Item>, DynamoDBIO.Write<Item>> unmodified = Function.identity();
        writeDeleteItems(unmodified);
    }

    /** Runs the delete-items scenario with the mock client supplied through a static client provider. */
    @Test
    public void testWriteDeleteItemsWithLegacyProvider() {
        // Re-register the builder factory with a null client; the mock is then passed
        // explicitly through the provider on the write transform.
        MockClientBuilderFactory.set(pipeline, DynamoDbClientBuilder.class, null);
        writeDeleteItems(write -> write.withDynamoDbClientProvider(StaticDynamoDBClientProvider.of(client)));
    }

    /**
     * Writes 100 items as delete requests and asserts that exactly those items (as keys) reached
     * the mocked client and that the write transform produced no output elements.
     *
     * @param function customizes the write transform before it is applied to the pipeline
     */
    private void writeDeleteItems(Function<DynamoDBIO.Write<Item>, DynamoDBIO.Write<Item>> function) {
        List<Item> items = Item.range(0, 100);
        Supplier<List<Item>> deletedKeys =
                captureBatchWrites(client, request -> request.deleteRequest().key());

        // The <Item> witness is required: without it the chained write() is inferred as
        // Write<Object> and the Item-typed mapper is rejected.
        DynamoDBIO.Write<Item> write =
                function.apply(DynamoDBIO.<Item>write().withWriteRequestMapperFn(deleteRequestMapper));
        PAssert.that(pipeline.apply(Create.of(items)).apply(write)).empty();
        pipeline.run().waitUntilFinish();

        Assertions.assertThat(deletedKeys.get()).hasSize(100);
        Assertions.assertThat(deletedKeys.get()).containsExactlyInAnyOrderElementsOf(items);
    }

    /**
     * Feeds every item four times (three unchanged copies plus the original) into the delete
     * write and asserts that no captured batch contains duplicates while all keys are still sent.
     */
    @Test
    public void testWriteDeleteItemsWithDuplicates() {
        List<Item> items = Item.range(0, 100);
        Supplier<List<List<Item>>> capturedBatches =
                captureBatchWriteRequests(client, request -> request.deleteRequest().key());

        // The <Item> witness is required; a bare chained write() is inferred as Write<Object>.
        pipeline.apply(Create.of(items))
                .apply(ParDo.of(new AddDuplicatesDoFn(3, false)))
                .apply(DynamoDBIO.<Item>write().withWriteRequestMapperFn(deleteRequestMapper));
        pipeline.run().waitUntilFinish();

        List<List<Item>> batches = capturedBatches.get();
        for (List<Item> batch : batches) {
            Assertions.assertThat(batch).doesNotHaveDuplicates();
        }
        Assertions.assertThat(batches.stream().flatMap(List::stream)).containsAll(items);
    }

    /**
     * Stubs the client to report unprocessed items on the first two calls (requests 4..9, then
     * 8..9) and verifies that exactly three batch writes happen, each containing what the
     * previous response reported as unprocessed.
     */
    @Test
    public void testWritePutItemsWithPartialSuccess() {
        List<WriteRequest> writes = putRequests(Item.range(0, 10));

        Mockito.when(client.batchWriteItem(ArgumentMatchers.any(BatchWriteItemRequest.class)))
                .thenReturn(partialWriteSuccess(writes.subList(4, 10)))
                .thenReturn(partialWriteSuccess(writes.subList(8, 10)))
                .thenReturn(BatchWriteItemResponse.builder().build());

        // The <Item> witness is required; a bare chained write() is inferred as Write<Object>.
        pipeline.apply(Create.of(10))
                .apply(ParDo.of(new GenerateItems()))
                .apply("write", DynamoDBIO.<Item>write().withWriteRequestMapperFn(putRequestMapper));
        pipeline.run().waitUntilFinish();

        Mockito.verify(client, Mockito.times(3))
                .batchWriteItem(ArgumentMatchers.any(BatchWriteItemRequest.class));

        InOrder inOrder = Mockito.inOrder(client);
        inOrder.verify(client).batchWriteItem(ArgumentMatchers.argThat(matchWritesUnordered(writes)));
        inOrder.verify(client).batchWriteItem(ArgumentMatchers.argThat(matchWritesUnordered(writes.subList(4, 10))));
        inOrder.verify(client).batchWriteItem(ArgumentMatchers.argThat(matchWritesUnordered(writes.subList(8, 10))));
    }

    /** Checks the client configuration built from a credentials provider and a region id. */
    @Test
    public void testBuildWithCredentialsProviderAndRegion() {
        Region region = Region.US_EAST_1;
        DefaultCredentialsProvider credentials = DefaultCredentialsProvider.create();
        ClientConfiguration expected = ClientConfiguration.create(credentials, region, (URI) null);

        Assertions.assertThat(
                        DynamoDBIO.write()
                                .withDynamoDbClientProvider(credentials, region.id())
                                .getClientConfiguration())
                .isEqualTo(expected);
    }

    /** Checks the client configuration built from a credentials provider, a region id and an endpoint. */
    @Test
    public void testBuildWithCredentialsProviderAndRegionAndEndpoint() {
        Region region = Region.US_EAST_1;
        DefaultCredentialsProvider credentials = DefaultCredentialsProvider.create();
        URI endpoint = URI.create("localhost:9999");
        ClientConfiguration expected = ClientConfiguration.create(credentials, region, endpoint);

        Assertions.assertThat(
                        DynamoDBIO.write()
                                .withDynamoDbClientProvider(credentials, region.id(), endpoint)
                                .getClientConfiguration())
                .isEqualTo(expected);
    }

    /**
     * Checks that a legacy retry configuration with 3 max attempts yields a client configuration
     * whose retry settings have 8 retries.
     */
    @Test
    public void testBuildWithRetryConfig() {
        DynamoDBIO.RetryConfiguration legacyRetry =
                DynamoDBIO.RetryConfiguration.builder()
                        .setMaxAttempts(3)
                        .setMaxDuration(Duration.ZERO)
                        .build();
        // NOTE(review): the 3 -> 8 mapping is defined by DynamoDBIO, not visible here — confirm.
        ClientConfiguration expected =
                ClientConfiguration.builder()
                        .retry(RetryConfiguration.builder().numRetries(8).build())
                        .build();

        Assertions.assertThat(DynamoDBIO.write().withRetryConfiguration(legacyRetry).getClientConfiguration())
                .isEqualTo(expected);
    }

    /**
     * Stubs {@code batchWriteItem} on the given client to return an empty response while
     * capturing every request.
     *
     * @param dynamoDbClient mocked client to stub
     * @param function extracts the attribute map (put item or delete key) from a write request
     * @return supplier that, when called, converts everything captured so far into one list of
     *     items per table entry of each captured request
     */
    private Supplier<List<List<Item>>> captureBatchWriteRequests(DynamoDbClient dynamoDbClient, Function<WriteRequest, Map<String, AttributeValue>> function) {
        // The captor must be typed: with a raw captor the stream below is raw as well and
        // the lambdas cannot access BatchWriteItemRequest members.
        ArgumentCaptor<BatchWriteItemRequest> requests = ArgumentCaptor.forClass(BatchWriteItemRequest.class);
        Mockito.when(dynamoDbClient.batchWriteItem(requests.capture()))
                .thenReturn(BatchWriteItemResponse.builder().build());

        return () -> requests.getAllValues().stream()
                .flatMap(request -> request.requestItems().values().stream())
                .map(writes -> writes.stream().map(function).map(Item::of).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    /**
     * Same as {@code captureBatchWriteRequests}, but the returned supplier flattens all captured
     * batches into a single list of items.
     *
     * @param dynamoDbClient mocked client to stub
     * @param function extracts the attribute map (put item or delete key) from a write request
     */
    private Supplier<List<Item>> captureBatchWrites(DynamoDbClient dynamoDbClient, Function<WriteRequest, Map<String, AttributeValue>> function) {
        Supplier<List<List<Item>>> batches = captureBatchWriteRequests(dynamoDbClient, function);
        // No raw cast here: it would erase the element type and break the flatMap below.
        return () -> batches.get().stream().flatMap(List::stream).collect(Collectors.toList());
    }

    /**
     * Returns a matcher accepting a request whose write requests for {@code tableName} have the
     * same size as {@code list} and contain all of its elements, in any order.
     *
     * <p>A {@code null} request, or one without an entry for the table, does not match.
     */
    private static ArgumentMatcher<BatchWriteItemRequest> matchWritesUnordered(List<WriteRequest> list) {
        return request -> {
            if (request == null) {
                return false;
            }
            List<WriteRequest> actual = request.requestItems().get(tableName);
            // Guard against requests for other tables instead of failing with an NPE.
            return actual != null && actual.size() == list.size() && actual.containsAll(list);
        };
    }

    /** Builds a response reporting {@code list} as the unprocessed write requests for {@code tableName}. */
    private static BatchWriteItemResponse partialWriteSuccess(List<WriteRequest> list) {
        Map<String, List<WriteRequest>> unprocessed = ImmutableMap.of(tableName, list);
        return BatchWriteItemResponse.builder().unprocessedItems(unprocessed).build();
    }

    /** Converts each item into its put {@link WriteRequest}, preserving order. */
    private static List<WriteRequest> putRequests(List<Item> list) {
        List<WriteRequest> requests = list.stream().map(putRequest).collect(Collectors.toList());
        return requests;
    }

    /**
     * Recreates the put or delete request mapper lambda from its serialized form.
     *
     * <p>NOTE(review): this method was reconstructed by a decompiler; javac normally synthesizes
     * {@code $deserializeLambda$} itself for serializable lambdas. Confirm whether it should be
     * kept in source at all.
     *
     * @param serializedLambda serialized form of one of the mapper lambdas
     * @return a lambda equivalent to the put or delete request mapper
     * @throws IllegalArgumentException if the serialized lambda matches neither mapper
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // Both mappers share the same shape; 6 is the static-invocation method handle kind.
        boolean isMapperLambda = serializedLambda.getImplMethodKind() == 6
                && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("apply")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")
                && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest")
                && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOWriteTest$Item;)Lorg/apache/beam/sdk/values/KV;");
        if (isMapperLambda) {
            // The explicit casts give the lambdas a target type; Object alone is not one.
            switch (serializedLambda.getImplMethodName()) {
                case "lambda$static$4bec8f18$1":
                    return (SerializableFunction<Item, KV<String, WriteRequest>>)
                            item -> KV.of(tableName, putRequest.apply(item));
                case "lambda$static$d5c83ffc$1":
                    return (SerializableFunction<Item, KV<String, WriteRequest>>)
                            item -> KV.of(tableName, deleteRequest.apply(item));
                default:
                    break;
            }
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
