/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.aws2.kinesis;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.aws2.kinesis.AWSClientsProvider;
import org.apache.beam.sdk.io.aws2.kinesis.GetKinesisRecordsResult;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisClientThrottledException;
import org.apache.beam.sdk.io.aws2.kinesis.TimeUtil;
import org.apache.beam.sdk.io.aws2.kinesis.TransientKinesisException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;
import org.joda.time.ReadableInstant;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsResponse;
import software.amazon.awssdk.services.cloudwatch.model.Statistic;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
 * Thin wrapper around a {@link KinesisClient} and a {@link CloudWatchClient} that exposes the few
 * operations used by the Kinesis reader and funnels every SDK failure through
 * {@link #wrapExceptions(Callable)}.
 */
class SimplifiedKinesisClient {
    private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
    // Despite the constant's name, the metric queried is the byte count, not the record count.
    private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
    private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
    private static final String STREAM_NAME_DIMENSION = "StreamName";
    private static final int LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS = 10;
    private static final Duration LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF = Duration.standardSeconds(1L);

    private final KinesisClient kinesis;
    private final CloudWatchClient cloudWatch;
    /** Default record limit passed to {@code GetRecords}; may be {@code null}. */
    private final Integer limit;

    /**
     * Creates a client over the given SDK clients.
     *
     * @param kinesis Kinesis client, must not be {@code null}
     * @param cloudWatch CloudWatch client, must not be {@code null}
     * @param limit default limit used by {@link #getRecords(String, String, String)}
     */
    public SimplifiedKinesisClient(KinesisClient kinesis, CloudWatchClient cloudWatch, Integer limit) {
        this.kinesis = Preconditions.checkNotNull(kinesis, "kinesis");
        this.cloudWatch = Preconditions.checkNotNull(cloudWatch, "cloudWatch");
        this.limit = limit;
    }

    /** Builds a client from the Kinesis and CloudWatch clients supplied by {@code provider}. */
    public static SimplifiedKinesisClient from(AWSClientsProvider provider, Integer limit) {
        return new SimplifiedKinesisClient(provider.getKinesisClient(), provider.getCloudWatchClient(), limit);
    }

    /**
     * Requests a shard iterator for the given shard.
     *
     * @return the iterator string returned by Kinesis
     * @throws TransientKinesisException on failures classified as retryable by
     *     {@link #wrapExceptions(Callable)}
     */
    public String getShardIterator(String streamName, String shardId, ShardIteratorType shardIteratorType, String startingSequenceNumber, Instant timestamp) throws TransientKinesisException {
        return wrapExceptions(() -> {
            GetShardIteratorRequest request = GetShardIteratorRequest.builder()
                    .streamName(streamName)
                    .shardId(shardId)
                    .shardIteratorType(shardIteratorType)
                    .startingSequenceNumber(startingSequenceNumber)
                    .timestamp(TimeUtil.toJava(timestamp))
                    .build();
            return kinesis.getShardIterator(request).shardIterator();
        });
    }

    /**
     * Lists all shards of a stream by paging through {@code DescribeStream}.
     *
     * <p>Each page request is retried with backoff while Kinesis answers with
     * {@link LimitExceededException}; once the retries for a page are exhausted the exception is
     * rethrown and translated by {@link #wrapExceptions(Callable)}.
     *
     * @return the shards collected from all pages, possibly empty
     * @throws TransientKinesisException on failures classified as retryable
     */
    public List<Shard> listShards(String streamName) throws TransientKinesisException {
        return wrapExceptions(() -> {
            List<Shard> shards = Lists.newArrayList();
            String lastShardId = null;
            FluentBackoff retryBackoff = FluentBackoff.DEFAULT
                    .withMaxRetries(LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS)
                    .withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF);
            StreamDescription description;
            do {
                // A fresh backoff per page: the retry budget applies to each request separately.
                BackOff backoff = retryBackoff.backoff();
                Sleeper sleeper = Sleeper.DEFAULT;
                while (true) {
                    try {
                        description = kinesis.describeStream(
                                DescribeStreamRequest.builder()
                                        .streamName(streamName)
                                        .exclusiveStartShardId(lastShardId)
                                        .build())
                                .streamDescription();
                        break;
                    } catch (LimitExceededException exc) {
                        if (!BackOffUtils.next(sleeper, backoff)) {
                            throw exc;
                        }
                    }
                }
                List<Shard> page = description.shards();
                if (page.isEmpty()) {
                    // No shard to use as the next cursor: indexing would fail and re-requesting
                    // with the same cursor could never make progress, so stop paging here.
                    break;
                }
                shards.addAll(page);
                lastShardId = page.get(page.size() - 1).shardId();
            } while (description.hasMoreShards());
            return shards;
        });
    }

    /**
     * Fetches records using the default limit given at construction time.
     *
     * @see #getRecords(String, String, String, Integer)
     */
    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId) throws TransientKinesisException {
        return getRecords(shardIterator, streamName, shardId, limit);
    }

    /**
     * Fetches records for a shard iterator and deaggregates them.
     *
     * @param shardIterator iterator to read from
     * @param streamName stream name, only carried into the result
     * @param shardId shard id, only carried into the result
     * @param limit limit passed to the {@code GetRecords} request
     * @return the deaggregated records together with the next iterator and the reported lag
     * @throws TransientKinesisException on failures classified as retryable
     */
    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId, Integer limit) throws TransientKinesisException {
        return wrapExceptions(() -> {
            GetRecordsRequest request = GetRecordsRequest.builder()
                    .shardIterator(shardIterator)
                    .limit(limit)
                    .build();
            GetRecordsResponse response = kinesis.getRecords(request);
            List<Record> records = response.records();
            return new GetKinesisRecordsResult(
                    deaggregate(records),
                    response.nextShardIterator(),
                    response.millisBehindLatest(),
                    streamName,
                    shardId);
        });
    }

    /**
     * Converts SDK records to {@link KinesisClientRecord}s and runs them through
     * {@link AggregatorUtil#deaggregate}. An empty input yields an empty immutable list.
     */
    public static List<KinesisClientRecord> deaggregate(List<Record> records) {
        if (records.isEmpty()) {
            return ImmutableList.of();
        }
        List<KinesisClientRecord> clientRecords = records.stream()
                .map(KinesisClientRecord::fromRecord)
                .collect(Collectors.toList());
        return new AggregatorUtil().deaggregate(clientRecords);
    }

    /** Same as {@link #getBacklogBytes(String, Instant, Instant)} with the current time as upper bound. */
    public long getBacklogBytes(String streamName, Instant countSince) throws TransientKinesisException {
        return getBacklogBytes(streamName, countSince, new Instant());
    }

    /**
     * Sums the {@code Sum} datapoints of the stream's {@code IncomingBytes} CloudWatch metric
     * between the two instants.
     *
     * @return the summed value, or {@code 0} when the interval spans less than one whole minute
     * @throws TransientKinesisException on failures classified as retryable
     */
    public long getBacklogBytes(String streamName, Instant countSince, Instant countTo) throws TransientKinesisException {
        return wrapExceptions(() -> {
            Minutes period = Minutes.minutesBetween(countSince, countTo);
            if (period.isLessThan(Minutes.ONE)) {
                return 0L;
            }
            GetMetricStatisticsRequest request = createMetricStatisticsRequest(streamName, countSince, countTo, period);
            GetMetricStatisticsResponse response = cloudWatch.getMetricStatistics(request);
            long totalSizeInBytes = 0L;
            for (Datapoint point : response.datapoints()) {
                totalSizeInBytes += point.sum().longValue();
            }
            return totalSizeInBytes;
        });
    }

    /**
     * Builds the CloudWatch request used by {@link #getBacklogBytes(String, Instant, Instant)}.
     * The statistics period is the whole interval expressed in seconds.
     */
    GetMetricStatisticsRequest createMetricStatisticsRequest(String streamName, Instant countSince, Instant countTo, Minutes period) {
        Dimension streamDimension = Dimension.builder()
                .name(STREAM_NAME_DIMENSION)
                .value(streamName)
                .build();
        return GetMetricStatisticsRequest.builder()
                .namespace(KINESIS_NAMESPACE)
                .metricName(INCOMING_RECORDS_METRIC)
                .period(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS)
                .startTime(TimeUtil.toJava(countSince))
                .endTime(TimeUtil.toJava(countTo))
                .statistics(Statistic.SUM)
                .dimensions(streamDimension)
                .build();
    }

    /**
     * Runs {@code callable} and translates its failures:
     * <ul>
     *   <li>{@link ExpiredIteratorException} is rethrown unchanged;</li>
     *   <li>{@link LimitExceededException} and {@link ProvisionedThroughputExceededException}
     *       become {@link KinesisClientThrottledException};</li>
     *   <li>any other {@link SdkServiceException}, and retryable {@link SdkClientException}s,
     *       become {@link TransientKinesisException};</li>
     *   <li>everything else becomes a {@link RuntimeException}.</li>
     * </ul>
     */
    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
        try {
            return callable.call();
        } catch (ExpiredIteratorException e) {
            throw e;
        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
            throw new KinesisClientThrottledException("Too many requests to Kinesis. Wait some time and retry.", e);
        } catch (SdkServiceException e) {
            throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", e);
        } catch (SdkClientException e) {
            if (e.retryable()) {
                throw new TransientKinesisException("Retryable client failure", e);
            }
            throw new RuntimeException("Not retryable client failure", e);
        } catch (InterruptedException e) {
            // The backoff sleep in listShards can be interrupted; restore the flag so callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        } catch (Exception e) {
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        }
    }
}

