package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.DescribeStreamSummaryRequest;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescriptionSummary;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.class */
public class SimplifiedKinesisClient {
    private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
    private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
    private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
    private static final String SUM_STATISTIC = "Sum";
    private static final String STREAM_NAME_DIMENSION = "StreamName";
    private static final int LIST_SHARDS_MAX_RESULTS = 1000;
    private static final int DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS = 10;
    private final AmazonKinesis kinesis;
    private final AmazonCloudWatch cloudWatch;
    private final Integer limit;
    private final Supplier<Instant> currentInstantSupplier;
    private static final Duration SPACING_FOR_TIMESTAMP_LIST_SHARDS_REQUEST_TO_NOT_EXCEED_TRIM_HORIZON = Duration.standardMinutes(5);
    private static final Duration DESCRIBE_STREAM_SUMMARY_INITIAL_BACKOFF = Duration.standardSeconds(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$amazonaws$services$kinesis$clientlibrary$lib$worker$InitialPositionInStream = new int[InitialPositionInStream.values().length];

        static {
            try {
                $SwitchMap$com$amazonaws$services$kinesis$clientlibrary$lib$worker$InitialPositionInStream[InitialPositionInStream.LATEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$amazonaws$services$kinesis$clientlibrary$lib$worker$InitialPositionInStream[InitialPositionInStream.TRIM_HORIZON.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$amazonaws$services$kinesis$clientlibrary$lib$worker$InitialPositionInStream[InitialPositionInStream.AT_TIMESTAMP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public SimplifiedKinesisClient(AmazonKinesis amazonKinesis, AmazonCloudWatch amazonCloudWatch, Integer num) {
        this(amazonKinesis, amazonCloudWatch, num, Instant::now);
    }

    SimplifiedKinesisClient(AmazonKinesis amazonKinesis, AmazonCloudWatch amazonCloudWatch, Integer num, Supplier<Instant> supplier) {
        this.kinesis = (AmazonKinesis) Preconditions.checkNotNull(amazonKinesis, "kinesis");
        this.cloudWatch = (AmazonCloudWatch) Preconditions.checkNotNull(amazonCloudWatch, "cloudWatch");
        this.limit = num;
        this.currentInstantSupplier = supplier;
    }

    public static SimplifiedKinesisClient from(AWSClientsProvider aWSClientsProvider, Integer num) {
        return new SimplifiedKinesisClient(aWSClientsProvider.getKinesisClient(), aWSClientsProvider.getCloudWatchClient(), num);
    }

    public String getShardIterator(String str, String str2, ShardIteratorType shardIteratorType, String str3, Instant instant) throws TransientKinesisException {
        Date date = instant != null ? instant.toDate() : null;
        return (String) wrapExceptions(() -> {
            return this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(str).withShardId(str2).withShardIteratorType(shardIteratorType).withStartingSequenceNumber(str3).withTimestamp(date)).getShardIterator();
        });
    }

    public List<Shard> listShardsAtPoint(String str, StartingPoint startingPoint) throws TransientKinesisException {
        return listShards(str, (ShardFilter) wrapExceptions(() -> {
            return buildShardFilterForStartingPoint(str, startingPoint);
        }));
    }

    private ShardFilter buildShardFilterForStartingPoint(String str, StartingPoint startingPoint) throws IOException, InterruptedException {
        InitialPositionInStream position = startingPoint.getPosition();
        switch (AnonymousClass1.$SwitchMap$com$amazonaws$services$kinesis$clientlibrary$lib$worker$InitialPositionInStream[position.ordinal()]) {
            case 1:
                return new ShardFilter().withType(ShardFilterType.AT_LATEST);
            case 2:
                return new ShardFilter().withType(ShardFilterType.AT_TRIM_HORIZON);
            case 3:
                return buildShardFilterForTimestamp(str, startingPoint.getTimestamp());
            default:
                throw new IllegalArgumentException(String.format("Unrecognized '%s' position to create shard filter with", position));
        }
    }

    private ShardFilter buildShardFilterForTimestamp(String str, Instant instant) throws IOException, InterruptedException {
        StreamDescriptionSummary describeStreamSummary = describeStreamSummary(str);
        if (new Instant(describeStreamSummary.getStreamCreationTimestamp()).isAfter(instant)) {
            return new ShardFilter().withType(ShardFilterType.AT_TRIM_HORIZON);
        }
        return instant.isAfter(this.currentInstantSupplier.get().minus(Duration.standardHours((long) describeStreamSummary.getRetentionPeriodHours().intValue())).plus(SPACING_FOR_TIMESTAMP_LIST_SHARDS_REQUEST_TO_NOT_EXCEED_TRIM_HORIZON)) ? new ShardFilter().withType(ShardFilterType.AT_TIMESTAMP).withTimestamp(instant.toDate()) : new ShardFilter().withType(ShardFilterType.AT_TRIM_HORIZON);
    }

    private StreamDescriptionSummary describeStreamSummary(String str) throws IOException, InterruptedException {
        BackOff backoff = FluentBackoff.DEFAULT.withMaxRetries(DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS).withInitialBackoff(DESCRIBE_STREAM_SUMMARY_INITIAL_BACKOFF).backoff();
        Sleeper sleeper = Sleeper.DEFAULT;
        DescribeStreamSummaryRequest describeStreamSummaryRequest = new DescribeStreamSummaryRequest();
        describeStreamSummaryRequest.setStreamName(str);
        do {
            try {
                return this.kinesis.describeStreamSummary(describeStreamSummaryRequest).getStreamDescriptionSummary();
            } catch (LimitExceededException e) {
            }
        } while (BackOffUtils.next(sleeper, backoff));
        throw e;
    }

    public List<Shard> listShardsFollowingClosedShard(String str, String str2) throws TransientKinesisException {
        return listShards(str, new ShardFilter().withType(ShardFilterType.AFTER_SHARD_ID).withShardId(str2));
    }

    private List<Shard> listShards(String str, ShardFilter shardFilter) throws TransientKinesisException {
        return (List) wrapExceptions(() -> {
            ImmutableList.Builder builder = ImmutableList.builder();
            String str2 = null;
            do {
                ListShardsRequest listShardsRequest = new ListShardsRequest();
                listShardsRequest.setMaxResults(Integer.valueOf(LIST_SHARDS_MAX_RESULTS));
                if (str2 != null) {
                    listShardsRequest.setNextToken(str2);
                } else {
                    listShardsRequest.setStreamName(str);
                }
                listShardsRequest.setShardFilter(shardFilter);
                ListShardsResult listShards = this.kinesis.listShards(listShardsRequest);
                builder.addAll(listShards.getShards());
                str2 = listShards.getNextToken();
            } while (str2 != null);
            return builder.build();
        });
    }

    public GetKinesisRecordsResult getRecords(String str, String str2, String str3) throws TransientKinesisException {
        return getRecords(str, str2, str3, this.limit);
    }

    public GetKinesisRecordsResult getRecords(String str, String str2, String str3, Integer num) throws TransientKinesisException {
        return (GetKinesisRecordsResult) wrapExceptions(() -> {
            GetRecordsResult records = this.kinesis.getRecords(new GetRecordsRequest().withShardIterator(str).withLimit(num));
            return new GetKinesisRecordsResult(UserRecord.deaggregate(records.getRecords()), records.getNextShardIterator(), records.getMillisBehindLatest().longValue(), str2, str3);
        });
    }

    public long getBacklogBytes(String str, Instant instant) throws TransientKinesisException {
        return getBacklogBytes(str, instant, new Instant());
    }

    public long getBacklogBytes(String str, Instant instant, Instant instant2) throws TransientKinesisException {
        return ((Long) wrapExceptions(() -> {
            Minutes minutesBetween = Minutes.minutesBetween(instant, instant2);
            if (minutesBetween.isLessThan(Minutes.ONE)) {
                return 0L;
            }
            long j = 0;
            Iterator it = this.cloudWatch.getMetricStatistics(createMetricStatisticsRequest(str, instant, instant2, minutesBetween)).getDatapoints().iterator();
            while (it.hasNext()) {
                j += ((Datapoint) it.next()).getSum().longValue();
            }
            return Long.valueOf(j);
        })).longValue();
    }

    GetMetricStatisticsRequest createMetricStatisticsRequest(String str, Instant instant, Instant instant2, Minutes minutes) {
        return new GetMetricStatisticsRequest().withNamespace(KINESIS_NAMESPACE).withMetricName(INCOMING_RECORDS_METRIC).withPeriod(Integer.valueOf(minutes.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS)).withStartTime(instant.toDate()).withEndTime(instant2.toDate()).withStatistics(Collections.singletonList(SUM_STATISTIC)).withDimensions(Collections.singletonList(new Dimension().withName(STREAM_NAME_DIMENSION).withValue(str)));
    }

    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
        try {
            return callable.call();
        } catch (Exception e) {
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        } catch (AmazonServiceException e2) {
            if (e2.getErrorType() == AmazonServiceException.ErrorType.Service) {
                throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e2);
            }
            throw new RuntimeException("Kinesis client side failure", e2);
        } catch (ExpiredIteratorException e3) {
            throw e3;
        } catch (AmazonClientException e4) {
            if (e4.isRetryable()) {
                throw new TransientKinesisException("Retryable client failure", e4);
            }
            throw new RuntimeException("Not retryable client failure", e4);
        } catch (LimitExceededException | ProvisionedThroughputExceededException e5) {
            throw new KinesisClientThrottledException("Too many requests to Kinesis. Wait some time and retry.", e5);
        }
    }
}
