package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.Statistic;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* loaded from: input_file:org/apache/beam/sdk/io/aws2/kinesis/SimplifiedKinesisClient.class */
/**
 * Thin wrapper around {@link KinesisClient} and {@link CloudWatchClient}.
 *
 * <p>Every remote call is routed through {@link #wrapExceptions(Callable)}, which translates AWS
 * SDK exceptions into {@link TransientKinesisException} (or its throttling variant) when the
 * caller may retry, and into {@link RuntimeException} otherwise.
 */
class SimplifiedKinesisClient {
    private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
    // Despite the constant's name, the CloudWatch metric queried is the incoming *byte* count.
    private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
    private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
    private static final String STREAM_NAME_DIMENSION = "StreamName";
    private static final int LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS = 10;
    private static final Duration LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF = Duration.standardSeconds(1);
    private final KinesisClient kinesis;
    private final CloudWatchClient cloudWatch;
    private final Integer limit;

    /**
     * Creates a client backed by the given SDK clients.
     *
     * @param kinesisClient Kinesis client, must not be null
     * @param cloudWatchClient CloudWatch client, must not be null
     * @param limit default record limit passed to {@code GetRecords}; stored as given (no null
     *     check is performed here)
     */
    public SimplifiedKinesisClient(KinesisClient kinesisClient, CloudWatchClient cloudWatchClient, Integer limit) {
        this.kinesis = Preconditions.checkNotNull(kinesisClient, "kinesis");
        this.cloudWatch = Preconditions.checkNotNull(cloudWatchClient, "cloudWatch");
        this.limit = limit;
    }

    /**
     * Builds a client from the Kinesis and CloudWatch clients supplied by {@code provider}.
     *
     * @param provider source of the underlying SDK clients
     * @param limit default record limit passed to {@code GetRecords}
     */
    public static SimplifiedKinesisClient from(AWSClientsProvider provider, Integer limit) {
        return new SimplifiedKinesisClient(provider.getKinesisClient(), provider.getCloudWatchClient(), limit);
    }

    /**
     * Requests a shard iterator from Kinesis.
     *
     * @param streamName name of the stream
     * @param shardId id of the shard
     * @param shardIteratorType iterator type to request
     * @param startingSequenceNumber sequence number set on the request
     * @param timestamp timestamp set on the request (converted via {@code TimeUtil.toJava})
     * @return the shard iterator returned by the service
     * @throws TransientKinesisException if the call failed in a way that may be retried
     */
    public String getShardIterator(
            String streamName,
            String shardId,
            ShardIteratorType shardIteratorType,
            String startingSequenceNumber,
            Instant timestamp)
            throws TransientKinesisException {
        return wrapExceptions(() -> {
            GetShardIteratorRequest request = GetShardIteratorRequest.builder()
                    .streamName(streamName)
                    .shardId(shardId)
                    .shardIteratorType(shardIteratorType)
                    .startingSequenceNumber(startingSequenceNumber)
                    .timestamp(TimeUtil.toJava(timestamp))
                    .build();
            return this.kinesis.getShardIterator(request).shardIterator();
        });
    }

    /**
     * Lists all shards of a stream by paging through {@code DescribeStream}.
     *
     * <p>Each page request is retried with backoff when the service answers with
     * {@link LimitExceededException}; once the retries for a page are exhausted the exception is
     * rethrown (and then mapped by {@link #wrapExceptions(Callable)}).
     *
     * @param streamName name of the stream
     * @return the shards of all pages, in the order returned by the service
     * @throws TransientKinesisException if the call failed in a way that may be retried
     */
    public List<Shard> listShards(String streamName) throws TransientKinesisException {
        return wrapExceptions(() -> {
            List<Shard> shards = Lists.newArrayList();
            String lastShardId = null;
            StreamDescription description;
            do {
                description = describeStreamWithRetry(streamName, lastShardId);
                shards.addAll(description.shards());
                // The next page starts after the last shard seen so far.
                lastShardId = shards.get(shards.size() - 1).shardId();
            } while (description.hasMoreShards().booleanValue());
            return shards;
        });
    }

    /**
     * Fetches one {@code DescribeStream} page, retrying only on {@link LimitExceededException}.
     *
     * <p>A fresh backoff is created per page, so every page gets the full retry budget. Declared
     * as {@code throws Exception} because the checked exceptions of {@code BackOffUtils.next} are
     * simply propagated to the enclosing {@link Callable}.
     */
    private StreamDescription describeStreamWithRetry(String streamName, String exclusiveStartShardId)
            throws Exception {
        BackOff backoff = FluentBackoff.DEFAULT
                .withMaxRetries(LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS)
                .withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF)
                .backoff();
        Sleeper sleeper = Sleeper.DEFAULT;
        DescribeStreamRequest request = DescribeStreamRequest.builder()
                .streamName(streamName)
                .exclusiveStartShardId(exclusiveStartShardId)
                .build();
        while (true) {
            try {
                return this.kinesis.describeStream(request).streamDescription();
            } catch (LimitExceededException e) {
                // Sleep and try again; give up with the original exception once backoff is exhausted.
                if (!BackOffUtils.next(sleeper, backoff)) {
                    throw e;
                }
            }
        }
    }

    /**
     * Reads records using the default limit configured on this client.
     *
     * @see #getRecords(String, String, String, Integer)
     */
    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId)
            throws TransientKinesisException {
        return getRecords(shardIterator, streamName, shardId, this.limit);
    }

    /**
     * Reads records from the given shard iterator and de-aggregates them.
     *
     * @param shardIterator iterator to read from
     * @param streamName passed through to the returned result
     * @param shardId passed through to the returned result
     * @param limit record limit set on the request
     * @return the de-aggregated records together with the next iterator and the
     *     millis-behind-latest value reported by the service
     * @throws TransientKinesisException if the call failed in a way that may be retried
     */
    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId, Integer limit)
            throws TransientKinesisException {
        return wrapExceptions(() -> {
            GetRecordsRequest request = GetRecordsRequest.builder()
                    .shardIterator(shardIterator)
                    .limit(limit)
                    .build();
            GetRecordsResponse response = this.kinesis.getRecords(request);
            return new GetKinesisRecordsResult(
                    deaggregate(response.records()),
                    response.nextShardIterator(),
                    response.millisBehindLatest().longValue(),
                    streamName,
                    shardId);
        });
    }

    /**
     * Converts SDK records to {@link KinesisClientRecord}s and de-aggregates them with
     * {@link AggregatorUtil}.
     *
     * @param records records as returned by {@code GetRecords}
     * @return an empty immutable list for empty input, otherwise the de-aggregated records
     */
    public static List<KinesisClientRecord> deaggregate(List<Record> records) {
        if (records.isEmpty()) {
            return ImmutableList.of();
        }
        List<KinesisClientRecord> clientRecords =
                records.stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
        return new AggregatorUtil().deaggregate(clientRecords);
    }

    /**
     * Sums the incoming-bytes metric of a stream from {@code countSince} until now.
     *
     * @see #getBacklogBytes(String, Instant, Instant)
     */
    public long getBacklogBytes(String streamName, Instant countSince) throws TransientKinesisException {
        return getBacklogBytes(streamName, countSince, new Instant());
    }

    /**
     * Sums the CloudWatch {@code IncomingBytes} datapoints of a stream over a time window.
     *
     * @param streamName name of the stream
     * @param countSince start of the window
     * @param countTo end of the window
     * @return the summed datapoints, or {@code 0} when the window is shorter than one minute (in
     *     which case CloudWatch is not queried)
     * @throws TransientKinesisException if the call failed in a way that may be retried
     */
    public long getBacklogBytes(String streamName, Instant countSince, Instant countTo)
            throws TransientKinesisException {
        return wrapExceptions(() -> {
            Minutes period = Minutes.minutesBetween(countSince, countTo);
            if (period.isLessThan(Minutes.ONE)) {
                return 0L;
            }
            GetMetricStatisticsRequest request =
                    createMetricStatisticsRequest(streamName, countSince, countTo, period);
            long totalSizeInBytes = 0L;
            for (Datapoint point : this.cloudWatch.getMetricStatistics(request).datapoints()) {
                totalSizeInBytes += point.sum().longValue();
            }
            return totalSizeInBytes;
        });
    }

    /**
     * Builds the CloudWatch request for the SUM of {@code IncomingBytes} of one stream.
     *
     * <p>The period is the whole window expressed in seconds ({@code period} minutes times 60).
     */
    GetMetricStatisticsRequest createMetricStatisticsRequest(
            String streamName, Instant countSince, Instant countTo, Minutes period) {
        Dimension streamDimension = Dimension.builder()
                .name(STREAM_NAME_DIMENSION)
                .value(streamName)
                .build();
        return GetMetricStatisticsRequest.builder()
                .namespace(KINESIS_NAMESPACE)
                .metricName(INCOMING_RECORDS_METRIC)
                .period(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS)
                .startTime(TimeUtil.toJava(countSince))
                .endTime(TimeUtil.toJava(countTo))
                .statistics(Statistic.SUM)
                .dimensions(streamDimension)
                .build();
    }

    /**
     * Runs {@code callable} and maps failures to the exceptions callers of this class expect.
     *
     * <p>Catch clauses are ordered from most to least specific: the Kinesis-specific exceptions
     * are subclasses of {@link SdkServiceException} and must be handled before it, and
     * {@link SdkClientException} must be handled before the generic {@link Exception} fallback.
     *
     * @throws TransientKinesisException for throttling, service-side failures and retryable
     *     client-side failures
     */
    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
        try {
            return callable.call();
        } catch (ExpiredIteratorException e) {
            // Propagated untouched so callers can react to it specifically.
            throw e;
        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
            throw new KinesisClientThrottledException("Too many requests to Kinesis. Wait some time and retry.", e);
        } catch (SdkServiceException e) {
            throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e);
        } catch (SdkClientException e) {
            if (e.retryable()) {
                throw new TransientKinesisException("Retryable client failure", e);
            }
            throw new RuntimeException("Not retryable client failure", e);
        } catch (Exception e) {
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        }
    }
}
