package org.apache.beam.sdk.io.aws2.kinesis;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.common.AsyncBatchWriteHandler;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.common.ObjectPool;
import org.apache.beam.sdk.io.aws2.kinesis.AutoValue_KinesisIO_Read;
import org.apache.beam.sdk.io.aws2.kinesis.AutoValue_KinesisIO_RecordAggregation;
import org.apache.beam.sdk.io.aws2.kinesis.AutoValue_KinesisIO_Write;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.MovingFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedSet;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.InitialPositionInStream;

/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.class */
public final class KinesisIO {

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Read.class */
    public static abstract class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
        private static final long serialVersionUID = 1;

        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Read$Builder.class */
        static abstract class Builder {
            abstract Builder setStreamName(String str);

            abstract Builder setConsumerArn(String str);

            abstract Builder setInitialPosition(StartingPoint startingPoint);

            abstract Builder setClientConfiguration(ClientConfiguration clientConfiguration);

            abstract Builder setMaxNumRecords(long j);

            abstract Builder setMaxReadTime(Duration duration);

            abstract Builder setUpToDateThreshold(Duration duration);

            abstract Builder setRequestRecordsLimit(Integer num);

            abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);

            abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory);

            abstract Builder setMaxCapacityPerShard(Integer num);

            abstract Read build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getStreamName();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract String getConsumerArn();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract StartingPoint getInitialPosition();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract ClientConfiguration getClientConfiguration();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long getMaxNumRecords();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getMaxReadTime();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getUpToDateThreshold();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getRequestRecordsLimit();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract WatermarkPolicyFactory getWatermarkPolicyFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract RateLimitPolicyFactory getRateLimitPolicyFactory();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Integer getMaxCapacityPerShard();

        abstract Builder toBuilder();

        public Read withStreamName(String str) {
            return toBuilder().setStreamName(str).build();
        }

        public Read withConsumerArn(String str) {
            return toBuilder().setConsumerArn(str).build();
        }

        public Read withInitialPositionInStream(InitialPositionInStream initialPositionInStream) {
            return toBuilder().setInitialPosition(new StartingPoint(initialPositionInStream)).build();
        }

        public Read withInitialTimestampInStream(Instant instant) {
            return toBuilder().setInitialPosition(new StartingPoint(instant)).build();
        }

        public Read withClientConfiguration(ClientConfiguration clientConfiguration) {
            Preconditions.checkArgument(clientConfiguration != null, "ClientConfiguration cannot be null");
            return toBuilder().setClientConfiguration(clientConfiguration).build();
        }

        public Read withMaxNumRecords(long j) {
            Preconditions.checkArgument(j > 0, "maxNumRecords must be positive, but was: %s", j);
            return toBuilder().setMaxNumRecords(j).build();
        }

        public Read withMaxReadTime(Duration duration) {
            Preconditions.checkArgument(duration != null, "maxReadTime can not be null");
            return toBuilder().setMaxReadTime(duration).build();
        }

        public Read withUpToDateThreshold(Duration duration) {
            Preconditions.checkArgument(duration != null, "upToDateThreshold can not be null");
            return toBuilder().setUpToDateThreshold(duration).build();
        }

        public Read withRequestRecordsLimit(int i) {
            Preconditions.checkArgument(i > 0, "limit must be positive, but was: %s", i);
            Preconditions.checkArgument(i <= 10000, "limit must be up to 10,000, but was: %s", i);
            return toBuilder().setRequestRecordsLimit(Integer.valueOf(i)).build();
        }

        public Read withArrivalTimeWatermarkPolicy() {
            return toBuilder().setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy()).build();
        }

        public Read withArrivalTimeWatermarkPolicy(Duration duration) {
            return toBuilder().setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy(duration)).build();
        }

        public Read withProcessingTimeWatermarkPolicy() {
            return toBuilder().setWatermarkPolicyFactory(WatermarkPolicyFactory.withProcessingTimePolicy()).build();
        }

        public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) {
            Preconditions.checkArgument(watermarkPolicyFactory != null, "watermarkPolicyFactory cannot be null");
            return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
        }

        public Read withFixedDelayRateLimitPolicy() {
            return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay()).build();
        }

        public Read withFixedDelayRateLimitPolicy(Duration duration) {
            Preconditions.checkArgument(duration != null, "delay cannot be null");
            return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay(duration)).build();
        }

        public Read withDynamicDelayRateLimitPolicy(Supplier<Duration> supplier) {
            Preconditions.checkArgument(supplier != null, "delay cannot be null");
            return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withDelay(supplier)).build();
        }

        public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
            Preconditions.checkArgument(rateLimitPolicyFactory != null, "rateLimitPolicyFactory cannot be null");
            return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build();
        }

        public Read withMaxCapacityPerShard(Integer num) {
            Preconditions.checkArgument(num.intValue() > 0, "maxCapacity must be positive, but was: %s", num);
            return toBuilder().setMaxCapacityPerShard(num).build();
        }

        public PCollection<KinesisRecord> expand(PBegin pBegin) {
            Preconditions.checkArgument(getWatermarkPolicyFactory() != null, "WatermarkPolicyFactory is required");
            Preconditions.checkArgument(getRateLimitPolicyFactory() != null, "RateLimitPolicyFactory is required");
            ClientBuilderFactory.validate((AwsOptions) pBegin.getPipeline().getOptions().as(AwsOptions.class), getClientConfiguration());
            PTransform from = org.apache.beam.sdk.io.Read.from(new KinesisSource(this));
            PTransform pTransform = from;
            if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
                pTransform = from.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
            }
            return pBegin.apply(pTransform);
        }
    }

    /**
     * Settings for record aggregation: the maximum size in bytes, the maximum buffering time and
     * the shard refresh interval. Both durations are randomized using a jitter factor, see
     * {@link #nextBufferTimeout()} and {@link #nextShardRefresh()}.
     */
    @AutoValue
    public static abstract class RecordAggregation implements Serializable {
        /** Upper bound (1 MiB) enforced on {@link #maxBytes()}; also used as its default. */
        private static final int MAX_BYTES_LIMIT = 1048576;

        /** Builder of {@link RecordAggregation}; {@link #build()} validates the settings. */
        @AutoValue.Builder
        public static abstract class Builder {
            public abstract Builder maxBytes(int maxBytes);

            public abstract Builder maxBufferedTime(Duration maxBufferedTime);

            public abstract Builder shardRefreshInterval(Duration shardRefreshInterval);

            abstract Builder maxBufferedTimeJitter(double jitter);

            abstract Builder shardRefreshIntervalJitter(double jitter);

            abstract RecordAggregation autoBuild();

            /**
             * Builds and validates the aggregation settings.
             *
             * @throws IllegalArgumentException if a jitter is outside of {@code [0, 1]} or if
             *     {@code maxBytes} is not within {@code (0, 1048576]}
             */
            public RecordAggregation build() {
                RecordAggregation spec = autoBuild();
                double bufferJitter = spec.maxBufferedTimeJitter();
                Preconditions.checkArgument(
                        bufferJitter >= 0.0d && bufferJitter <= 1.0d,
                        "maxBufferedTimeJitter must be within [0, 1], but was: %s",
                        bufferJitter);
                // Both jitters go through the same formula in nextInstant, so they share the same
                // valid range; a value outside of it could yield an instant in the past.
                double refreshJitter = spec.shardRefreshIntervalJitter();
                Preconditions.checkArgument(
                        refreshJitter >= 0.0d && refreshJitter <= 1.0d,
                        "shardRefreshIntervalJitter must be within [0, 1], but was: %s",
                        refreshJitter);
                Preconditions.checkArgument(
                        spec.maxBytes() > 0 && spec.maxBytes() <= MAX_BYTES_LIMIT,
                        "maxBytes must be positive and <= %s",
                        MAX_BYTES_LIMIT);
                return spec;
            }
        }

        /** Returns the maximum number of bytes. */
        public abstract int maxBytes();

        /** Returns the base duration used by {@link #nextBufferTimeout()}. */
        public abstract Duration maxBufferedTime();

        /** Returns the jitter factor applied to {@link #maxBufferedTime()}. */
        public abstract double maxBufferedTimeJitter();

        /** Returns the base duration used by {@link #nextShardRefresh()}. */
        public abstract Duration shardRefreshInterval();

        /** Returns the jitter factor applied to {@link #shardRefreshInterval()}. */
        public abstract double shardRefreshIntervalJitter();

        /** Returns the instant of the next buffer timeout, relative to the current time. */
        Instant nextBufferTimeout() {
            return nextInstant(maxBufferedTime(), maxBufferedTimeJitter());
        }

        /** Returns the instant of the next shard refresh, relative to the current time. */
        Instant nextShardRefresh() {
            return nextInstant(shardRefreshInterval(), shardRefreshIntervalJitter());
        }

        /**
         * Returns the current time plus {@code duration} scaled by a random factor that is at
         * least {@code 1 - jitter} and less than 1 (for a positive jitter).
         */
        private Instant nextInstant(Duration duration, double jitter) {
            double factor = (1.0d - jitter) + (jitter * Math.random());
            long delayMillis = (long) (factor * duration.getMillis());
            return Instant.ofEpochMilli(DateTimeUtils.currentTimeMillis() + delayMillis);
        }

        /**
         * Returns a builder preset with 1 MiB max bytes, 500 ms max buffered time (jitter 0.7) and
         * a shard refresh interval of 2 minutes (jitter 0.5).
         */
        public static Builder builder() {
            return new AutoValue_KinesisIO_RecordAggregation.Builder()
                    .maxBytes(MAX_BYTES_LIMIT)
                    .maxBufferedTimeJitter(0.7d)
                    .maxBufferedTime(Duration.millis(500L))
                    .shardRefreshIntervalJitter(0.5d)
                    .shardRefreshInterval(Duration.standardMinutes(2L));
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write.class */
    public static abstract class Write<T> extends PTransform<PCollection<T>, Result> {
        /** Maximum number of records per request. */
        static final int MAX_RECORDS_PER_REQUEST = 500;

        /** Maximum size of a single record in bytes (1 MiB). */
        static final int MAX_BYTES_PER_RECORD = 1024 * 1024;

        /** Maximum size of a single request in bytes (5 MiB). */
        static final int MAX_BYTES_PER_REQUEST = 5 * 1024 * 1024;

        private static final int DEFAULT_CONCURRENCY = 3;

        /** Placeholder partitioner that returns an empty partition key for any input. */
        private static final KinesisPartitioner<?> DUMMY_PARTITIONER = ignored -> "";

        /** Placeholder serializer that returns an empty byte array for any input. */
        private static final SerializableFunction<?, byte[]> DUMMY_SERIALIZER =
                ignored -> ArrayUtils.EMPTY_BYTE_ARRAY;

        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write$AggregatedWriter.class */
        /**
         * Writer that collects records in {@link RecordsAggregator}s, one per hash key, and adds
         * the aggregated result as a request entry once an aggregator is full, has timed out or
         * the bundle finishes.
         *
         * <p>Unless shard refreshing is disabled or an explicit partitioner is used, hash keys are
         * mapped to the lower bound of their shard using {@link ShardRanges} shared per stream.
         */
        static class AggregatedWriter<T> extends Writer<T> {
            private static final Logger LOG = LoggerFactory.getLogger(AggregatedWriter.class);
            private static final ObjectPool<String, ShardRanges> SHARD_RANGES_BY_STREAM = new ObjectPool<>(ShardRanges::of);
            private final RecordAggregation aggSpec;
            // Insertion ordered; checkAggregationTimeouts depends on iterating oldest entries first.
            private final Map<BigInteger, RecordsAggregator> aggregators;
            private final PartitionKeyHasher pkHasher;
            private final ShardRanges shardRanges;

            AggregatedWriter(PipelineOptions pipelineOptions, Write<T> write, RecordAggregation recordAggregation) {
                super(pipelineOptions, write);
                this.aggSpec = recordAggregation;
                this.aggregators = new LinkedHashMap<>();
                this.pkHasher = new PartitionKeyHasher();
                boolean refreshDisabled = !recordAggregation.shardRefreshInterval().isLongerThan(Duration.ZERO);
                if (refreshDisabled || write.partitioner() instanceof KinesisPartitioner.ExplicitPartitioner) {
                    this.shardRanges = ShardRanges.EMPTY;
                } else {
                    this.shardRanges = SHARD_RANGES_BY_STREAM.retain(write.streamName());
                    this.shardRanges.refreshPeriodically(this.kinesis, recordAggregation::nextShardRefresh);
                }
            }

            /** Starts a new bundle and drops any aggregators left over from a previous one. */
            @Override
            public void startBundle() {
                super.startBundle();
                this.aggregators.clear();
            }

            /**
             * Adds the record to the aggregator of its hash key and flushes that aggregator into a
             * request entry when it is full.
             *
             * @param partitionKey partition key of the record
             * @param explicitHashKey explicit hash key, or null to derive one from the partition key
             * @param data record payload
             */
            @Override
            protected void write(String partitionKey, String explicitHashKey, byte[] data) throws Throwable {
                this.shardRanges.refreshPeriodically(this.kinesis, this.aggSpec::nextShardRefresh);

                BigInteger aggKey;
                if (explicitHashKey != null) {
                    aggKey = new BigInteger(explicitHashKey);
                } else {
                    BigInteger hashKey = this.pkHasher.hashKey(partitionKey);
                    aggKey = this.shardRanges.shardAwareHashKey(hashKey);
                    if (aggKey != null) {
                        // A shard was found: use its lower bound as the explicit hash key.
                        explicitHashKey = aggKey.toString();
                    } else {
                        aggKey = hashKey;
                    }
                }

                RecordsAggregator aggregator = this.aggregators.computeIfAbsent(aggKey, key -> newRecordsAggregator());
                if (!aggregator.addRecord(partitionKey, explicitHashKey, data)) {
                    // The record was not accepted: emit what is buffered, reset and retry once.
                    addRequestEntry(aggregator.getAndReset(this.aggSpec.nextBufferTimeout()));
                    this.aggregators.remove(aggKey);
                    if (aggregator.addRecord(partitionKey, explicitHashKey, data)) {
                        // Re-inserting moves the aggregator to the end of the insertion order.
                        this.aggregators.put(aggKey, aggregator);
                    } else {
                        // Rejected even by the reset aggregator: write the record unaggregated.
                        super.write(partitionKey, explicitHashKey, data);
                    }
                } else if (!aggregator.hasCapacity()) {
                    addRequestEntry(aggregator.get());
                    this.aggregators.remove(aggKey);
                }

                // Check timeouts whenever there is spare request concurrency, otherwise only for a
                // random ~5% of the records.
                if (this.handler.requestsInProgress() < this.spec.concurrentRequests() || Math.random() < 0.05d) {
                    checkAggregationTimeouts();
                }
            }

            /** Creates an aggregator limited by the smaller of the aggregation and batch max bytes. */
            private RecordsAggregator newRecordsAggregator() {
                int maxBytes = Math.min(this.aggSpec.maxBytes(), this.spec.batchMaxBytes());
                return new RecordsAggregator(maxBytes, this.aggSpec.nextBufferTimeout());
            }

            /**
             * Adds request entries for timed out aggregators and triggers an async flush if any
             * were found. Iteration stops at the first aggregator whose timeout is still in the
             * future.
             */
            private void checkAggregationTimeouts() throws Throwable {
                Instant now = Instant.now();
                List<BigInteger> expiredKeys = new ArrayList<>();
                for (Map.Entry<BigInteger, RecordsAggregator> entry : this.aggregators.entrySet()) {
                    RecordsAggregator aggregator = entry.getValue();
                    if (aggregator.timeout().isAfter(now)) {
                        break;
                    }
                    LOG.debug("Adding aggregated entry after timeout [delay = {} ms]", now.getMillis() - aggregator.timeout().getMillis());
                    addRequestEntry(aggregator.get());
                    expiredKeys.add(entry.getKey());
                }
                if (expiredKeys.isEmpty()) {
                    return;
                }
                // Removal is deferred so that the map is not modified while iterating over it.
                this.aggregators.keySet().removeAll(expiredKeys);
                asyncFlushEntries();
            }

            /** Adds request entries for all remaining aggregators, then finishes the bundle. */
            @Override
            public void finishBundle() throws Throwable {
                for (RecordsAggregator aggregator : this.aggregators.values()) {
                    addRequestEntry(aggregator.get());
                }
                super.finishBundle();
            }

            /**
             * Closes the writer and returns the shard ranges to the shared pool. The release
             * happens even if closing the parent writer fails, so that the pooled reference is not
             * leaked.
             */
            @Override
            public void close() throws Exception {
                try {
                    super.close();
                } finally {
                    SHARD_RANGES_BY_STREAM.release(this.shardRanges);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** AutoValue builder of {@link Write}; one setter per property, completed by {@link #build()}. */
        @AutoValue.Builder
        public static abstract class Builder<T> {
            abstract Builder<T> streamName(String name);

            abstract Builder<T> batchMaxRecords(int maxRecords);

            abstract Builder<T> batchMaxBytes(int maxBytes);

            abstract Builder<T> concurrentRequests(int concurrency);

            abstract Builder<T> partitioner(KinesisPartitioner<T> keyPartitioner);

            abstract Builder<T> serializer(SerializableFunction<T, byte[]> toBytes);

            abstract Builder<T> clientConfiguration(ClientConfiguration config);

            abstract Builder<T> recordAggregation(RecordAggregation aggregation);

            abstract Write<T> build();
        }

        /**
         * Hashes partition keys using MD5. A single {@link MessageDigest} is reused for all calls,
         * hence instances are not thread safe.
         */
        @VisibleForTesting
        @NotThreadSafe
        static class PartitionKeyHasher {
            private final MessageDigest md5Digest;

            PartitionKeyHasher() {
                this.md5Digest = md5Digest();
            }

            /** Returns the MD5 hash of the UTF-8 encoded key as a non-negative integer. */
            BigInteger hashKey(String partitionKey) {
                byte[] keyBytes = partitionKey.getBytes(StandardCharsets.UTF_8);
                byte[] hash = md5Digest.digest(keyBytes);
                md5Digest.reset();
                // Signum 1: interpret the hash bytes as an unsigned magnitude.
                return new BigInteger(1, hash);
            }

            /** Creates an MD5 digest; a missing algorithm is rethrown as a RuntimeException. */
            private static MessageDigest md5Digest() {
                MessageDigest digest;
                try {
                    digest = MessageDigest.getInstance("MD5");
                } catch (NoSuchAlgorithmException missingAlgorithm) {
                    throw new RuntimeException(missingAlgorithm);
                }
                return digest;
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write$Result.class */
        /** Output of the write transform; it exposes the pipeline but no output values. */
        public static class Result implements POutput {
            private final Pipeline pipeline;

            private Result(Pipeline owner) {
                pipeline = owner;
            }

            /** Returns the pipeline this result was created with. */
            public Pipeline getPipeline() {
                return pipeline;
            }

            /** Returns an empty map, this result has no output values. */
            public Map<TupleTag<?>, PValue> expand() {
                Map<TupleTag<?>, PValue> noOutputs = ImmutableMap.of();
                return noOutputs;
            }

            /** Does nothing. */
            public void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform) {
                // Intentionally empty.
            }
        }

        /**
         * Lower hash key bounds of the shards of a stream, used to map a hash key to the lower
         * bound of its shard. The default methods implement a no-op variant, see {@link #EMPTY}.
         */
        @ThreadSafe
        @VisibleForTesting
        interface ShardRanges {
            /** Instance without any shards: never refreshes and never finds a shard. */
            public static final ShardRanges EMPTY = new ShardRanges() {};

            /** Implementation that loads the shard bounds asynchronously using ListShards. */
            public static class ShardRangesImpl implements ShardRanges {
                private static final Logger LOG = LoggerFactory.getLogger(ShardRanges.class);
                private final String streamName;
                /** True while a refresh is in flight, so that only one runs at a time. */
                private final AtomicBoolean running;
                // Both fields are written by the async callback of the Kinesis client and read by
                // writer threads; volatile makes the updates visible across threads.
                private volatile NavigableSet<BigInteger> shardBounds;
                private volatile Instant nextRefresh;

                private ShardRangesImpl(String streamName) {
                    this.running = new AtomicBoolean(false);
                    this.shardBounds = ImmutableSortedSet.of();
                    this.nextRefresh = Instant.EPOCH;
                    this.streamName = streamName;
                }

                /**
                 * Returns the greatest known shard lower bound that is less than or equal to the
                 * hash key, or null if there is none (a warning is logged if shards are known).
                 */
                @Override
                public BigInteger shardAwareHashKey(BigInteger hashKey) {
                    // Read the field once so that lookup and logging use the same snapshot even if
                    // a refresh completes concurrently.
                    NavigableSet<BigInteger> bounds = this.shardBounds;
                    BigInteger lowerBound = bounds.floor(hashKey);
                    if (lowerBound == null && !bounds.isEmpty()) {
                        LOG.warn("No shard found for {} [shards={}]", hashKey, bounds.size());
                    }
                    return lowerBound;
                }

                /**
                 * Starts an async refresh if the next refresh is due and none is running yet.
                 *
                 * @param kinesisAsyncClient client used to list the shards
                 * @param supplier provides the instant of the next refresh once this one finished
                 */
                @Override
                public void refreshPeriodically(KinesisAsyncClient kinesisAsyncClient, Supplier<Instant> supplier) {
                    if (this.nextRefresh.isBeforeNow() && this.running.compareAndSet(false, true)) {
                        refresh(kinesisAsyncClient, supplier, new TreeSet<>(), null);
                    }
                }

                /**
                 * Lists one page of shards and collects their lower bounds, then continues with the
                 * next page if the response carries a token. The collected bounds are published
                 * only after the last page; on failure the previous bounds are kept.
                 */
                private void refresh(
                        KinesisAsyncClient kinesisAsyncClient,
                        Supplier<Instant> supplier,
                        TreeSet<BigInteger> bounds,
                        String nextToken) {
                    ListShardsRequest.Builder request =
                            ListShardsRequest.builder().shardFilter(filter -> filter.type(ShardFilterType.AT_LATEST));
                    if (nextToken != null) {
                        request.nextToken(nextToken);
                    } else {
                        request.streamName(this.streamName);
                    }
                    kinesisAsyncClient.listShards(request.build()).whenComplete((response, error) -> {
                        if (error != null) {
                            LOG.warn("Failed to refresh shards.", error);
                            finishRefresh(supplier);
                            return;
                        }
                        response.shards().forEach(shard -> bounds.add(lowerHashKey(shard)));
                        if (response.nextToken() != null) {
                            refresh(kinesisAsyncClient, supplier, bounds, response.nextToken());
                            return;
                        }
                        LOG.debug("Done refreshing {} shards.", bounds.size());
                        // Publish the new bounds before allowing the next refresh to start.
                        this.shardBounds = bounds;
                        finishRefresh(supplier);
                    });
                }

                /** Schedules the next refresh and clears the running flag. */
                private void finishRefresh(Supplier<Instant> supplier) {
                    this.nextRefresh = supplier.get();
                    this.running.set(false);
                }

                private BigInteger lowerHashKey(Shard shard) {
                    return new BigInteger(shard.hashKeyRange().startingHashKey());
                }
            }

            /** Creates shard ranges for the given stream, initially without any shards. */
            static ShardRanges of(String streamName) {
                return new ShardRangesImpl(streamName);
            }

            /** Returns the lower bound of the shard of the hash key; null by default. */
            default BigInteger shardAwareHashKey(BigInteger hashKey) {
                return null;
            }

            /** Refreshes the shards if due; does nothing by default. */
            default void refreshPeriodically(KinesisAsyncClient kinesisAsyncClient, Supplier<Instant> supplier) {
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write$Stats.class */
        /**
         * Write statistics kept in moving windows (3 minutes, updated every 30 seconds) and
         * reported as Beam metrics. A summary is logged at most every 10 seconds if info logging
         * is enabled.
         */
        public static class Stats implements AsyncBatchWriteHandler.Stats {
            private static final String METRICS_PREFIX = "kinesis_io/write_";
            private static final Logger LOG = LoggerFactory.getLogger(Stats.class);
            private static final Duration LOG_STATS_PERIOD = Duration.standardSeconds(10);
            private static final Combine.BinaryCombineLongFn MIN = Min.ofLongs();
            private static final Combine.BinaryCombineLongFn MAX = Max.ofLongs();
            private static final Combine.BinaryCombineLongFn SUM = Sum.ofLongs();
            private static final Duration MOVING_WINDOW = Duration.standardMinutes(3);
            private static final Duration UPDATE_PERIOD = Duration.standardSeconds(30);
            private static final Counter USER_RECORDS_COUNT = Metrics.counter(Write.class, METRICS_PREFIX + "user_records_count");
            private static final Counter CLIENT_RECORDS_COUNT = Metrics.counter(Write.class, METRICS_PREFIX + "client_records_count");
            private static final Distribution WRITE_LATENCY_MS = Metrics.distribution(Write.class, METRICS_PREFIX + "latency_ms");

            private final MovingFunction numUserRecords = newFun(SUM);
            private final MovingFunction numClientRecords = newFun(SUM);
            private final MovingFunction minClientRecordBytes = newFun(MIN);
            private final MovingFunction maxClientRecordBytes = newFun(MAX);
            private final MovingFunction sumClientRecordBytes = newFun(SUM);
            private final MovingFunction numPutPartialRetries = newFun(SUM);
            private final MovingFunction numPutRequests = newFun(SUM);
            private final MovingFunction minPutRequestLatency = newFun(MIN);
            private final MovingFunction maxPutRequestLatency = newFun(MAX);
            private final MovingFunction sumPutRequestLatency = newFun(SUM);
            private long nextLogTime;

            private Stats() {
                nextLogTime = DateTimeUtils.currentTimeMillis() + LOG_STATS_PERIOD.getMillis();
            }

            /** Creates a moving function over the moving window using the given combiner. */
            private static MovingFunction newFun(Combine.BinaryCombineLongFn combiner) {
                long windowMillis = MOVING_WINDOW.getMillis();
                long updateMillis = UPDATE_PERIOD.getMillis();
                return new MovingFunction(windowMillis, updateMillis, 1, 1, combiner);
            }

            /** Counts one user record. */
            void addUserRecord() {
                USER_RECORDS_COUNT.inc();
                numUserRecords.add(DateTimeUtils.currentTimeMillis(), 1L);
            }

            /** Counts one client record of the given size in bytes. */
            void addClientRecord(int bytes) {
                long now = DateTimeUtils.currentTimeMillis();
                CLIENT_RECORDS_COUNT.inc();
                numClientRecords.add(now, 1L);
                minClientRecordBytes.add(now, bytes);
                maxClientRecordBytes.add(now, bytes);
                sumClientRecordBytes.add(now, bytes);
            }

            /** Records one put request with its latency and whether it was a partial retry. */
            @Override
            public void addBatchWriteRequest(long latencyMillis, boolean isPartialRetry) {
                long now = DateTimeUtils.currentTimeMillis();
                numPutRequests.add(now, 1L);
                if (isPartialRetry) {
                    numPutPartialRetries.add(now, 1L);
                }
                minPutRequestLatency.add(now, latencyMillis);
                maxPutRequestLatency.add(now, latencyMillis);
                sumPutRequestLatency.add(now, latencyMillis);
            }

            /**
             * Updates the latency distribution on every call and logs a summary of the current
             * window once the log period has elapsed and info logging is enabled.
             */
            public void logPeriodically() {
                long now = DateTimeUtils.currentTimeMillis();
                WRITE_LATENCY_MS.update(
                        sumPutRequestLatency.get(now),
                        numPutRequests.get(now),
                        minPutRequestLatency.get(now),
                        maxPutRequestLatency.get(now));
                boolean logDue = now > nextLogTime;
                if (!logDue || !LOG.isInfoEnabled()) {
                    return;
                }
                nextLogTime = now + LOG_STATS_PERIOD.getMillis();

                long clientRecords = numClientRecords.get(now);
                long requests = numPutRequests.get(now);
                long partialRetries = numPutPartialRetries.get(now);

                double partialRetryRatio = 0.0d;
                if (requests > 0) {
                    partialRetryRatio = (1.0d * partialRetries) / requests;
                }
                long userRecords = numUserRecords.get(now);
                long avgClientRecordBytes = 0L;
                if (clientRecords > 0) {
                    avgClientRecordBytes = sumClientRecordBytes.get(now) / clientRecords;
                }
                long minRecordBytes = minClientRecordBytes.get(now);
                long maxRecordBytes = maxClientRecordBytes.get(now);
                long avgLatency = 0L;
                if (requests > 0) {
                    avgLatency = sumPutRequestLatency.get(now) / requests;
                }
                long minLatency = minPutRequestLatency.get(now);
                long maxLatency = maxPutRequestLatency.get(now);

                LOG.info(
                        "Kinesis put records stats [ batches={}, requests={}, partialRetryRatio={}\n  userRecords={}, clientRecords={}, avgClientRecordSize={} bytes, minClientRecordSize={} bytes, maxClientRecordSize={} bytes\n  avgRequestLatency={} ms, minRequestLatency={} ms, maxRequestLatency={}]",
                        requests - partialRetries,
                        requests,
                        partialRetryRatio,
                        userRecords,
                        clientRecords,
                        avgClientRecordBytes,
                        minRecordBytes,
                        maxRecordBytes,
                        avgLatency,
                        minLatency,
                        maxLatency);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write$Writer.class */
        public static class Writer<T> implements AutoCloseable {
            private static final int PARTITION_KEY_MAX_LENGTH = 256;
            private static final int PARTITION_KEY_MIN_LENGTH = 1;
            private static final int PARTIAL_RETRIES = 10;
            private static final ObjectPool.ClientPool<KinesisAsyncClient> CLIENTS = ObjectPool.pooledClientFactory(KinesisAsyncClient.builder());
            protected final Write<T> spec;
            protected final Stats stats;
            protected final AsyncBatchWriteHandler<PutRecordsRequestEntry, PutRecordsResultEntry> handler;
            protected final KinesisAsyncClient kinesis;
            private List<PutRecordsRequestEntry> requestEntries;
            private int requestBytes = 0;

            Writer(PipelineOptions pipelineOptions, Write<T> write) {
                ClientConfiguration clientConfiguration = write.clientConfiguration();
                this.spec = write;
                this.stats = new Stats();
                this.kinesis = CLIENTS.retain((AwsOptions) pipelineOptions.as(AwsOptions.class), clientConfiguration);
                this.requestEntries = new ArrayList();
                this.handler = AsyncBatchWriteHandler.byPosition(write.concurrentRequests(), 10, clientConfiguration.retry(), this.stats, (str, list) -> {
                    return putRecords(this.kinesis, str, list);
                }, putRecordsResultEntry -> {
                    return putRecordsResultEntry.errorCode();
                });
            }

            private static CompletableFuture<List<PutRecordsResultEntry>> putRecords(KinesisAsyncClient kinesisAsyncClient, String str, List<PutRecordsRequestEntry> list) {
                return kinesisAsyncClient.putRecords((PutRecordsRequest) PutRecordsRequest.builder().streamName(str).records(list).build()).thenApply(putRecordsResponse -> {
                    return putRecordsResponse.records();
                });
            }

            public void startBundle() {
                this.handler.reset();
                this.requestEntries.clear();
                this.requestBytes = 0;
            }

            public final void write(T t) throws Throwable {
                this.handler.checkForAsyncFailure();
                this.stats.addUserRecord();
                byte[] bArr = (byte[]) this.spec.serializer().apply(t);
                String partitionKey = this.spec.partitioner().getPartitionKey(t);
                String explicitHashKey = this.spec.partitioner().getExplicitHashKey(t);
                validatePartitionKey(partitionKey);
                if (explicitHashKey != null) {
                    validateExplicitHashKey(explicitHashKey);
                }
                write(partitionKey, explicitHashKey, bArr);
                this.stats.logPeriodically();
            }

            protected void write(String str, String str2, byte[] bArr) throws Throwable {
                PutRecordsRequestEntry.Builder partitionKey = PutRecordsRequestEntry.builder().data(SdkBytes.fromByteArrayUnsafe(bArr)).partitionKey(str);
                if (str2 != null) {
                    partitionKey.explicitHashKey(str2);
                }
                addRequestEntry((PutRecordsRequestEntry) partitionKey.build());
                if (hasCapacityForEntry(0)) {
                    return;
                }
                asyncFlushEntries();
            }

            private int entrySizeBytes(PutRecordsRequestEntry putRecordsRequestEntry) {
                int length = putRecordsRequestEntry.partitionKey().getBytes(StandardCharsets.UTF_8).length + putRecordsRequestEntry.data().asByteArrayUnsafe().length;
                if (putRecordsRequestEntry.explicitHashKey() != null) {
                    length += putRecordsRequestEntry.explicitHashKey().getBytes(StandardCharsets.UTF_8).length;
                }
                return length;
            }

            protected boolean hasCapacityForEntry(int i) {
                return this.requestEntries.size() < this.spec.batchMaxRecords() && this.requestBytes + i <= this.spec.batchMaxBytes();
            }

            protected int getRequestEntriesCount() {
                return this.requestEntries.size();
            }

            protected final void addRequestEntry(PutRecordsRequestEntry putRecordsRequestEntry) throws Throwable {
                int entrySizeBytes = entrySizeBytes(putRecordsRequestEntry);
                if (!hasCapacityForEntry(entrySizeBytes)) {
                    asyncFlushEntries();
                }
                this.stats.addClientRecord(entrySizeBytes);
                this.requestEntries.add(putRecordsRequestEntry);
                this.requestBytes += entrySizeBytes;
            }

            protected final void asyncFlushEntries() throws Throwable {
                if (this.handler.hasErrored() || this.requestEntries.isEmpty()) {
                    return;
                }
                List<PutRecordsRequestEntry> list = this.requestEntries;
                this.requestEntries = new ArrayList();
                this.requestBytes = 0;
                this.handler.batchWrite(this.spec.streamName(), list);
            }

            public void finishBundle() throws Throwable {
                asyncFlushEntries();
                this.handler.waitForCompletion();
                this.stats.logPeriodically();
            }

            @Override // java.lang.AutoCloseable
            public void close() throws Exception {
                CLIENTS.release(this.kinesis);
            }

            private void validatePartitionKey(String str) {
                int length = str != null ? str.length() : 0;
                Preconditions.checkState(PARTITION_KEY_MIN_LENGTH <= length && length <= PARTITION_KEY_MAX_LENGTH, "Invalid partition key of length {}", length);
            }

            private void validateExplicitHashKey(String str) {
                BigInteger bigInteger = new BigInteger(str);
                Preconditions.checkState(bigInteger.compareTo(KinesisPartitioner.MIN_HASH_KEY) >= 0 && bigInteger.compareTo(KinesisPartitioner.MAX_HASH_KEY) <= 0, "Explicit hash key must be 128-bit number.");
            }
        }

        /**
         * Name of the stream records are written to. {@code expand} rejects an empty value, which is
         * also the placeholder set by {@code KinesisIO.write()}.
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract String streamName();

        /**
         * Maximum number of entries per batch; {@code Writer.hasCapacityForEntry} compares the
         * buffered entry count against it.
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract int batchMaxRecords();

        /**
         * Maximum accumulated entry size per batch in bytes; {@code Writer.hasCapacityForEntry}
         * compares the buffered byte count against it.
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract int batchMaxBytes();

        /** Concurrency setting passed by {@code Writer} to {@code AsyncBatchWriteHandler.byPosition}. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract int concurrentRequests();

        /** Supplies the partition key and the optional explicit hash key for each element. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract KinesisPartitioner<T> partitioner();

        /** Converts each element into the byte payload of a record. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract SerializableFunction<T, byte[]> serializer();

        /** Client configuration used to validate and retain the Kinesis client. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract ClientConfiguration clientConfiguration();

        /**
         * Record aggregation settings. May be {@code null}: {@code withRecordAggregationDisabled}
         * sets it to {@code null}, and {@code expand} only uses an {@code AggregatedWriter} when it
         * is non-null.
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Pure
        public abstract RecordAggregation recordAggregation();

        /**
         * Returns the builder the {@code with*} methods use to derive a modified {@code Write}.
         * NOTE(review): presumably pre-populated with this instance's values; confirm against the
         * generated AutoValue class.
         */
        abstract Builder<T> builder();

        /**
         * Derives a {@code Write} targeting the given stream.
         *
         * @param str stream name; must not be empty
         * @return a {@code Write} built from {@link #builder()} with the stream name set
         */
        public Write<T> withStreamName(String str) {
            final boolean nameGiven = !StringUtils.isEmpty(str);
            Preconditions.checkArgument(nameGiven, "streamName cannot be empty");
            final Builder<T> derived = builder();
            return derived.streamName(str).build();
        }

        /**
         * Derives a {@code Write} with the given maximum number of records per batch.
         *
         * @param i record limit; must lie in [1, {@code MAX_RECORDS_PER_REQUEST}]
         * @return a {@code Write} built from {@link #builder()} with the limit set
         */
        public Write<T> withBatchMaxRecords(int i) {
            final boolean withinLimits = i >= 1 && i <= MAX_RECORDS_PER_REQUEST;
            Preconditions.checkArgument(withinLimits, "batchMaxRecords must be in [1,%s]", MAX_RECORDS_PER_REQUEST);
            final Builder<T> derived = builder();
            return derived.batchMaxRecords(i).build();
        }

        /**
         * Derives a {@code Write} with the given maximum batch size in bytes.
         *
         * @param i byte limit; must lie in [1, 5242880]
         * @return a {@code Write} built from {@link #builder()} with the limit set
         */
        public Write<T> withBatchMaxBytes(int i) {
            final int upperBound = 5 * 1024 * 1024; // 5242880
            final boolean withinLimits = i >= 1 && i <= upperBound;
            Preconditions.checkArgument(withinLimits, "batchMaxBytes must be in [1,%s]", upperBound);
            final Builder<T> derived = builder();
            return derived.batchMaxBytes(i).build();
        }

        /**
         * Derives a {@code Write} with the given number of concurrent requests.
         *
         * @param i concurrency; must be greater than 0
         * @return a {@code Write} built from {@link #builder()} with the concurrency set
         */
        public Write<T> withConcurrentRequests(int i) {
            final boolean positive = i >= 1;
            Preconditions.checkArgument(positive, "concurrentRequests must be > 0");
            final Builder<T> derived = builder();
            return derived.concurrentRequests(i).build();
        }

        /**
         * Derives a {@code Write} using the given record aggregation settings.
         *
         * @param recordAggregation aggregation settings to apply
         * @return a {@code Write} built from {@link #builder()} with the settings applied
         */
        public Write<T> withRecordAggregation(RecordAggregation recordAggregation) {
            final Builder<T> derived = builder();
            return derived.recordAggregation(recordAggregation).build();
        }

        /**
         * Derives a {@code Write} whose record aggregation is configured by the given callback.
         *
         * @param consumer callback that receives a fresh {@code RecordAggregation.Builder} to customize
         * @return the result of {@link #withRecordAggregation(RecordAggregation)} for the built settings
         */
        public Write<T> withRecordAggregation(Consumer<RecordAggregation.Builder> consumer) {
            final RecordAggregation.Builder aggregationBuilder = RecordAggregation.builder();
            consumer.accept(aggregationBuilder);
            final RecordAggregation aggregation = aggregationBuilder.build();
            return withRecordAggregation(aggregation);
        }

        /**
         * Derives a {@code Write} without record aggregation by setting the aggregation settings to
         * {@code null}.
         *
         * @return a {@code Write} built from {@link #builder()} with aggregation cleared
         */
        public Write<T> withRecordAggregationDisabled() {
            final Builder<T> derived = builder();
            return derived.recordAggregation(null).build();
        }

        /**
         * Derives a {@code Write} using the given partitioner.
         *
         * @param kinesisPartitioner supplies the partition key (and optional explicit hash key) per
         *     element; must not be {@code null}
         * @return a {@code Write} built from {@link #builder()} with the partitioner set
         * @throws IllegalArgumentException if {@code kinesisPartitioner} is {@code null}
         */
        public Write<T> withPartitioner(KinesisPartitioner<T> kinesisPartitioner) {
            // Validate the argument itself; checking the currently configured partitioner() would
            // let a null argument slip past this precondition.
            Preconditions.checkArgument(kinesisPartitioner != null, "partitioner cannot be null");
            return builder().partitioner(kinesisPartitioner).build();
        }

        /**
         * Derives a {@code Write} using the given serializer.
         *
         * @param serializableFunction converts an element into the record payload; must not be
         *     {@code null}
         * @return a {@code Write} built from {@link #builder()} with the serializer set
         * @throws IllegalArgumentException if {@code serializableFunction} is {@code null}
         */
        public Write<T> withSerializer(SerializableFunction<T, byte[]> serializableFunction) {
            // Validate the argument itself; checking the currently configured serializer() would
            // let a null argument slip past this precondition.
            Preconditions.checkArgument(serializableFunction != null, "serializer cannot be null");
            return builder().serializer(serializableFunction).build();
        }

        /**
         * Derives a {@code Write} using the given client configuration.
         *
         * @param clientConfiguration configuration for the Kinesis client; must not be {@code null}
         * @return a {@code Write} built from {@link #builder()} with the configuration set
         */
        public Write<T> withClientConfiguration(ClientConfiguration clientConfiguration) {
            final boolean configGiven = clientConfiguration != null;
            Preconditions.checkArgument(configGiven, "clientConfiguration cannot be null");
            final Builder<T> derived = builder();
            return derived.clientConfiguration(clientConfiguration).build();
        }

        /**
         * Validates the configuration and applies a {@code ParDo} that writes every element of the
         * input through a {@code Writer} (or an {@code AggregatedWriter} when record aggregation is
         * configured).
         *
         * @param pCollection elements to write
         * @return a {@code Result} created for the input's pipeline
         */
        public Result expand(PCollection<T> pCollection) {
            // Reject settings that still hold their placeholder values.
            final boolean streamNameGiven = !StringUtils.isEmpty(streamName());
            Preconditions.checkArgument(streamNameGiven, "streamName is required");
            final boolean partitionerGiven = partitioner() != DUMMY_PARTITIONER;
            Preconditions.checkArgument(partitionerGiven, "partitioner is required");
            final boolean serializerGiven = serializer() != DUMMY_SERIALIZER;
            Preconditions.checkArgument(serializerGiven, "serializer is required");

            final AwsOptions awsOptions = pCollection.getPipeline().getOptions().as(AwsOptions.class);
            ClientBuilderFactory.validate(awsOptions, clientConfiguration());

            final DoFn<T, Void> writeFn = new DoFn<T, Void>() {
                /** Created in setup, released in teardown. */
                private transient Writer<T> writer;

                @DoFn.Setup
                public void setup(PipelineOptions pipelineOptions) {
                    final RecordAggregation aggregation = Write.this.recordAggregation();
                    if (aggregation != null) {
                        this.writer = new AggregatedWriter<>(pipelineOptions, Write.this, aggregation);
                    } else {
                        this.writer = new Writer<>(pipelineOptions, Write.this);
                    }
                }

                @DoFn.StartBundle
                public void startBundle() {
                    writer().startBundle();
                }

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element T t) throws Throwable {
                    writer().write(t);
                }

                @DoFn.FinishBundle
                public void finishBundle() throws Throwable {
                    writer().finishBundle();
                }

                @DoFn.Teardown
                public void teardown() throws Exception {
                    if (this.writer == null) {
                        return;
                    }
                    this.writer.close();
                    this.writer = null;
                }

                /** Returns the writer, failing if it has not been set up or was already torn down. */
                private Writer<T> writer() {
                    final Writer<T> current = this.writer;
                    if (current != null) {
                        return current;
                    }
                    throw new IllegalStateException("RecordWriter is null");
                }
            };
            pCollection.apply(ParDo.of(writeFn));
            return new Result(pCollection.getPipeline());
        }

        /*
         * Lambda deserialization hook, marked synthetic in the decompiler output.
         *
         * Selects a lambda by the serialized implementation method name and, after verifying the
         * remaining fields of the SerializedLambda, returns either
         *   - a SerializableFunction-shaped lambda yielding ArrayUtils.EMPTY_BYTE_ARRAY, or
         *   - a KinesisPartitioner.getPartitionKey-shaped lambda yielding "".
         * Any other input is rejected with IllegalArgumentException.
         *
         * NOTE(review): `boolean z = -1` and the switch over a boolean are decompiler artifacts and
         * not valid Java source; presumably the bytecode uses an int selector. This method looks
         * compiler-generated and is probably not meant to be maintained by hand - confirm before
         * editing.
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            // First stage: map the implementation method name to a selector value.
            switch (implMethodName.hashCode()) {
                case 679585101:
                    if (implMethodName.equals("lambda$static$e8811095$1")) {
                        z = true;
                        break;
                    }
                    break;
                case 1394061572:
                    if (implMethodName.equals("lambda$static$ea174550$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            // Second stage: verify the full lambda signature before recreating the lambda.
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)[B")) {
                        return obj -> {
                            return ArrayUtils.EMPTY_BYTE_ARRAY;
                        };
                    }
                    break;
                case true:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisPartitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartitionKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/aws2/kinesis/KinesisIO$Write") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;")) {
                        return obj2 -> {
                            return "";
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Creates a {@code Read} transform with defaults: a default-built client configuration, an
     * unbounded ({@code Long.MAX_VALUE}) record limit, a zero up-to-date threshold, the arrival-time
     * watermark policy and the default rate limiter.
     *
     * @return the pre-configured {@code Read}
     */
    public static Read read() {
        final ClientConfiguration defaultClientConfig = ClientConfiguration.builder().build();
        return new AutoValue_KinesisIO_Read.Builder()
                .setClientConfiguration(defaultClientConfig)
                .setMaxNumRecords(Long.MAX_VALUE)
                .setUpToDateThreshold(Duration.ZERO)
                .setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
                .setRateLimitPolicyFactory(RateLimitPolicyFactory.withDefaultRateLimiter())
                .build();
    }

    /**
     * Creates a {@code Write} transform with defaults: an empty stream name plus dummy serializer
     * and partitioner (all of which {@code Write.expand} rejects until replaced), a default-built
     * client configuration, batches of at most 500 records and 4718592 bytes, 3 concurrent
     * requests, and default-built record aggregation settings.
     *
     * @param <T> type of the elements to write
     * @return the pre-configured {@code Write}
     */
    public static <T> Write<T> write() {
        final ClientConfiguration defaultClientConfig = ClientConfiguration.builder().build();
        final RecordAggregation defaultAggregation = RecordAggregation.builder().build();
        return new AutoValue_KinesisIO_Write.Builder()
                .streamName("")
                .serializer(Write.DUMMY_SERIALIZER)
                .partitioner(Write.DUMMY_PARTITIONER)
                .clientConfiguration(defaultClientConfig)
                .batchMaxRecords(500)
                .batchMaxBytes(4718592)
                .concurrentRequests(3)
                .recordAggregation(defaultAggregation)
                .build();
    }
}
