package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.class */
public class PollingRecordPublisher implements RecordPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
    private final PollingRecordPublisherMetricsReporter metricsReporter;
    private final KinesisProxyInterface kinesisProxy;
    private final StreamShardHandle subscribedShard;
    private String nextShardItr;
    private StartingPosition nextStartingPosition;
    private final int maxNumberOfRecordsPerFetch;
    private final long expiredIteratorBackoffMillis;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PollingRecordPublisher(StartingPosition startingPosition, StreamShardHandle streamShardHandle, PollingRecordPublisherMetricsReporter pollingRecordPublisherMetricsReporter, KinesisProxyInterface kinesisProxyInterface, int i, long j) throws InterruptedException {
        this.nextStartingPosition = (StartingPosition) Preconditions.checkNotNull(startingPosition);
        this.subscribedShard = (StreamShardHandle) Preconditions.checkNotNull(streamShardHandle);
        this.metricsReporter = (PollingRecordPublisherMetricsReporter) Preconditions.checkNotNull(pollingRecordPublisherMetricsReporter);
        this.kinesisProxy = (KinesisProxyInterface) Preconditions.checkNotNull(kinesisProxyInterface);
        this.maxNumberOfRecordsPerFetch = i;
        this.expiredIteratorBackoffMillis = j;
        Preconditions.checkArgument(j >= 0);
        Preconditions.checkArgument(i > 0);
        this.nextShardItr = getShardIterator();
    }

    @Override // org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher
    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer recordBatchConsumer) throws InterruptedException {
        return run(recordBatchConsumer, this.maxNumberOfRecordsPerFetch);
    }

    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer recordBatchConsumer, int i) throws InterruptedException {
        if (this.nextShardItr == null) {
            return RecordPublisher.RecordPublisherRunResult.COMPLETE;
        }
        this.metricsReporter.setMaxNumberOfRecordsPerFetch(i);
        GetRecordsResult records = getRecords(this.nextShardItr, i);
        this.nextStartingPosition = getNextStartingPosition(recordBatchConsumer.accept(new RecordBatch(records.getRecords(), this.subscribedShard, records.getMillisBehindLatest())));
        this.nextShardItr = records.getNextShardIterator();
        return this.nextShardItr == null ? RecordPublisher.RecordPublisherRunResult.COMPLETE : RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
    }

    private StartingPosition getNextStartingPosition(SequenceNumber sequenceNumber) {
        if (!SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
            return StartingPosition.continueFromSequenceNumber(sequenceNumber);
        }
        Preconditions.checkState(this.nextStartingPosition.getShardIteratorType() == ShardIteratorType.AT_TIMESTAMP);
        return this.nextStartingPosition;
    }

    private GetRecordsResult getRecords(String str, int i) throws InterruptedException {
        GetRecordsResult getRecordsResult = null;
        while (getRecordsResult == null) {
            try {
                getRecordsResult = this.kinesisProxy.getRecords(str, i);
            } catch (InterruptedException | ExpiredIteratorException e) {
                LOG.warn("Encountered an unexpected expired iterator {} for shard {}; refreshing the iterator ...", str, this.subscribedShard);
                str = getShardIterator();
                if (this.expiredIteratorBackoffMillis != 0) {
                    Thread.sleep(this.expiredIteratorBackoffMillis);
                }
            }
        }
        return getRecordsResult;
    }

    @Nullable
    private String getShardIterator() throws InterruptedException {
        return this.kinesisProxy.getShardIterator(this.subscribedShard, this.nextStartingPosition.getShardIteratorType().toString(), this.nextStartingPosition.getStartingMarker());
    }
}
