package org.apache.beam.sdk.io.aws2.dynamodb;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.aws2.dynamodb.AutoValue_DynamoDBIO_Read;
import org.apache.beam.sdk.io.aws2.dynamodb.AutoValue_DynamoDBIO_RetryConfiguration;
import org.apache.beam.sdk.io.aws2.dynamodb.AutoValue_DynamoDBIO_Write;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.class */
public final class DynamoDBIO {

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Read.class */
    public static abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * AutoValue builder for {@link Read}.
         *
         * <p>All setters are package-private; within this file they are only reached through the
         * {@code with*} methods of {@link Read}, each of which goes through {@code toBuilder()}.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Read$Builder.class */
        public static abstract class Builder<T> {
            /** Sets the provider from which the read step obtains its {@link DynamoDbClient}. */
            abstract Builder<T> setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider);

            /** Sets the function that produces the {@link ScanRequest}; it is always invoked with {@code null}. */
            abstract Builder<T> setScanRequestFn(SerializableFunction<Void, ScanRequest> serializableFunction);

            /** Sets the scan segment this spec is responsible for. */
            abstract Builder<T> setSegmentId(Integer num);

            /** Sets the function that converts a {@link ScanResponse} into the output element. */
            abstract Builder<T> setScanResponseMapperFn(SerializableFunction<ScanResponse, T> serializableFunction);

            /** Sets the coder applied to the output collection. */
            abstract Builder<T> setCoder(Coder<T> coder);

            /** Builds the {@link Read} from the values set so far. */
            abstract Read<T> build();
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Read$ItemsMapper.class */
        /**
         * Response mapper that exposes the raw items of a {@link ScanResponse}.
         *
         * <p>A {@code null} response is mapped to an empty list instead of failing.
         */
        static final class ItemsMapper<T> implements SerializableFunction<ScanResponse, List<Map<String, AttributeValue>>> {
            ItemsMapper() {
            }

            /**
             * @param response the scan response, may be {@code null}
             * @return the response's items, or an empty list when {@code response} is {@code null}
             */
            public List<Map<String, AttributeValue>> apply(ScanResponse response) {
                if (response != null) {
                    return response.items();
                }
                return Collections.emptyList();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Read$ReadFn.class */
        public static class ReadFn<T> extends DoFn<Read<T>, T> {
            private ReadFn() {
            }

            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Read<T> read, DoFn.OutputReceiver<T> outputReceiver) {
                outputReceiver.output(read.getScanResponseMapperFn().apply(read.getDynamoDbClientProvider().getDynamoDbClient().scan((ScanRequest) ((ScanRequest) read.getScanRequestFn().apply((Object) null)).toBuilder().segment(read.getSegmentId()).build())));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Read$SplitFn.class */
        /**
         * Fans a single {@link Read} spec out into one spec per scan segment.
         *
         * <p>The number of segments is taken from {@code totalSegments()} of the request produced by
         * the spec's scan request function.
         */
        public static class SplitFn<T> extends DoFn<Read<T>, Read<T>> {
            private SplitFn() {
            }

            /**
             * Emits {@code read.withSegmentId(i)} for every {@code i} in {@code [0, totalSegments)}.
             *
             * @param read the unsplit read specification
             * @param outputReceiver receives one spec per segment
             */
            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Read<T> read, DoFn.OutputReceiver<Read<T>> outputReceiver) {
                // The function's input type is Void, so a plain null is the only valid argument.
                ScanRequest scanRequest = read.getScanRequestFn().apply(null);
                // Read once: the value cannot change while the segments are being emitted.
                int totalSegments = scanRequest.totalSegments();
                for (int segmentId = 0; segmentId < totalSegments; segmentId++) {
                    outputReceiver.output(read.withSegmentId(segmentId));
                }
            }
        }

        /** Provider used by the read step to obtain the {@link DynamoDbClient}; {@code expand} rejects {@code null}. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract DynamoDbClientProvider getDynamoDbClientProvider();

        /** Function producing the {@link ScanRequest}; it is invoked with {@code null} and {@code expand} rejects a {@code null} function. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();

        /** Scan segment handled by this spec; assigned per segment by the split step via {@code withSegmentId}. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getSegmentId();

        /** Function converting each {@link ScanResponse} into an output element. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<ScanResponse, T> getScanResponseMapperFn();

        /** Coder set on the output collection in {@code expand}. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Coder<T> getCoder();

        /** Returns a builder pre-populated with this instance's values; used by every {@code with*} method. */
        abstract Builder<T> toBuilder();

        /**
         * Returns a {@link Read} built from this one with the given client provider.
         *
         * @param dynamoDbClientProvider provider the read step asks for its client
         */
        public Read<T> withDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider) {
            Builder<T> updated = toBuilder().setDynamoDbClientProvider(dynamoDbClientProvider);
            return updated.build();
        }

        /**
         * Convenience overload that wraps the arguments in a {@link BasicDynamoDbClientProvider}.
         *
         * @param awsCredentialsProvider credentials passed to the basic provider
         * @param str second constructor argument of the basic provider (presumably the AWS region;
         *     confirm against {@code BasicDynamoDbClientProvider})
         * @param uri third constructor argument of the basic provider, may be {@code null}
         */
        public Read<T> withDynamoDbClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str, URI uri) {
            DynamoDbClientProvider basicProvider = new BasicDynamoDbClientProvider(awsCredentialsProvider, str, uri);
            return withDynamoDbClientProvider(basicProvider);
        }

        /**
         * Same as the three-argument overload with a {@code null} {@link URI}.
         */
        public Read<T> withDynamoDbClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str) {
            final URI noUri = null;
            return withDynamoDbClientProvider(awsCredentialsProvider, str, noUri);
        }

        /**
         * Returns a {@link Read} built from this one with the given scan request function.
         *
         * @param serializableFunction produces the {@link ScanRequest}; it is invoked with {@code null}
         */
        public Read<T> withScanRequestFn(SerializableFunction<Void, ScanRequest> serializableFunction) {
            Builder<T> updated = toBuilder().setScanRequestFn(serializableFunction);
            return updated.build();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Returns a {@link Read} built from this one and bound to the given scan segment.
         *
         * @param num the segment id, must not be {@code null}
         * @throws IllegalArgumentException if {@code num} is {@code null}
         */
        public Read<T> withSegmentId(Integer num) {
            boolean segmentPresent = num != null;
            Preconditions.checkArgument(segmentPresent, "segmentId can not be null");
            Builder<T> updated = toBuilder().setSegmentId(num);
            return updated.build();
        }

        /**
         * Returns a {@link Read} built from this one with the given response mapper.
         *
         * @param serializableFunction converts each {@link ScanResponse} into an output element
         * @throws IllegalArgumentException if {@code serializableFunction} is {@code null}
         */
        public Read<T> withScanResponseMapperFn(SerializableFunction<ScanResponse, T> serializableFunction) {
            boolean mapperPresent = serializableFunction != null;
            Preconditions.checkArgument(mapperPresent, "scanResultMapper can not be null");
            Builder<T> updated = toBuilder().setScanResponseMapperFn(serializableFunction);
            return updated.build();
        }

        /**
         * Returns a {@link Read} that emits the raw items of every scan response.
         *
         * <p>Both the response mapper (an {@link ItemsMapper}) and the coder (a list of
         * string-to-{@link AttributeValue} maps) are replaced together.
         *
         * @return this read re-typed to {@code List<Map<String, AttributeValue>>}
         */
        @SuppressWarnings("unchecked")
        public Read<List<Map<String, AttributeValue>>> items() {
            // Unchecked but safe: the only T-typed members (mapper and coder) are both overwritten
            // below with values of the new element type.
            Read<List<Map<String, AttributeValue>>> self = (Read<List<Map<String, AttributeValue>>>) this;
            return self.withScanResponseMapperFn(new ItemsMapper<>())
                    .withCoder(ListCoder.of(MapCoder.of(StringUtf8Coder.of(), AttributeValueCoder.of())));
        }

        /**
         * Returns a {@link Read} built from this one with the given output coder.
         *
         * @param coder coder for the output elements, must not be {@code null}
         * @throws IllegalArgumentException if {@code coder} is {@code null}
         */
        public Read<T> withCoder(Coder<T> coder) {
            boolean coderPresent = coder != null;
            Preconditions.checkArgument(coderPresent, "coder can not be null");
            Builder<T> updated = toBuilder().setCoder(coder);
            return updated.build();
        }

        /**
         * Builds the read pipeline: create a single spec, split it into one spec per scan segment,
         * reshuffle the specs, then scan each segment.
         *
         * @param pBegin the pipeline start
         * @return the mapped scan results, encoded with {@link #getCoder()}
         * @throws IllegalArgumentException if the scan request function, the client provider or the
         *     response mapper is missing, if the request function returns {@code null}, or if the
         *     request's {@code totalSegments} is absent or not positive
         */
        @Override
        public PCollection<T> expand(PBegin pBegin) {
            Preconditions.checkArgument(getScanRequestFn() != null, "withScanRequestFn() is required");
            Preconditions.checkArgument(getDynamoDbClientProvider() != null, "withDynamoDbClientProvider() is required");
            // The read step calls the mapper unconditionally, so fail here rather than on a worker.
            Preconditions.checkArgument(getScanResponseMapperFn() != null, "withScanResponseMapperFn() is required");

            // The function's input type is Void, so a plain null is the only valid argument.
            ScanRequest scanRequest = getScanRequestFn().apply(null);
            Preconditions.checkArgument(scanRequest != null, "withScanRequestFn() must not return null");
            Integer totalSegments = scanRequest.totalSegments();
            Preconditions.checkArgument(totalSegments != null && totalSegments > 0, "TotalSegments is required with withScanRequestFn() and greater zero");

            PCollection<Read<T>> splits = pBegin.apply("Create", Create.of(this)).apply("Split", ParDo.of(new SplitFn<T>()));
            // Read specs travel between steps using Java serialization.
            splits.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {
            }));

            PCollection<T> output = splits.apply("Reshuffle", Reshuffle.<Read<T>>viaRandomKey()).apply("Read", ParDo.of(new ReadFn<T>()));
            output.setCoder(getCoder());
            return output;
        }
    }

    /**
     * Retry settings for failed DynamoDB writes: a maximum number of attempts, a maximum
     * duration, and the predicate deciding which failures may be retried.
     */
    @AutoValue
    public static abstract class RetryConfiguration implements Serializable {

        /** Predicate installed by {@link #builder()}. */
        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        /** Builder for {@link RetryConfiguration}; {@link #build()} validates the configured values. */
        @AutoValue.Builder
        public static abstract class Builder {
            /** Sets the maximum number of attempts; must be greater than 0. */
            public abstract Builder setMaxAttempts(int i);

            /** Sets the maximum duration; must be non-null and longer than zero. */
            public abstract Builder setMaxDuration(Duration duration);

            /** Sets the predicate that decides whether a failure is retried. */
            abstract Builder setRetryPredicate(RetryPredicate retryPredicate);

            /** Generated, unvalidated build. */
            abstract RetryConfiguration autoBuild();

            /**
             * Builds the configuration and validates it.
             *
             * @throws IllegalArgumentException if {@code maxAttempts} is not positive, or if
             *     {@code maxDuration} is {@code null} or not longer than zero
             */
            public RetryConfiguration build() {
                RetryConfiguration configuration = autoBuild();

                boolean attemptsPositive = configuration.getMaxAttempts() > 0;
                Preconditions.checkArgument(attemptsPositive, "maxAttempts should be greater than 0");

                Duration maxDuration = configuration.getMaxDuration();
                boolean durationPositive = maxDuration != null && maxDuration.isLongerThan(Duration.ZERO);
                Preconditions.checkArgument(durationPositive, "maxDuration should be greater than 0");

                return configuration;
            }
        }

        /** Default predicate: retries on any {@link IOException} or {@link DynamoDbException}. */
        private static class DefaultRetryPredicate implements RetryPredicate {
            // NOTE(review): this set is not consulted by test(). Every DynamoDbException is accepted
            // before a status code could be looked at, so a status-code filter has never been in
            // effect. Confirm the intended policy before narrowing it.
            private static final ImmutableSet<Integer> ELIGIBLE_CODES = ImmutableSet.of(503);

            private DefaultRetryPredicate() {
            }

            @Override // java.util.function.Predicate
            public boolean test(Throwable th) {
                if (th instanceof IOException) {
                    return true;
                }
                return th instanceof DynamoDbException;
            }
        }

        /** Serializable predicate deciding whether a given failure should be retried. */
        @FunctionalInterface
        public interface RetryPredicate extends Predicate<Throwable>, Serializable {
        }

        /** Maximum number of attempts. */
        public abstract int getMaxAttempts();

        /** Maximum duration. */
        public abstract Duration getMaxDuration();

        /** Predicate deciding whether a failure is retried. */
        public abstract RetryPredicate getRetryPredicate();

        /** Returns a builder pre-populated with this instance's values. */
        abstract Builder toBuilder();

        /** Returns a new builder with {@link #DEFAULT_RETRY_PREDICATE} already set. */
        public static Builder builder() {
            Builder fresh = new AutoValue_DynamoDBIO_RetryConfiguration.Builder();
            return fresh.setRetryPredicate(DEFAULT_RETRY_PREDICATE);
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Write.class */
    public static abstract class Write<T> extends PTransform<PCollection<T>, PCollection<Void>> {

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * AutoValue builder for {@link Write}.
         *
         * <p>All setters are package-private; within this file they are only reached through the
         * {@code with*} methods of {@link Write}, each of which goes through {@code toBuilder()}.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Write$Builder.class */
        public static abstract class Builder<T> {
            /** Sets the provider from which the write step obtains its {@link DynamoDbClient}. */
            abstract Builder<T> setDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider);

            /** Sets the retry settings; when left unset the write step fails on the first error. */
            abstract Builder<T> setRetryConfiguration(RetryConfiguration retryConfiguration);

            /** Sets the function mapping an input element to a key / {@link WriteRequest} pair. */
            abstract Builder<T> setWriteItemMapperFn(SerializableFunction<T, KV<String, WriteRequest>> serializableFunction);

            /** Builds the {@link Write} from the values set so far. */
            abstract Write<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO$Write$WriteFn.class */
        /**
         * {@link DoFn} that maps each input element to a key / {@link WriteRequest} pair, buffers the
         * pairs, and sends them with {@code batchWriteItem} once {@link #BATCH_SIZE} pairs are
         * buffered or the bundle finishes.
         *
         * <p>Failed calls are retried according to the {@link RetryConfiguration} of the enclosing
         * {@link Write}; without one, the first failure is fatal.
         */
        public static class WriteFn<T> extends DoFn<T, Void> {

            /** Format of the warning logged before each retry; referenced by tests. */
            @VisibleForTesting
            static final String RETRY_ATTEMPT_LOG = "Error writing to DynamoDB. Retry attempt[%d]";

            /** Number of buffered requests that triggers a flush. */
            private static final int BATCH_SIZE = 25;
            private static final Duration RETRY_INITIAL_BACKOFF = Duration.standardSeconds(5);
            private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class);
            private static final Counter DYNAMO_DB_WRITE_FAILURES = Metrics.counter(WriteFn.class, "DynamoDB_Write_Failures");

            private final Write<T> spec;
            // Created in setup(); not part of the serialized DoFn.
            private transient FluentBackoff retryBackoff;
            private transient DynamoDbClient client;
            // Re-created at the start of every bundle.
            private List<KV<String, WriteRequest>> batch;

            /**
             * @param write the write specification supplying the client provider, mapper and retry
             *     settings
             */
            WriteFn(Write<T> write) {
                this.spec = write;
            }

            /** Obtains the client and prepares the backoff policy from the retry configuration. */
            @DoFn.Setup
            public void setup() {
                this.client = this.spec.getDynamoDbClientProvider().getDynamoDbClient();
                // Without a retry configuration no retries are allowed.
                FluentBackoff backoff = FluentBackoff.DEFAULT.withMaxRetries(0).withInitialBackoff(RETRY_INITIAL_BACKOFF);
                RetryConfiguration retryConfiguration = this.spec.getRetryConfiguration();
                if (retryConfiguration != null) {
                    // maxAttempts counts the initial call, hence one retry fewer.
                    backoff = backoff.withMaxRetries(retryConfiguration.getMaxAttempts() - 1).withMaxCumulativeBackoff(retryConfiguration.getMaxDuration());
                }
                this.retryBackoff = backoff;
            }

            /** Starts every bundle with an empty buffer. */
            @DoFn.StartBundle
            public void startBundle(DoFn<T, Void>.StartBundleContext startBundleContext) {
                this.batch = new ArrayList<>();
            }

            /** Buffers the mapped element and flushes once the buffer holds {@link #BATCH_SIZE} entries. */
            @DoFn.ProcessElement
            public void processElement(DoFn<T, Void>.ProcessContext processContext) throws Exception {
                this.batch.add(this.spec.getWriteItemMapperFn().apply(processContext.element()));
                if (this.batch.size() >= BATCH_SIZE) {
                    flushBatch();
                }
            }

            /** Flushes whatever is still buffered at the end of the bundle. */
            @DoFn.FinishBundle
            public void finishBundle(DoFn<T, Void>.FinishBundleContext finishBundleContext) throws Exception {
                flushBatch();
            }

            /**
             * Sends the buffered requests, grouped by key, in a single {@code batchWriteItem} call and
             * retries the call as permitted by the retry configuration. The buffer is cleared whether
             * the write succeeds or fails.
             *
             * @throws IOException if the write fails and is not retried, or if all attempts are used up
             * @throws InterruptedException if interrupted while sleeping between attempts
             */
            private void flushBatch() throws IOException, InterruptedException {
                if (this.batch.isEmpty()) {
                    return;
                }
                try {
                    // Each buffered entry is a KV<key, request>; the request map needs them per key.
                    Map<String, List<WriteRequest>> requestsByKey = this.batch.stream()
                            .collect(Collectors.groupingBy(KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
                    BatchWriteItemRequest batchWriteItemRequest = BatchWriteItemRequest.builder().requestItems(requestsByKey).build();

                    Sleeper sleeper = Sleeper.DEFAULT;
                    BackOff backoff = this.retryBackoff.backoff();
                    int attempt = 0;
                    while (true) {
                        attempt++;
                        try {
                            // NOTE(review): the response is discarded, so any unprocessed items the
                            // service may report are not re-submitted.
                            this.client.batchWriteItem(batchWriteItemRequest);
                            return;
                        } catch (Exception e) {
                            RetryConfiguration retryConfiguration = this.spec.getRetryConfiguration();
                            // Fail right away when retries are not configured or not applicable.
                            if (retryConfiguration == null || !retryConfiguration.getRetryPredicate().test(e)) {
                                DYNAMO_DB_WRITE_FAILURES.inc();
                                LOG.info("Unable to write batch items {}.", batchWriteItemRequest.requestItems().entrySet(), e);
                                throw new IOException("Error writing to DynamoDB (no attempt made to retry)", e);
                            }
                            // Sleeps for the next backoff interval; false means the budget is used up.
                            if (!BackOffUtils.next(sleeper, backoff)) {
                                throw new IOException(String.format("Error writing to DynamoDB after %d attempt(s). No more attempts allowed", attempt), e);
                            }
                            LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), e);
                        }
                    }
                } finally {
                    this.batch.clear();
                }
            }

            /** Closes the client obtained in {@link #setup()}, if any. */
            @DoFn.Teardown
            public void tearDown() {
                if (this.client != null) {
                    this.client.close();
                    this.client = null;
                }
            }
        }

        /** Provider used by the write step to obtain the {@link DynamoDbClient}. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract DynamoDbClientProvider getDynamoDbClientProvider();

        /** Retry settings for failed writes; the write step treats {@code null} as "never retry". */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract RetryConfiguration getRetryConfiguration();

        /** Function mapping an input element to the key / {@link WriteRequest} pair that is buffered and written. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn();

        /** Returns a builder pre-populated with this instance's values; used by every {@code with*} method. */
        abstract Builder<T> toBuilder();

        /**
         * Returns a {@link Write} built from this one with the given client provider.
         *
         * @param dynamoDbClientProvider provider the write step asks for its client
         */
        public Write<T> withDynamoDbClientProvider(DynamoDbClientProvider dynamoDbClientProvider) {
            Builder<T> updated = toBuilder().setDynamoDbClientProvider(dynamoDbClientProvider);
            return updated.build();
        }

        /**
         * Convenience overload that wraps the arguments in a {@link BasicDynamoDbClientProvider}.
         *
         * @param awsCredentialsProvider credentials passed to the basic provider
         * @param str second constructor argument of the basic provider (presumably the AWS region;
         *     confirm against {@code BasicDynamoDbClientProvider})
         * @param uri third constructor argument of the basic provider, may be {@code null}
         */
        public Write<T> withDynamoDbClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str, URI uri) {
            DynamoDbClientProvider basicProvider = new BasicDynamoDbClientProvider(awsCredentialsProvider, str, uri);
            return withDynamoDbClientProvider(basicProvider);
        }

        /**
         * Same as the three-argument overload with a {@code null} {@link URI}.
         */
        public Write<T> withDynamoDbClientProvider(AwsCredentialsProvider awsCredentialsProvider, String str) {
            final URI noUri = null;
            return withDynamoDbClientProvider(awsCredentialsProvider, str, noUri);
        }

        /**
         * Returns a {@link Write} built from this one with the given retry settings.
         *
         * @param retryConfiguration retry settings, must not be {@code null}
         * @throws IllegalArgumentException if {@code retryConfiguration} is {@code null}
         */
        public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) {
            boolean configurationPresent = retryConfiguration != null;
            Preconditions.checkArgument(configurationPresent, "retryConfiguration is required");
            Builder<T> updated = toBuilder().setRetryConfiguration(retryConfiguration);
            return updated.build();
        }

        /**
         * Returns a {@link Write} built from this one with the given element mapper.
         *
         * @param serializableFunction maps an input element to the key / {@link WriteRequest} pair
         *     that is written
         */
        public Write<T> withWriteRequestMapperFn(SerializableFunction<T, KV<String, WriteRequest>> serializableFunction) {
            Builder<T> updated = toBuilder().setWriteItemMapperFn(serializableFunction);
            return updated.build();
        }

        /**
         * Applies a {@link WriteFn} configured from this transform to the input.
         *
         * @param pCollection the elements to write
         * @return the output of the write step
         */
        @Override
        public PCollection<Void> expand(PCollection<T> pCollection) {
            // Parameterised explicitly: a raw WriteFn would erase the result type of apply().
            return pCollection.apply(ParDo.of(new WriteFn<T>(this)));
        }
    }

    /**
     * Returns an unconfigured {@link Read}; configure it through its {@code with*} methods (or
     * {@code items()}) before applying it.
     *
     * @param <T> element type of the resulting collection
     */
    public static <T> Read<T> read() {
        // Parameterised builder instead of the raw type, so no unchecked conversion is needed.
        return new AutoValue_DynamoDBIO_Read.Builder<T>().build();
    }

    /**
     * Returns an unconfigured {@link Write}; configure it through its {@code with*} methods before
     * applying it.
     *
     * @param <T> element type of the input collection
     */
    public static <T> Write<T> write() {
        // Parameterised builder instead of the raw type, so no unchecked conversion is needed.
        return new AutoValue_DynamoDBIO_Write.Builder<T>().build();
    }
}
