/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.proxies;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.InvalidArgumentException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.LimitExceededException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ListShardsRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ListShardsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceInUseException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.StreamStatus;
import org.apache.flink.kinesis.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.Log;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.LogFactory;

public class KinesisProxy
implements IKinesisProxyExtended {
    /** Logger shared by all instances of this proxy. */
    private static final Log LOG = LogFactory.getLog(KinesisProxy.class);
    /** Iterator types the sequence-number based getIterator overload expects; other types only trigger an info log there. */
    private static final EnumSet<ShardIteratorType> EXPECTED_ITERATOR_TYPES = EnumSet.of(ShardIteratorType.AT_SEQUENCE_NUMBER, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    /** Number of shard-cache misses after which getShard forces a reload of the shard map. */
    public static final int MAX_CACHE_MISSES_BEFORE_RELOAD = 1000;
    /** Maximum age of the cached shard map before cacheNeedsTimeUpdate reports it as stale. */
    public static final Duration CACHE_MAX_ALLOWED_AGE = Duration.of(30L, ChronoUnit.SECONDS);
    /** getShard logs a miss at WARN (instead of DEBUG) whenever the miss counter is a multiple of this value. */
    public static final int CACHE_MISS_WARNING_MODULUS = 250;
    /** Service name passed along by the deprecated endpoint-based constructor. */
    private static String defaultServiceName = "kinesis";
    /** Signer region passed along by the deprecated endpoint-based constructor. */
    private static String defaultRegionId = "us-east-1";
    /** Client every request built by this proxy is sent through. */
    private AmazonKinesis client;
    /** Source of per-request credentials; NOTE(review): stays null when only the (streamName, client, ...) constructor is used. */
    private AWSCredentialsProvider credentialsProvider;
    /** Accumulates shards across paginated responses inside getShardList. */
    private ShardIterationState shardIterationState = null;
    /** Shard id to shard lookup, replaced wholesale by getShardList; null until the first successful listing. */
    private volatile Map<String, Shard> cachedShardMap = null;
    /** Time at which cachedShardMap was last replaced; null until the first successful listing. */
    private volatile Instant lastCacheUpdateTime = null;
    /** Count of getShard lookups that missed the cache since the last reset. */
    private AtomicInteger cacheMisses = new AtomicInteger(0);
    /** Name of the stream all requests target. */
    private final String streamName;
    /** Default back-off between DescribeStream attempts. */
    private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
    /** Default number of DescribeStream attempts. */
    private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
    /** Sleep time after a LimitExceededException from DescribeStream. */
    private final long describeStreamBackoffTimeInMillis;
    /** Attempt budget for a single getStreamInfo call. */
    private final int maxDescribeStreamRetryAttempts;
    /** Sleep time after a LimitExceededException from ListShards. */
    private final long listShardsBackoffTimeInMillis;
    /** Attempt budget for a single listShards call. */
    private final int maxListShardsRetryAttempts;
    /** True: getShardList uses ListShards. False (client is the DynamoDB streams adapter): it uses DescribeStream. */
    private boolean isKinesisClient = true;

    /**
     * Creates an {@link AmazonKinesisClient} bound to the given endpoint and signer region.
     *
     * @param credentialProvider credentials the new client is constructed with
     * @param endpoint endpoint set on the client
     * @param serviceName not used by this method; kept so the signature stays unchanged
     * @param regionId region applied as the signer region override
     * @return the configured client
     */
    @Deprecated
    private static AmazonKinesisClient buildClientSettingEndpoint(AWSCredentialsProvider credentialProvider, String endpoint, String serviceName, String regionId) {
        final AmazonKinesisClient kinesisClient = new AmazonKinesisClient(credentialProvider);
        kinesisClient.setSignerRegionOverride(regionId);
        kinesisClient.setEndpoint(endpoint);
        return kinesisClient;
    }

    /**
     * Creates a proxy that builds its own client for {@code endpoint}, using the default
     * service name, default signer region and default retry settings.
     *
     * @param streamName stream all requests target
     * @param credentialProvider credentials for the client and for each request
     * @param endpoint endpoint the created client talks to
     */
    @Deprecated
    public KinesisProxy(String streamName, AWSCredentialsProvider credentialProvider, String endpoint) {
        this(
                streamName,
                credentialProvider,
                endpoint,
                defaultServiceName,
                defaultRegionId,
                DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
                DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
                // ListShards defaults: 1500 ms back-off, 50 attempts.
                1500L,
                50);
    }

    /**
     * Creates a proxy together with a new client configured for the given endpoint and region.
     *
     * @param streamName stream all requests target
     * @param credentialProvider credentials for the client and for each request
     * @param endpoint endpoint the created client talks to
     * @param serviceName forwarded to the client factory
     * @param regionId signer region override for the created client
     * @param describeStreamBackoffTimeInMillis sleep time after a throttled DescribeStream call
     * @param maxDescribeStreamRetryAttempts attempt budget for DescribeStream
     * @param listShardsBackoffTimeInMillis sleep time after a throttled ListShards call
     * @param maxListShardsRetryAttempts attempt budget for ListShards
     */
    @Deprecated
    public KinesisProxy(String streamName, AWSCredentialsProvider credentialProvider, String endpoint, String serviceName, String regionId, long describeStreamBackoffTimeInMillis, int maxDescribeStreamRetryAttempts, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts) {
        this(
                streamName,
                credentialProvider,
                KinesisProxy.buildClientSettingEndpoint(credentialProvider, endpoint, serviceName, regionId),
                describeStreamBackoffTimeInMillis,
                maxDescribeStreamRetryAttempts,
                listShardsBackoffTimeInMillis,
                maxListShardsRetryAttempts);
        LOG.debug("KinesisProxy has created a kinesisClient");
    }

    /**
     * Creates a proxy around an existing client and remembers the credentials provider used
     * to sign individual requests.
     *
     * @param streamName stream all requests target
     * @param credentialProvider source of per-request credentials
     * @param kinesisClient client requests are sent through
     * @param describeStreamBackoffTimeInMillis sleep time after a throttled DescribeStream call
     * @param maxDescribeStreamRetryAttempts attempt budget for DescribeStream
     * @param listShardsBackoffTimeInMillis sleep time after a throttled ListShards call
     * @param maxListShardsRetryAttempts attempt budget for ListShards
     */
    @Deprecated
    public KinesisProxy(String streamName, AWSCredentialsProvider credentialProvider, AmazonKinesis kinesisClient, long describeStreamBackoffTimeInMillis, int maxDescribeStreamRetryAttempts, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts) {
        this(
                streamName,
                kinesisClient,
                describeStreamBackoffTimeInMillis,
                maxDescribeStreamRetryAttempts,
                listShardsBackoffTimeInMillis,
                maxListShardsRetryAttempts);
        this.credentialsProvider = credentialProvider;
        LOG.debug("KinesisProxy( " + streamName + ")");
    }

    /**
     * Creates a proxy from a client-library configuration: stream name, ListShards retry
     * settings and credentials provider come from {@code config}, DescribeStream retry
     * settings use this class's defaults.
     *
     * @param config configuration supplying stream name, ListShards settings and credentials
     * @param client client requests are sent through
     */
    public KinesisProxy(KinesisClientLibConfiguration config, AmazonKinesis client) {
        this(
                config.getStreamName(),
                client,
                DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS,
                DEFAULT_DESCRIBE_STREAM_RETRY_TIMES,
                config.getListShardsBackoffTimeInMillis(),
                config.getMaxListShardsRetryAttempts());
        this.credentialsProvider = config.getKinesisCredentialsProvider();
    }

    public KinesisProxy(String streamName, AmazonKinesis client, long describeStreamBackoffTimeInMillis, int maxDescribeStreamRetryAttempts, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts) {
        this.streamName = streamName;
        this.client = client;
        this.describeStreamBackoffTimeInMillis = describeStreamBackoffTimeInMillis;
        this.maxDescribeStreamRetryAttempts = maxDescribeStreamRetryAttempts;
        this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
        this.maxListShardsRetryAttempts = maxListShardsRetryAttempts;
        try {
            if (Class.forName("org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient").isAssignableFrom(client.getClass())) {
                this.isKinesisClient = false;
                LOG.debug("Client is DynamoDb client, will use DescribeStream.");
            }
        }
        catch (ClassNotFoundException e) {
            LOG.debug("Client is Kinesis Client, using ListShards instead of DescribeStream.");
        }
    }

    /**
     * Fetches records from the shard position identified by {@code shardIterator}.
     *
     * @param shardIterator iterator marking the read position
     * @param maxRecords limit set on the request
     * @return the result returned by the client's GetRecords call
     */
    @Override
    public GetRecordsResult get(String shardIterator, int maxRecords) throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        // The (streamName, client, ...) constructor leaves the provider unset; in that case
        // send the request without per-request credentials instead of failing with an NPE.
        if (this.credentialsProvider != null) {
            getRecordsRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        getRecordsRequest.setShardIterator(shardIterator);
        getRecordsRequest.setLimit(maxRecords);
        return this.client.getRecords(getRecordsRequest);
    }

    /**
     * Calls DescribeStream for this proxy's stream, retrying with a fixed back-off while the
     * service answers with {@link LimitExceededException}.
     *
     * <p>An interrupt received while backing off does not abort the retries, but the thread's
     * interrupt status is restored before this method returns or throws.
     *
     * @param startShardId exclusive start shard id for the listing; may be null
     * @return the response if the stream status is ACTIVE or UPDATING, otherwise {@code null}
     * @throws LimitExceededException if every attempt was throttled
     * @throws IllegalStateException if the attempts are exhausted without a response or a
     *         throttling exception
     */
    @Override
    @Deprecated
    public DescribeStreamResult getStreamInfo(String startShardId) throws ResourceNotFoundException, LimitExceededException {
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        // The provider may be unset (see the (streamName, client, ...) constructor); skip
        // per-request credentials in that case rather than throwing an NPE.
        if (this.credentialsProvider != null) {
            describeStreamRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        describeStreamRequest.setStreamName(this.streamName);
        describeStreamRequest.setExclusiveStartShardId(startShardId);
        DescribeStreamResult response = null;
        LimitExceededException lastException = null;
        int remainingRetryTimes = this.maxDescribeStreamRetryAttempts;
        boolean interrupted = false;
        try {
            while (response == null) {
                try {
                    response = this.client.describeStream(describeStreamRequest);
                } catch (LimitExceededException le) {
                    LOG.info("Got LimitExceededException when describing stream " + this.streamName + ". Backing off for " + this.describeStreamBackoffTimeInMillis + " millis.");
                    try {
                        Thread.sleep(this.describeStreamBackoffTimeInMillis);
                    } catch (InterruptedException ie) {
                        LOG.debug("Stream " + this.streamName + " : Sleep  was interrupted ", ie);
                        // Remember the interrupt; it is re-asserted in the finally block.
                        interrupted = true;
                    }
                    lastException = le;
                }
                remainingRetryTimes--;
                if (response == null && remainingRetryTimes <= 0) {
                    if (lastException != null) {
                        throw lastException;
                    }
                    throw new IllegalStateException("Received null from DescribeStream call.");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        String streamStatus = response.getStreamDescription().getStreamStatus();
        if (StreamStatus.ACTIVE.toString().equals(streamStatus) || StreamStatus.UPDATING.toString().equals(streamStatus)) {
            return response;
        }
        LOG.info("Stream is in status " + streamStatus + ", KinesisProxy.DescribeStream returning null (wait until stream is Active or Updating");
        return null;
    }

    /**
     * Fetches one page of shards via ListShards, retrying with a fixed back-off while the
     * service answers with {@link LimitExceededException}.
     *
     * <p>An interrupt received while backing off does not abort the retries, but the thread's
     * interrupt status is restored before this method returns or throws.
     *
     * @param nextToken pagination token from the previous page; when empty or null the request
     *        is addressed by stream name instead
     * @return the page, or {@code null} if the call failed with {@link ResourceInUseException}
     * @throws LimitExceededException if every attempt was throttled
     * @throws IllegalStateException if the attempts are exhausted without a result or a
     *         throttling exception
     */
    private ListShardsResult listShards(String nextToken) {
        ListShardsRequest request = new ListShardsRequest();
        // The provider may be unset (see the (streamName, client, ...) constructor); skip
        // per-request credentials in that case rather than throwing an NPE.
        if (this.credentialsProvider != null) {
            request.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        if (StringUtils.isEmpty(nextToken)) {
            request.setStreamName(this.streamName);
        } else {
            request.setNextToken(nextToken);
        }
        ListShardsResult result = null;
        LimitExceededException lastException = null;
        int remainingRetries = this.maxListShardsRetryAttempts;
        boolean interrupted = false;
        try {
            while (result == null) {
                try {
                    result = this.client.listShards(request);
                } catch (LimitExceededException e) {
                    LOG.info("Got LimitExceededException when listing shards " + this.streamName + ". Backing off for " + this.listShardsBackoffTimeInMillis + " millis.");
                    try {
                        Thread.sleep(this.listShardsBackoffTimeInMillis);
                    } catch (InterruptedException ie) {
                        LOG.debug("Stream " + this.streamName + " : Sleep  was interrupted ", ie);
                        // Remember the interrupt; it is re-asserted in the finally block.
                        interrupted = true;
                    }
                    lastException = e;
                } catch (ResourceInUseException e) {
                    LOG.info("Stream is not in Active/Updating status, returning null (wait until stream is in Active or Updating)");
                    return null;
                }
                remainingRetries--;
                if (result == null && remainingRetries <= 0) {
                    if (lastException != null) {
                        throw lastException;
                    }
                    throw new IllegalStateException("Received null from ListShards call.");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return result;
    }

    /**
     * Looks up a shard by id in the cached shard map, loading the map on first use.
     *
     * <p>On a miss the map is reloaded once the miss counter exceeds
     * {@link #MAX_CACHE_MISSES_BEFORE_RELOAD} or the cache is older than
     * {@link #CACHE_MAX_ALLOWED_AGE}. Misses are logged at WARN when the counter is a multiple
     * of {@link #CACHE_MISS_WARNING_MODULUS}, otherwise at DEBUG.
     *
     * @param shardId id of the shard to look up
     * @return the shard, or {@code null} if it is unknown or the shard map could not be loaded
     */
    @Override
    public Shard getShard(String shardId) {
        if (this.cachedShardMap == null) {
            synchronized (this) {
                if (this.cachedShardMap == null) {
                    this.getShardList();
                }
            }
        }
        // getShardList() returns null without populating the cache when the stream is not
        // listable; report "not found" instead of dereferencing a null map.
        Map<String, Shard> shardMap = this.cachedShardMap;
        if (shardMap == null) {
            LOG.warn("Shard map for stream " + this.streamName + " could not be loaded; cannot look up shard " + shardId);
            return null;
        }
        Shard shard = shardMap.get(shardId);
        if (shard == null && (this.cacheMisses.incrementAndGet() > MAX_CACHE_MISSES_BEFORE_RELOAD || this.cacheNeedsTimeUpdate())) {
            synchronized (this) {
                // Another thread may have refreshed the map while this one waited for the lock.
                Map<String, Shard> currentMap = this.cachedShardMap;
                shard = currentMap == null ? null : currentMap.get(shardId);
                if (shard == null) {
                    LOG.info("To many shard map cache misses or cache is out of date -- forcing a refresh");
                    this.getShardList();
                    shard = this.verifyAndLogShardAfterCacheUpdate(shardId);
                }
                this.cacheMisses.set(0);
            }
        }
        if (shard == null) {
            String message = "Cannot find the shard given the shardId " + shardId + ".  Cache misses: " + this.cacheMisses;
            if (this.cacheMisses.get() % CACHE_MISS_WARNING_MODULUS == 0) {
                LOG.warn(message);
            } else {
                LOG.debug(message);
            }
        }
        return shard;
    }

    /**
     * Re-reads a shard from the cached shard map and logs a warning if it is still absent.
     *
     * @param shardId id of the shard to look up
     * @return the cached shard, or {@code null} if the map has no entry for {@code shardId}
     */
    private Shard verifyAndLogShardAfterCacheUpdate(String shardId) {
        final Shard refreshed = this.cachedShardMap.get(shardId);
        if (refreshed != null) {
            return refreshed;
        }
        LOG.warn("Even after cache refresh shard '" + shardId + "' wasn't found.  This could indicate a bigger problem");
        return null;
    }

    /**
     * Tells whether the cached shard map is older than {@link #CACHE_MAX_ALLOWED_AGE}.
     *
     * @return {@code true} if the cache was never updated or its age exceeds the limit
     */
    private boolean cacheNeedsTimeUpdate() {
        if (this.lastCacheUpdateTime == null) {
            // Never populated: always counts as stale.
            return true;
        }
        final Instant now = Instant.now();
        final Duration age = Duration.between(this.lastCacheUpdateTime, now);
        final String prefix = "Shard map cache is " + age + " > " + CACHE_MAX_ALLOWED_AGE + ". ";
        final boolean stale = age.compareTo(CACHE_MAX_ALLOWED_AGE) > 0;
        if (!stale) {
            LOG.debug(prefix + "Age doesn't exceed limit.");
            return false;
        }
        LOG.info(prefix + "Age exceeds limit -- Refreshing.");
        return true;
    }

    /**
     * Lists all shards of the stream, page by page, and replaces the cached shard map and its
     * timestamp with the result.
     *
     * <p>ListShards is used for a Kinesis client and DescribeStream for the DynamoDB streams
     * adapter. If any page comes back as {@code null}, the listing is abandoned and the cache
     * is left untouched.
     *
     * @return all shards of the stream, or {@code null} if a page could not be obtained
     */
    @Override
    public synchronized List<Shard> getShardList() {
        if (this.isKinesisClient) {
            // ListShards pagination always restarts from the first page, so shards left over
            // from an aborted earlier listing must be dropped; otherwise they are collected
            // twice and Collectors.toMap below fails on the duplicate shard ids.
            this.shardIterationState = new ShardIterationState();
            String nextToken = null;
            do {
                ListShardsResult result = this.listShards(nextToken);
                if (result == null) {
                    return null;
                }
                this.shardIterationState.update(result.getShards());
                nextToken = result.getNextToken();
            } while (StringUtils.isNotEmpty(nextToken));
        } else {
            // DescribeStream pagination resumes after the last shard id seen, so state from an
            // aborted earlier listing is kept on purpose.
            if (this.shardIterationState == null) {
                this.shardIterationState = new ShardIterationState();
            }
            DescribeStreamResult response;
            do {
                response = this.getStreamInfo(this.shardIterationState.getLastShardId());
                if (response == null) {
                    return null;
                }
                this.shardIterationState.update(response.getStreamDescription().getShards());
                // Boolean.TRUE.equals treats a missing flag as "no more shards" instead of
                // failing on unboxing.
            } while (Boolean.TRUE.equals(response.getStreamDescription().isHasMoreShards()));
        }
        List<Shard> shards = this.shardIterationState.getShards();
        this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity()));
        this.lastCacheUpdateTime = Instant.now();
        this.shardIterationState = new ShardIterationState();
        return shards;
    }

    /**
     * Returns the ids of all shards of the stream.
     *
     * @return the shard ids, or {@code null} if the shard list could not be obtained
     */
    @Override
    public Set<String> getAllShardIds() throws ResourceNotFoundException {
        List<Shard> shards = this.getShardList();
        if (shards == null) {
            return null;
        }
        // Iterate the list that was just null-checked; fetching it a second time would repeat
        // the remote calls and could return null.
        Set<String> shardIds = new HashSet<String>();
        for (Shard shard : shards) {
            shardIds.add(shard.getShardId());
        }
        return shardIds;
    }

    /**
     * Requests a shard iterator positioned relative to a sequence number.
     *
     * <p>The iterator type is parsed only to log a hint when it is not AT_SEQUENCE_NUMBER or
     * AFTER_SEQUENCE_NUMBER; the request is always sent with the raw {@code iteratorType}.
     *
     * @param shardId shard to read from
     * @param iteratorType iterator type name passed to the service
     * @param sequenceNumber starting sequence number
     * @return the shard iterator returned by the service
     */
    @Override
    public String getIterator(String shardId, String iteratorType, String sequenceNumber) {
        ShardIteratorType shardIteratorType;
        try {
            shardIteratorType = ShardIteratorType.fromValue(iteratorType);
        } catch (IllegalArgumentException iae) {
            LOG.error("Caught illegal argument exception while parsing iteratorType: " + iteratorType, iae);
            shardIteratorType = null;
        }
        if (!EXPECTED_ITERATOR_TYPES.contains(shardIteratorType)) {
            LOG.info("This method should only be used for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER ShardIteratorTypes. For methods to use with other ShardIteratorTypes, see IKinesisProxy.java");
        }
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        // The provider may be unset (see the (streamName, client, ...) constructor); skip
        // per-request credentials in that case rather than throwing an NPE.
        if (this.credentialsProvider != null) {
            getShardIteratorRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        getShardIteratorRequest.setStreamName(this.streamName);
        getShardIteratorRequest.setShardId(shardId);
        getShardIteratorRequest.setShardIteratorType(iteratorType);
        getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
        getShardIteratorRequest.setTimestamp(null);
        GetShardIteratorResult response = this.client.getShardIterator(getShardIteratorRequest);
        return response.getShardIterator();
    }

    /**
     * Requests a shard iterator for an iterator type that needs neither a sequence number nor
     * a timestamp.
     *
     * @param shardId shard to read from
     * @param iteratorType iterator type name passed to the service
     * @return the shard iterator returned by the service
     */
    @Override
    public String getIterator(String shardId, String iteratorType) {
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        // The provider may be unset (see the (streamName, client, ...) constructor); skip
        // per-request credentials in that case rather than throwing an NPE.
        if (this.credentialsProvider != null) {
            getShardIteratorRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        getShardIteratorRequest.setStreamName(this.streamName);
        getShardIteratorRequest.setShardId(shardId);
        getShardIteratorRequest.setShardIteratorType(iteratorType);
        getShardIteratorRequest.setStartingSequenceNumber(null);
        getShardIteratorRequest.setTimestamp(null);
        GetShardIteratorResult response = this.client.getShardIterator(getShardIteratorRequest);
        return response.getShardIterator();
    }

    /**
     * Requests an AT_TIMESTAMP shard iterator.
     *
     * @param shardId shard to read from
     * @param timestamp timestamp set on the request
     * @return the shard iterator returned by the service
     */
    @Override
    public String getIterator(String shardId, Date timestamp) {
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        // The provider may be unset (see the (streamName, client, ...) constructor); skip
        // per-request credentials in that case rather than throwing an NPE.
        if (this.credentialsProvider != null) {
            getShardIteratorRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        getShardIteratorRequest.setStreamName(this.streamName);
        getShardIteratorRequest.setShardId(shardId);
        getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP);
        getShardIteratorRequest.setStartingSequenceNumber(null);
        getShardIteratorRequest.setTimestamp(timestamp);
        GetShardIteratorResult response = this.client.getShardIterator(getShardIteratorRequest);
        return response.getShardIterator();
    }

    /**
     * Writes a single record to this proxy's stream.
     *
     * @param exclusiveMinimumSequenceNumber value set as the request's sequence number for ordering
     * @param explicitHashKey explicit hash key set on the request
     * @param partitionKey partition key set on the request
     * @param data record payload
     * @return the result returned by the client's PutRecord call
     */
    @Override
    public PutRecordResult put(String exclusiveMinimumSequenceNumber, String explicitHashKey, String partitionKey, ByteBuffer data) throws ResourceNotFoundException, InvalidArgumentException {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        // The provider may be unset (see the (streamName, client, ...) constructor); skip
        // per-request credentials in that case rather than throwing an NPE.
        if (this.credentialsProvider != null) {
            putRecordRequest.setRequestCredentials(this.credentialsProvider.getCredentials());
        }
        putRecordRequest.setStreamName(this.streamName);
        putRecordRequest.setSequenceNumberForOrdering(exclusiveMinimumSequenceNumber);
        putRecordRequest.setExplicitHashKey(explicitHashKey);
        putRecordRequest.setPartitionKey(partitionKey);
        putRecordRequest.setData(data);
        return this.client.putRecord(putRecordRequest);
    }

    /**
     * Replaces the cached shard map. Package-private; presumably a test hook (no caller is
     * visible in this file).
     *
     * @param cachedShardMap new shard id to shard map; stored as-is without copying
     */
    void setCachedShardMap(Map<String, Shard> cachedShardMap) {
        this.cachedShardMap = cachedShardMap;
    }

    /**
     * Overrides the recorded time of the last cache update. Package-private; presumably a
     * test hook (no caller is visible in this file).
     *
     * @param lastCacheUpdateTime new timestamp; null makes the cache count as never updated
     */
    void setLastCacheUpdateTime(Instant lastCacheUpdateTime) {
        this.lastCacheUpdateTime = lastCacheUpdateTime;
    }

    /**
     * Returns the recorded time of the last cache update.
     *
     * @return the timestamp, or null if the cache has not been updated yet
     */
    Instant getLastCacheUpdateTime() {
        return this.lastCacheUpdateTime;
    }

    /**
     * Replaces the cache-miss counter object. Package-private; presumably a test hook (no
     * caller is visible in this file).
     *
     * @param cacheMisses counter used by subsequent getShard calls
     */
    void setCacheMisses(AtomicInteger cacheMisses) {
        this.cacheMisses = cacheMisses;
    }

    /**
     * Returns the live cache-miss counter (the same object getShard updates, not a copy).
     *
     * @return the cache-miss counter
     */
    AtomicInteger getCacheMisses() {
        return this.cacheMisses;
    }

    /**
     * Accumulator for a paginated shard listing: collects the shards of every page and keeps
     * the greatest (by String ordering) "last shard id" seen across pages.
     */
    static class ShardIterationState {
        /** Shards collected so far, in the order the pages were added. */
        private List<Shard> shards = new ArrayList<Shard>();
        /** Greatest id among the final shards of the pages added so far; null before any page. */
        private String lastShardId;

        /**
         * Adds one page of shards. A null or empty page leaves the state unchanged.
         *
         * @param shards page of shards to append
         */
        public void update(List<Shard> shards) {
            if (shards != null && !shards.isEmpty()) {
                this.shards.addAll(shards);
                final String pageLastId = shards.get(shards.size() - 1).getShardId();
                // Only move forward: keep the current id unless this page ends on a greater one.
                if (this.lastShardId == null || this.lastShardId.compareTo(pageLastId) < 0) {
                    this.lastShardId = pageLastId;
                }
            }
        }

        /** @return the live list of collected shards (not a copy) */
        public List<Shard> getShards() {
            return this.shards;
        }

        /** @return the greatest last-shard id seen so far, or null if none */
        public String getLastShardId() {
            return this.lastShardId;
        }

        /** @param shards list that replaces the collected shards */
        public void setShards(List<Shard> shards) {
            this.shards = shards;
        }

        /** @param lastShardId value that replaces the tracked last-shard id */
        public void setLastShardId(String lastShardId) {
            this.lastShardId = lastShardId;
        }

        /** Two states are equal when both their shard lists and last-shard ids are equal. */
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ShardIterationState)) {
                return false;
            }
            final ShardIterationState that = (ShardIterationState) o;
            if (!that.canEqual(this)) {
                return false;
            }
            final List<Shard> myShards = this.getShards();
            final List<Shard> theirShards = that.getShards();
            final boolean sameShards = myShards == null ? theirShards == null : myShards.equals(theirShards);
            if (!sameShards) {
                return false;
            }
            final String myLastId = this.getLastShardId();
            final String theirLastId = that.getLastShardId();
            return myLastId == null ? theirLastId == null : myLastId.equals(theirLastId);
        }

        /** Hook that lets subclasses restrict which objects may compare equal to them. */
        protected boolean canEqual(Object other) {
            return other instanceof ShardIterationState;
        }

        /** Hash over the shard list and last-shard id: multiplier 59, 43 standing in for null. */
        public int hashCode() {
            final List<Shard> shardList = this.getShards();
            final int afterShards = 59 + (shardList == null ? 43 : shardList.hashCode());
            final String lastId = this.getLastShardId();
            return afterShards * 59 + (lastId == null ? 43 : lastId.hashCode());
        }

        public String toString() {
            final StringBuilder text = new StringBuilder("KinesisProxy.ShardIterationState(shards=");
            text.append(this.getShards());
            text.append(", lastShardId=");
            text.append(this.getLastShardId());
            text.append(")");
            return text.toString();
        }
    }
}

