package org.apache.druid.indexing.kinesis;

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.util.AwsHostNameUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.druid.common.aws.AWSClientUtil;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;

/* loaded from: input_file:org/apache/druid/indexing/kinesis/KinesisRecordSupplier.class */
public class KinesisRecordSupplier implements RecordSupplier<String, String, ByteEntity> {
    private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class);
    private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000;
    private static final long EXCEPTION_RETRY_DELAY_MS = 10000;
    private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000;
    private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10;
    private final MethodHandle deaggregateHandle;
    private final MethodHandle getDataHandle;
    private final AmazonKinesis kinesis;
    private final int recordsPerFetch;
    private final int fetchDelayMillis;
    private final boolean deaggregate;
    private final int recordBufferOfferTimeout;
    private final int recordBufferFullWait;
    private final int maxRecordsPerPoll;
    private final int fetchThreads;
    private final int recordBufferSize;
    private final boolean useEarliestSequenceNumber;
    private final boolean useListShards;
    private ScheduledExecutorService scheduledExec;
    private BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records;
    private final boolean backgroundFetchEnabled;
    private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources = new ConcurrentHashMap();
    private volatile boolean closed = false;
    private AtomicBoolean partitionsFetchStarted = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/kinesis/KinesisRecordSupplier$PartitionResource.class */
    public class PartitionResource {
        private final StreamPartition<String> streamPartition;

        @Nullable
        private volatile String shardIterator;
        private volatile long currentLagMillis;
        private final AtomicBoolean fetchStarted;
        private ScheduledFuture<?> currentFetch;

        private PartitionResource(StreamPartition<String> streamPartition) {
            this.fetchStarted = new AtomicBoolean();
            this.streamPartition = streamPartition;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startBackgroundFetch() {
            if (KinesisRecordSupplier.this.backgroundFetchEnabled) {
                if (this.shardIterator == null) {
                    KinesisRecordSupplier.log.warn("Skipping background fetch for stream[%s] partition[%s] since seek has not been called for this partition", new Object[]{this.streamPartition.getStream(), this.streamPartition.getPartitionId()});
                } else if (this.fetchStarted.compareAndSet(false, true)) {
                    KinesisRecordSupplier.log.debug("Starting scheduled fetch for stream[%s] partition[%s]", new Object[]{this.streamPartition.getStream(), this.streamPartition.getPartitionId()});
                    scheduleBackgroundFetch(KinesisRecordSupplier.this.fetchDelayMillis);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stopBackgroundFetch() {
            if (this.fetchStarted.compareAndSet(true, false)) {
                KinesisRecordSupplier.log.debug("Stopping scheduled fetch for stream[%s] partition[%s]", new Object[]{this.streamPartition.getStream(), this.streamPartition.getPartitionId()});
                if (this.currentFetch == null || this.currentFetch.isDone()) {
                    return;
                }
                this.currentFetch.cancel(true);
            }
        }

        private void scheduleBackgroundFetch(long j) {
            if (!this.fetchStarted.get()) {
                KinesisRecordSupplier.log.debug("Worker for partition[%s] is already stopped", new Object[]{this.streamPartition.getPartitionId()});
                return;
            }
            try {
                this.currentFetch = KinesisRecordSupplier.this.scheduledExec.schedule(fetchRecords(), j, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                KinesisRecordSupplier.log.warn(e, "Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()", new Object[]{this.streamPartition.getPartitionId()});
            }
        }

        private Runnable fetchRecords() {
            return () -> {
                List singletonList;
                if (!this.fetchStarted.get()) {
                    KinesisRecordSupplier.log.debug("Worker for partition[%s] has been stopped", new Object[]{this.streamPartition.getPartitionId()});
                    return;
                }
                GetRecordsResult getRecordsResult = null;
                try {
                    try {
                        if (this.shardIterator == null) {
                            KinesisRecordSupplier.log.info("shardIterator[%s] has been closed and has no more records", new Object[]{this.streamPartition.getPartitionId()});
                            if (KinesisRecordSupplier.this.records.offer(new OrderedPartitionableRecord(this.streamPartition.getStream(), this.streamPartition.getPartitionId(), KinesisSequenceNumber.END_OF_SHARD_MARKER, (List) null), KinesisRecordSupplier.this.recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
                                return;
                            }
                            KinesisRecordSupplier.log.warn("Kinesis records are being processed slower than they are fetched. OrderedPartitionableRecord buffer full, retrying in [%,dms].", new Object[]{Integer.valueOf(KinesisRecordSupplier.this.recordBufferFullWait)});
                            scheduleBackgroundFetch(KinesisRecordSupplier.this.recordBufferFullWait);
                            return;
                        }
                        GetRecordsResult records = KinesisRecordSupplier.this.kinesis.getRecords(new GetRecordsRequest().withShardIterator(this.shardIterator).withLimit(Integer.valueOf(KinesisRecordSupplier.this.recordsPerFetch)));
                        this.currentLagMillis = records.getMillisBehindLatest().longValue();
                        for (Record record : records.getRecords()) {
                            if (!KinesisRecordSupplier.this.deaggregate) {
                                singletonList = Collections.singletonList(new ByteEntity(record.getData()));
                            } else {
                                if (KinesisRecordSupplier.this.deaggregateHandle == null || KinesisRecordSupplier.this.getDataHandle == null) {
                                    throw new ISE("deaggregateHandle or getDataHandle is null!", new Object[0]);
                                }
                                singletonList = new ArrayList();
                                Iterator it = (List) KinesisRecordSupplier.this.deaggregateHandle.invokeExact(Collections.singletonList(record)).iterator();
                                while (it.hasNext()) {
                                    singletonList.add(new ByteEntity((ByteBuffer) KinesisRecordSupplier.this.getDataHandle.invoke(it.next())));
                                }
                            }
                            OrderedPartitionableRecord orderedPartitionableRecord = new OrderedPartitionableRecord(this.streamPartition.getStream(), this.streamPartition.getPartitionId(), record.getSequenceNumber(), singletonList);
                            if (KinesisRecordSupplier.log.isTraceEnabled()) {
                                KinesisRecordSupplier.log.trace("Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s", new Object[]{orderedPartitionableRecord.getStream(), orderedPartitionableRecord.getPartitionId(), orderedPartitionableRecord.getSequenceNumber(), Integer.valueOf(KinesisRecordSupplier.this.records.remainingCapacity()), orderedPartitionableRecord.getData().stream().map(byteEntity -> {
                                    return StringUtils.fromUtf8(byteEntity.getBuffer().duplicate());
                                }).collect(Collectors.toList())});
                            }
                            if (!KinesisRecordSupplier.this.records.offer(orderedPartitionableRecord, KinesisRecordSupplier.this.recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
                                KinesisRecordSupplier.log.warn("Kinesis records are being processed slower than they are fetched. OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].", new Object[]{Integer.valueOf(KinesisRecordSupplier.this.recordBufferFullWait)});
                                this.shardIterator = KinesisRecordSupplier.this.kinesis.getShardIterator(orderedPartitionableRecord.getStream(), (String) orderedPartitionableRecord.getPartitionId(), ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), (String) orderedPartitionableRecord.getSequenceNumber()).getShardIterator();
                                scheduleBackgroundFetch(KinesisRecordSupplier.this.recordBufferFullWait);
                                return;
                            }
                        }
                        this.shardIterator = records.getNextShardIterator();
                        scheduleBackgroundFetch(KinesisRecordSupplier.this.fetchDelayMillis);
                    } catch (ResourceNotFoundException | InvalidArgumentException e) {
                        KinesisRecordSupplier.log.error(e, "encounted AWS error while attempting to fetch records, will not retry", new Object[0]);
                        throw e;
                    }
                } catch (ExpiredIteratorException e2) {
                    KinesisRecordSupplier.log.warn(e2, "ShardIterator expired while trying to fetch records, retrying in [%,dms]", new Object[]{Integer.valueOf(KinesisRecordSupplier.this.fetchDelayMillis)});
                    if (0 == 0) {
                        throw new ISE("can't reschedule fetch records runnable, recordsResult is null??", new Object[0]);
                    }
                    this.shardIterator = getRecordsResult.getNextShardIterator();
                    scheduleBackgroundFetch(KinesisRecordSupplier.this.fetchDelayMillis);
                } catch (ProvisionedThroughputExceededException e3) {
                    KinesisRecordSupplier.log.warn(e3, "encounted ProvisionedThroughputExceededException while fetching records, this means that the request rate for the stream is too high, or the requested data is too large for the available throughput. Reduce the frequency or size of your requests.", new Object[0]);
                    scheduleBackgroundFetch(Math.max(KinesisRecordSupplier.PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS, KinesisRecordSupplier.this.fetchDelayMillis));
                } catch (AmazonClientException e4) {
                    if (!AWSClientUtil.isClientExceptionRecoverable(e4)) {
                        KinesisRecordSupplier.log.warn(e4, "encounted unknown unrecoverable AWS exception, will not retry", new Object[0]);
                        throw new RuntimeException((Throwable) e4);
                    }
                    KinesisRecordSupplier.log.warn(e4, "encounted unknown recoverable AWS exception, retrying in [%,dms]", new Object[]{Long.valueOf(KinesisRecordSupplier.EXCEPTION_RETRY_DELAY_MS)});
                    scheduleBackgroundFetch(KinesisRecordSupplier.EXCEPTION_RETRY_DELAY_MS);
                } catch (InterruptedException e5) {
                    KinesisRecordSupplier.log.warn(e5, "Interrupted while waiting to add record to buffer, retrying in [%,dms]", new Object[]{Long.valueOf(KinesisRecordSupplier.EXCEPTION_RETRY_DELAY_MS)});
                    scheduleBackgroundFetch(KinesisRecordSupplier.EXCEPTION_RETRY_DELAY_MS);
                } catch (Throwable th) {
                    KinesisRecordSupplier.log.error(th, "unknown fetchRecords exception, will not retry", new Object[0]);
                    throw new RuntimeException(th);
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void seek(ShardIteratorType shardIteratorType, String str) {
            EmittingLogger emittingLogger = KinesisRecordSupplier.log;
            Object[] objArr = new Object[2];
            objArr[0] = this.streamPartition.getPartitionId();
            objArr[1] = str != null ? str : shardIteratorType.toString();
            emittingLogger.debug("Seeking partition [%s] to [%s]", objArr);
            this.shardIterator = (String) KinesisRecordSupplier.wrapExceptions(() -> {
                return KinesisRecordSupplier.this.kinesis.getShardIterator(this.streamPartition.getStream(), (String) this.streamPartition.getPartitionId(), shardIteratorType.toString(), str).getShardIterator();
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getPartitionTimeLag() {
            return this.currentLagMillis;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> T wrapExceptions(Callable<T> callable) {
        try {
            return callable.call();
        } catch (Exception e) {
            throw new StreamException(e);
        }
    }

    public KinesisRecordSupplier(AmazonKinesis amazonKinesis, int i, int i2, int i3, boolean z, int i4, int i5, int i6, int i7, boolean z2, boolean z3) {
        Preconditions.checkNotNull(amazonKinesis);
        this.kinesis = amazonKinesis;
        this.recordsPerFetch = i;
        this.fetchDelayMillis = i2;
        this.deaggregate = z;
        this.recordBufferOfferTimeout = i5;
        this.recordBufferFullWait = i6;
        this.maxRecordsPerPoll = i7;
        this.fetchThreads = i3;
        this.recordBufferSize = i4;
        this.useEarliestSequenceNumber = z2;
        this.useListShards = z3;
        this.backgroundFetchEnabled = i3 > 0;
        if (z) {
            try {
                Class<?> cls = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
                MethodHandles.Lookup publicLookup = MethodHandles.publicLookup();
                Method method = cls.getMethod("deaggregate", List.class);
                Method method2 = cls.getMethod("getData", new Class[0]);
                this.deaggregateHandle = publicLookup.unreflect(method);
                this.getDataHandle = publicLookup.unreflect(method2);
            } catch (ClassNotFoundException e) {
                throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath", new Object[0]);
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        } else {
            this.deaggregateHandle = null;
            this.getDataHandle = null;
        }
        if (this.backgroundFetchEnabled) {
            log.info("Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)", new Object[]{Integer.valueOf(i3), Integer.valueOf(Runtime.getRuntime().availableProcessors())});
            this.scheduledExec = Executors.newScheduledThreadPool(i3, Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d"));
        }
        this.records = new LinkedBlockingQueue(i4);
    }

    public static AmazonKinesis getAmazonKinesisClient(String str, AWSCredentialsConfig aWSCredentialsConfig, String str2, String str3) {
        AWSCredentialsProvider defaultAWSCredentialsProviderChain = AWSCredentialsUtils.defaultAWSCredentialsProviderChain(aWSCredentialsConfig);
        if (str2 != null) {
            log.info("Assuming role [%s] with externalId [%s]", new Object[]{str2, str3});
            STSAssumeRoleSessionCredentialsProvider.Builder withStsClient = new STSAssumeRoleSessionCredentialsProvider.Builder(str2, StringUtils.format("druid-kinesis-%s", new Object[]{UUID.randomUUID().toString()})).withStsClient((AWSSecurityTokenService) AWSSecurityTokenServiceClientBuilder.standard().withCredentials(defaultAWSCredentialsProviderChain).build());
            if (str3 != null) {
                withStsClient.withExternalId(str3);
            }
            defaultAWSCredentialsProviderChain = withStsClient.build();
        }
        return (AmazonKinesis) AmazonKinesisClientBuilder.standard().withCredentials(defaultAWSCredentialsProviderChain).withClientConfiguration(new ClientConfiguration()).withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(str, AwsHostNameUtils.parseRegion(str, (String) null))).build();
    }

    @VisibleForTesting
    public void start() {
        checkIfClosed();
        if (this.backgroundFetchEnabled && this.partitionsFetchStarted.compareAndSet(false, true)) {
            this.partitionResources.values().forEach(obj -> {
                ((PartitionResource) obj).startBackgroundFetch();
            });
        }
    }

    public void close() {
        if (this.closed) {
            return;
        }
        assign(ImmutableSet.of());
        if (this.scheduledExec != null) {
            this.scheduledExec.shutdown();
            try {
                if (!this.scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
                    this.scheduledExec.shutdownNow();
                }
            } catch (InterruptedException e) {
                log.warn(e, "InterruptedException while shutting down", new Object[0]);
                throw new RuntimeException(e);
            }
        }
        this.closed = true;
    }

    public void assign(Set<StreamPartition<String>> set) {
        checkIfClosed();
        set.forEach(streamPartition -> {
            this.partitionResources.putIfAbsent(streamPartition, new PartitionResource(streamPartition));
        });
        Iterator<Map.Entry<StreamPartition<String>, PartitionResource>> it = this.partitionResources.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<StreamPartition<String>, PartitionResource> next = it.next();
            if (!set.contains(next.getKey())) {
                it.remove();
                next.getValue().stopBackgroundFetch();
            }
        }
    }

    public Collection<StreamPartition<String>> getAssignment() {
        return this.partitionResources.keySet();
    }

    public void seek(StreamPartition<String> streamPartition, String str) throws InterruptedException {
        filterBufferAndResetBackgroundFetch(ImmutableSet.of(streamPartition));
        if (KinesisSequenceNumber.UNREAD_TRIM_HORIZON.equals(str)) {
            partitionSeek(streamPartition, null, ShardIteratorType.TRIM_HORIZON);
        } else if (KinesisSequenceNumber.UNREAD_LATEST.equals(str)) {
            partitionSeek(streamPartition, null, ShardIteratorType.LATEST);
        } else {
            partitionSeek(streamPartition, str, ShardIteratorType.AT_SEQUENCE_NUMBER);
        }
    }

    public void seekToEarliest(Set<StreamPartition<String>> set) throws InterruptedException {
        filterBufferAndResetBackgroundFetch(set);
        set.forEach(streamPartition -> {
            partitionSeek(streamPartition, null, ShardIteratorType.TRIM_HORIZON);
        });
    }

    public void seekToLatest(Set<StreamPartition<String>> set) throws InterruptedException {
        filterBufferAndResetBackgroundFetch(set);
        set.forEach(streamPartition -> {
            partitionSeek(streamPartition, null, ShardIteratorType.LATEST);
        });
    }

    @Nullable
    public String getPosition(StreamPartition<String> streamPartition) {
        throw new UnsupportedOperationException("getPosition() is not supported in Kinesis");
    }

    @Nonnull
    public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long j) {
        start();
        try {
            int min = Math.min(Math.max(this.records.size(), 1), this.maxRecordsPerPoll);
            ArrayList arrayList = new ArrayList(min);
            Queues.drain(this.records, arrayList, min, j, TimeUnit.MILLISECONDS);
            return (List) arrayList.stream().filter(orderedPartitionableRecord -> {
                return this.partitionResources.containsKey(orderedPartitionableRecord.getStreamPartition());
            }).collect(Collectors.toList());
        } catch (InterruptedException e) {
            log.warn(e, "Interrupted while polling", new Object[0]);
            return Collections.emptyList();
        }
    }

    @Nullable
    public String getLatestSequenceNumber(StreamPartition<String> streamPartition) {
        return getSequenceNumber(streamPartition, ShardIteratorType.LATEST);
    }

    @Nullable
    public String getEarliestSequenceNumber(StreamPartition<String> streamPartition) {
        return getSequenceNumber(streamPartition, ShardIteratorType.TRIM_HORIZON);
    }

    public boolean isOffsetAvailable(StreamPartition<String> streamPartition, OrderedSequenceNumber<String> orderedSequenceNumber) {
        return ((Boolean) wrapExceptions(() -> {
            KinesisSequenceNumber kinesisSequenceNumber = (KinesisSequenceNumber) orderedSequenceNumber;
            if (kinesisSequenceNumber.isUnread()) {
                return true;
            }
            if (!KinesisSequenceNumber.isValidAWSKinesisSequence((String) kinesisSequenceNumber.get())) {
                return false;
            }
            GetRecordsRequest withShardIterator = new GetRecordsRequest().withShardIterator((String) RetryUtils.retry(() -> {
                return this.kinesis.getShardIterator(streamPartition.getStream(), (String) streamPartition.getPartitionId(), ShardIteratorType.AT_SEQUENCE_NUMBER.name(), (String) kinesisSequenceNumber.get()).getShardIterator();
            }, th -> {
                if (th instanceof ProvisionedThroughputExceededException) {
                    log.warn(th, "encountered ProvisionedThroughputExceededException while fetching records, this means that the request rate for the stream is too high, or the requested data is too large for the available throughput. Reduce the frequency or size of your requests. Consider increasing the number of shards to increase throughput.", new Object[0]);
                    return true;
                }
                if (th instanceof AmazonClientException) {
                    return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) th);
                }
                return false;
            }, GET_SEQUENCE_NUMBER_RETRY_COUNT));
            List list = (List) RetryUtils.retry(() -> {
                return this.kinesis.getRecords(withShardIterator).getRecords();
            }, th2 -> {
                if (th2 instanceof ProvisionedThroughputExceededException) {
                    log.warn(th2, "encountered ProvisionedThroughputExceededException while fetching records, this means that the request rate for the stream is too high, or the requested data is too large for the available throughput. Reduce the frequency or size of your requests. Consider increasing the number of shards to increase throughput.", new Object[0]);
                    return true;
                }
                if (th2 instanceof AmazonClientException) {
                    return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) th2);
                }
                return false;
            }, GET_SEQUENCE_NUMBER_RETRY_COUNT);
            return Boolean.valueOf(!list.isEmpty() && ((Record) list.get(0)).getSequenceNumber().equals(kinesisSequenceNumber.get()));
        })).booleanValue();
    }

    private Set<Shard> getShards(String str) {
        return this.useListShards ? getShardsUsingListShards(str) : getShardsUsingDescribeStream(str);
    }

    private Set<Shard> getShardsUsingDescribeStream(String str) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(str);
        while (describeStreamRequest != null) {
            StreamDescription streamDescription = this.kinesis.describeStream(describeStreamRequest).getStreamDescription();
            List shards = streamDescription.getShards();
            builder.addAll(shards);
            if (streamDescription.isHasMoreShards().booleanValue()) {
                describeStreamRequest.setExclusiveStartShardId(((Shard) Iterables.getLast(shards)).getShardId());
            } else {
                describeStreamRequest = null;
            }
        }
        return builder.build();
    }

    private Set<Shard> getShardsUsingListShards(String str) {
        ImmutableSet.Builder builder = ImmutableSet.builder();
        ListShardsRequest withStreamName = new ListShardsRequest().withStreamName(str);
        while (true) {
            ListShardsResult listShards = this.kinesis.listShards(withStreamName);
            builder.addAll(listShards.getShards());
            String nextToken = listShards.getNextToken();
            if (nextToken == null) {
                return builder.build();
            }
            withStreamName = new ListShardsRequest().withNextToken(nextToken);
        }
    }

    public Set<String> getPartitionIds(String str) {
        return (Set) wrapExceptions(() -> {
            TreeSet treeSet = new TreeSet();
            Iterator<Shard> it = getShards(str).iterator();
            while (it.hasNext()) {
                treeSet.add(it.next().getShardId());
            }
            return treeSet;
        });
    }

    public Map<String, Long> getPartitionsTimeLag(String str, Map<String, String> map) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(map.size());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            StreamPartition<String> streamPartition = new StreamPartition<>(str, entry.getKey());
            long j = 0;
            if (KinesisSequenceNumber.isValidAWSKinesisSequence(entry.getValue())) {
                j = getPartitionTimeLag(streamPartition, entry.getValue()).longValue();
            }
            newHashMapWithExpectedSize.put(entry.getKey(), Long.valueOf(j));
        }
        return newHashMapWithExpectedSize;
    }

    @VisibleForTesting
    Map<String, Long> getPartitionResourcesTimeLag() {
        return (Map) this.partitionResources.entrySet().stream().collect(Collectors.toMap(entry -> {
            return (String) ((StreamPartition) entry.getKey()).getPartitionId();
        }, entry2 -> {
            return Long.valueOf(((PartitionResource) entry2.getValue()).getPartitionTimeLag());
        }));
    }

    @VisibleForTesting
    public int bufferSize() {
        return this.records.size();
    }

    @VisibleForTesting
    public boolean isBackgroundFetchRunning() {
        return this.partitionsFetchStarted.get();
    }

    @VisibleForTesting
    public boolean isAnyFetchActive() {
        return this.partitionResources.values().stream().map(partitionResource -> {
            return partitionResource.currentFetch;
        }).anyMatch(scheduledFuture -> {
            return (scheduledFuture == null || scheduledFuture.isDone()) ? false : true;
        });
    }

    private void partitionSeek(StreamPartition<String> streamPartition, String str, ShardIteratorType shardIteratorType) {
        PartitionResource partitionResource = this.partitionResources.get(streamPartition);
        if (partitionResource == null) {
            throw new ISE("Partition [%s] has not been assigned", new Object[]{streamPartition});
        }
        partitionResource.seek(shardIteratorType, str);
    }

    @Nullable
    private String getSequenceNumber(StreamPartition<String> streamPartition, ShardIteratorType shardIteratorType) {
        return (String) wrapExceptions(() -> {
            String shardIterator = this.kinesis.getShardIterator(streamPartition.getStream(), (String) streamPartition.getPartitionId(), shardIteratorType.toString()).getShardIterator();
            if (this.closed) {
                log.info("KinesisRecordSupplier closed while fetching sequenceNumber", new Object[0]);
                return null;
            }
            GetRecordsRequest withLimit = new GetRecordsRequest().withShardIterator(shardIterator).withLimit(Integer.valueOf(GET_SEQUENCE_NUMBER_RECORD_COUNT));
            GetRecordsResult getRecordsResult = (GetRecordsResult) RetryUtils.retry(() -> {
                return this.kinesis.getRecords(withLimit);
            }, th -> {
                if (th instanceof ProvisionedThroughputExceededException) {
                    log.warn(th, "encountered ProvisionedThroughputExceededException while fetching records, this means that the request rate for the stream is too high, or the requested data is too large for the available throughput. Reduce the frequency or size of your requests. Consider increasing the number of shards to increase throughput.", new Object[0]);
                    return true;
                }
                if (th instanceof AmazonClientException) {
                    return AWSClientUtil.isClientExceptionRecoverable((AmazonClientException) th);
                }
                return false;
            }, GET_SEQUENCE_NUMBER_RETRY_COUNT);
            List records = getRecordsResult.getRecords();
            if (!records.isEmpty()) {
                return ((Record) records.get(0)).getSequenceNumber();
            }
            if (getRecordsResult.getNextShardIterator() == null) {
                log.info("Partition[%s] is closed and empty", new Object[]{streamPartition.getPartitionId()});
                return KinesisSequenceNumber.END_OF_SHARD_MARKER;
            }
            if (shardIteratorType.equals(ShardIteratorType.LATEST)) {
                log.info("Partition[%s] has no records at LATEST offset", new Object[]{streamPartition.getPartitionId()});
                return KinesisSequenceNumber.UNREAD_LATEST;
            }
            if (shardIteratorType.equals(ShardIteratorType.TRIM_HORIZON)) {
                log.info("Partition[%s] has no records at TRIM_HORIZON offset", new Object[]{streamPartition.getPartitionId()});
                return KinesisSequenceNumber.UNREAD_TRIM_HORIZON;
            }
            log.warn("Could not fetch sequence number for Partition[%s]", new Object[]{streamPartition.getPartitionId()});
            return null;
        });
    }

    private Long getPartitionTimeLag(StreamPartition<String> streamPartition, String str) {
        return (Long) wrapExceptions(() -> {
            String shardIteratorType;
            String str2;
            if (str != null && !KinesisSupervisor.OFFSET_NOT_SET.equals(str)) {
                shardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
                str2 = str;
            } else {
                if (!this.useEarliestSequenceNumber) {
                    return 0L;
                }
                shardIteratorType = ShardIteratorType.TRIM_HORIZON.toString();
                str2 = null;
            }
            if (getRecordsForLag(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), str2, streamPartition).getRecords().size() == 0) {
                return 0L;
            }
            return getRecordsForLag(shardIteratorType, str2, streamPartition).getMillisBehindLatest();
        });
    }

    private GetRecordsResult getRecordsForLag(String str, String str2, StreamPartition<String> streamPartition) {
        return this.kinesis.getRecords(new GetRecordsRequest().withShardIterator(this.kinesis.getShardIterator(streamPartition.getStream(), (String) streamPartition.getPartitionId(), str, str2).getShardIterator()).withLimit(1));
    }

    private void checkIfClosed() {
        if (this.closed) {
            throw new ISE("Invalid operation - KinesisRecordSupplier has already been closed", new Object[0]);
        }
    }

    private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> set) throws InterruptedException {
        checkIfClosed();
        if (this.backgroundFetchEnabled && this.partitionsFetchStarted.compareAndSet(true, false)) {
            this.scheduledExec.shutdown();
            try {
                if (!this.scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) {
                    this.scheduledExec.shutdownNow();
                }
                this.scheduledExec = Executors.newScheduledThreadPool(this.fetchThreads, Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d"));
            } catch (InterruptedException e) {
                log.warn(e, "InterruptedException while shutting down", new Object[0]);
                throw e;
            }
        }
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.recordBufferSize);
        Stream filter = this.records.stream().filter(orderedPartitionableRecord -> {
            return !set.contains(orderedPartitionableRecord.getStreamPartition());
        });
        linkedBlockingQueue.getClass();
        filter.forEachOrdered((v1) -> {
            r1.offer(v1);
        });
        this.records = linkedBlockingQueue;
        this.partitionResources.values().forEach(partitionResource -> {
            partitionResource.stopBackgroundFetch();
        });
    }

    @Nullable
    /* renamed from: getPosition, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m8getPosition(StreamPartition streamPartition) {
        return getPosition((StreamPartition<String>) streamPartition);
    }

    @Nullable
    /* renamed from: getEarliestSequenceNumber, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m9getEarliestSequenceNumber(StreamPartition streamPartition) {
        return getEarliestSequenceNumber((StreamPartition<String>) streamPartition);
    }

    @Nullable
    /* renamed from: getLatestSequenceNumber, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m10getLatestSequenceNumber(StreamPartition streamPartition) {
        return getLatestSequenceNumber((StreamPartition<String>) streamPartition);
    }

    public /* bridge */ /* synthetic */ void seek(StreamPartition streamPartition, Object obj) throws InterruptedException {
        seek((StreamPartition<String>) streamPartition, (String) obj);
    }
}
