/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.dynamodb;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.dynamodb.DynamoDBIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

@RunWith(value=MockitoJUnitRunner.class)
public class DynamoDBIOWriteTest {
    private static final String tableName = "Test";
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    @Rule
    public final ExpectedException thrown = ExpectedException.none();
    @Mock
    public DynamoDbClient client;
    private static Function<Item, WriteRequest> putRequest = item -> (WriteRequest)WriteRequest.builder().putRequest((PutRequest)PutRequest.builder().item(item.attributeMap()).build()).build();
    private static Function<Item, WriteRequest> deleteRequest = key -> (WriteRequest)WriteRequest.builder().deleteRequest((DeleteRequest)DeleteRequest.builder().key(key.attributeMap()).build()).build();
    private static SerializableFunction<Item, KV<String, WriteRequest>> putRequestMapper = (SerializableFunction & Serializable)item -> KV.of((Object)tableName, (Object)putRequest.apply((Item)item));
    private static SerializableFunction<Item, KV<String, WriteRequest>> deleteRequestMapper = (SerializableFunction & Serializable)key -> KV.of((Object)tableName, (Object)deleteRequest.apply((Item)key));

    @Before
    public void configureClientBuilderFactory() {
        MockClientBuilderFactory.set(this.pipeline, DynamoDbClientBuilder.class, this.client);
    }

    @Test
    public void testWritePutItems() {
        List<Item> items = Item.range(0, 100);
        Supplier<List<Item>> capturePuts = this.captureBatchWrites(this.client, req -> req.putRequest().item());
        DynamoDBIO.Write write = DynamoDBIO.write().withWriteRequestMapperFn(putRequestMapper);
        PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(items))).apply((PTransform)write);
        PAssert.that((PCollection)output).empty();
        this.pipeline.run().waitUntilFinish();
        Assertions.assertThat(capturePuts.get()).containsExactlyInAnyOrderElementsOf(items);
    }

    @Test
    public void testWritePutItemsWithDuplicates() {
        List<Item> items = Item.range(0, 100);
        Supplier<List<List<Item>>> captureRequests = this.captureBatchWriteRequests(this.client, req -> req.putRequest().item());
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(items))).apply((PTransform)ParDo.of((DoFn)new AddDuplicatesDoFn(3, false)))).apply((PTransform)DynamoDBIO.write().withWriteRequestMapperFn(putRequestMapper));
        this.pipeline.run().waitUntilFinish();
        List<List<Item>> requests = captureRequests.get();
        for (List<Item> reqItems : requests) {
            Assertions.assertThat(reqItems).doesNotHaveDuplicates();
        }
        Assertions.assertThat(requests.stream().flatMap(Collection::stream)).containsAll(items);
    }

    @Test
    public void testWritePutItemsWithDuplicatesByKey() {
        ImmutableList keys = ImmutableList.of((Object)"id");
        List<Item> items = Item.range(0, 100);
        Supplier<List<List<Item>>> captureRequests = this.captureBatchWriteRequests(this.client, req -> req.putRequest().item());
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(items))).apply((PTransform)ParDo.of((DoFn)new AddDuplicatesDoFn(3, true)))).apply((PTransform)DynamoDBIO.write().withWriteRequestMapperFn(putRequestMapper).withDeduplicateKeys((List)keys));
        this.pipeline.run().waitUntilFinish();
        List<List<Item>> requests = captureRequests.get();
        for (List<Item> reqItems : requests) {
            List keysOnly = reqItems.stream().map(item -> new Item(Maps.filterKeys(item.entries, arg_0 -> ((ImmutableList)keys).contains(arg_0)))).collect(Collectors.toList());
            Assertions.assertThat(keysOnly).doesNotHaveDuplicates();
        }
        Assertions.assertThat(requests.stream().flatMap(Collection::stream)).containsAll(items);
    }

    @Test
    public void testWriteDeleteItems() {
        List<Item> items = Item.range(0, 100);
        Supplier<List<Item>> captureDeletes = this.captureBatchWrites(this.client, req -> req.deleteRequest().key());
        DynamoDBIO.Write write = DynamoDBIO.write().withWriteRequestMapperFn(deleteRequestMapper);
        PCollection output = (PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(items))).apply((PTransform)write);
        PAssert.that((PCollection)output).empty();
        this.pipeline.run().waitUntilFinish();
        Assertions.assertThat(captureDeletes.get()).hasSize(100);
        Assertions.assertThat(captureDeletes.get()).containsExactlyInAnyOrderElementsOf(items);
    }

    @Test
    public void testWriteDeleteItemsWithDuplicates() {
        List<Item> items = Item.range(0, 100);
        Supplier<List<List<Item>>> captureRequests = this.captureBatchWriteRequests(this.client, req -> req.deleteRequest().key());
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of(items))).apply((PTransform)ParDo.of((DoFn)new AddDuplicatesDoFn(3, false)))).apply((PTransform)DynamoDBIO.write().withWriteRequestMapperFn(deleteRequestMapper));
        this.pipeline.run().waitUntilFinish();
        List<List<Item>> requests = captureRequests.get();
        for (List<Item> reqItems : requests) {
            Assertions.assertThat(reqItems).doesNotHaveDuplicates();
        }
        Assertions.assertThat(requests.stream().flatMap(Collection::stream)).containsAll(items);
    }

    @Test
    public void testWritePutItemsWithPartialSuccess() {
        List<WriteRequest> writes = DynamoDBIOWriteTest.putRequests(Item.range(0, 10));
        Mockito.when((Object)this.client.batchWriteItem((BatchWriteItemRequest)ArgumentMatchers.any(BatchWriteItemRequest.class))).thenReturn((Object)DynamoDBIOWriteTest.partialWriteSuccess(writes.subList(4, 10))).thenReturn((Object)DynamoDBIOWriteTest.partialWriteSuccess(writes.subList(8, 10))).thenReturn((Object)((BatchWriteItemResponse)BatchWriteItemResponse.builder().build()));
        ((PCollection)((PCollection)this.pipeline.apply((PTransform)Create.of((Object)10, (Object[])new Integer[0]))).apply((PTransform)ParDo.of((DoFn)new GenerateItems()))).apply("write", (PTransform)DynamoDBIO.write().withWriteRequestMapperFn(putRequestMapper));
        PipelineResult result = this.pipeline.run();
        result.waitUntilFinish();
        ((DynamoDbClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)3))).batchWriteItem((BatchWriteItemRequest)ArgumentMatchers.any(BatchWriteItemRequest.class));
        InOrder ordered = Mockito.inOrder((Object[])new Object[]{this.client});
        ((DynamoDbClient)ordered.verify((Object)this.client)).batchWriteItem((BatchWriteItemRequest)ArgumentMatchers.argThat(DynamoDBIOWriteTest.matchWritesUnordered(writes)));
        ((DynamoDbClient)ordered.verify((Object)this.client)).batchWriteItem((BatchWriteItemRequest)ArgumentMatchers.argThat(DynamoDBIOWriteTest.matchWritesUnordered(writes.subList(4, 10))));
        ((DynamoDbClient)ordered.verify((Object)this.client)).batchWriteItem((BatchWriteItemRequest)ArgumentMatchers.argThat(DynamoDBIOWriteTest.matchWritesUnordered(writes.subList(8, 10))));
    }

    private Supplier<List<List<Item>>> captureBatchWriteRequests(DynamoDbClient mock, Function<WriteRequest, Map<String, AttributeValue>> extractor) {
        ArgumentCaptor reqCaptor = ArgumentCaptor.forClass(BatchWriteItemRequest.class);
        Mockito.when((Object)mock.batchWriteItem((BatchWriteItemRequest)reqCaptor.capture())).thenReturn((Object)((BatchWriteItemResponse)BatchWriteItemResponse.builder().build()));
        return () -> reqCaptor.getAllValues().stream().flatMap(req -> req.requestItems().values().stream()).map(writes -> writes.stream().map(extractor).map(Item::of).collect(Collectors.toList())).collect(Collectors.toList());
    }

    private Supplier<List<Item>> captureBatchWrites(DynamoDbClient mock, Function<WriteRequest, Map<String, AttributeValue>> extractor) {
        Supplier<List<List<Item>>> requests = this.captureBatchWriteRequests(mock, extractor);
        return () -> ((List)requests.get()).stream().flatMap(reqs -> reqs.stream()).collect(Collectors.toList());
    }

    private static ArgumentMatcher<BatchWriteItemRequest> matchWritesUnordered(List<WriteRequest> writes) {
        return req -> req != null && ((List)req.requestItems().get(tableName)).size() == writes.size() && ((List)req.requestItems().get(tableName)).containsAll(writes);
    }

    private static BatchWriteItemResponse partialWriteSuccess(List<WriteRequest> unprocessed) {
        return (BatchWriteItemResponse)BatchWriteItemResponse.builder().unprocessedItems((Map)ImmutableMap.of((Object)tableName, unprocessed)).build();
    }

    private static List<WriteRequest> putRequests(List<Item> items) {
        return items.stream().map(putRequest).collect(Collectors.toList());
    }

    private static class AddDuplicatesDoFn
    extends DoFn<Item, Item> {
        private final int duplicates;
        private final SerializableBiFunction<Item, Integer, Item> decorator;

        AddDuplicatesDoFn(int duplicates, boolean decorate) {
            this.duplicates = duplicates;
            this.decorator = decorate ? (SerializableBiFunction & Serializable)(item, i) -> item.withEntry("duplicate", i.toString()) : (SerializableBiFunction & Serializable)(item, i) -> item;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctx) {
            Item original = (Item)ctx.element();
            IntStream.rangeClosed(1, this.duplicates).forEach(i -> ctx.output((Object)((Item)this.decorator.apply((Object)original, (Object)i))));
            ctx.output((Object)original);
        }
    }

    private static class GenerateItems
    extends DoFn<Integer, Item> {
        private GenerateItems() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext ctx) {
            IntStream.range(0, (Integer)ctx.element()).forEach(i -> ctx.output((Object)Item.of(i)));
        }
    }

    @DefaultCoder(value=AvroCoder.class)
    static class Item
    implements Serializable {
        Map<String, String> entries;

        private Item() {
        }

        private Item(Map<String, String> entries) {
            this.entries = entries;
        }

        static Item of(int id) {
            return new Item((Map<String, String>)ImmutableMap.of((Object)"id", (Object)String.valueOf(id)));
        }

        static Item of(Map<String, AttributeValue> attributes) {
            return new Item((Map<String, String>)ImmutableMap.copyOf((Map)Maps.transformValues(attributes, a -> a.s())));
        }

        static List<Item> range(int startInclusive, int endExclusive) {
            return IntStream.range(startInclusive, endExclusive).mapToObj(Item::of).collect(Collectors.toList());
        }

        Item withEntry(String key, String value) {
            return new Item((Map<String, String>)ImmutableMap.builder().putAll(this.entries).put((Object)key, (Object)value).build());
        }

        Map<String, AttributeValue> attributeMap() {
            return new HashMap<String, AttributeValue>(Maps.transformValues(this.entries, v -> (AttributeValue)AttributeValue.builder().s(v).build()));
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            return Objects.equals(this.entries, ((Item)o).entries);
        }

        public int hashCode() {
            return Objects.hash(this.entries);
        }

        public String toString() {
            return "Item" + this.entries;
        }
    }
}

