/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.metrics.PollingRecordPublisherMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PollingRecordPublisher
implements RecordPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(PollingRecordPublisher.class);
    private final PollingRecordPublisherMetricsReporter metricsReporter;
    private final KinesisProxyInterface kinesisProxy;
    private final StreamShardHandle subscribedShard;
    private String nextShardItr;
    private StartingPosition nextStartingPosition;
    private final int maxNumberOfRecordsPerFetch;
    private final long fetchIntervalMillis;
    private long processingStartTimeNanos = System.nanoTime();

    PollingRecordPublisher(StartingPosition startingPosition, StreamShardHandle subscribedShard, PollingRecordPublisherMetricsReporter metricsReporter, KinesisProxyInterface kinesisProxy, int maxNumberOfRecordsPerFetch, long fetchIntervalMillis) throws InterruptedException {
        this.nextStartingPosition = (StartingPosition)Preconditions.checkNotNull((Object)startingPosition);
        this.subscribedShard = (StreamShardHandle)Preconditions.checkNotNull((Object)subscribedShard);
        this.metricsReporter = (PollingRecordPublisherMetricsReporter)Preconditions.checkNotNull((Object)metricsReporter);
        this.kinesisProxy = (KinesisProxyInterface)Preconditions.checkNotNull((Object)kinesisProxy);
        this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
        this.fetchIntervalMillis = fetchIntervalMillis;
        Preconditions.checkArgument((fetchIntervalMillis >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((maxNumberOfRecordsPerFetch > 0 ? 1 : 0) != 0);
        this.nextShardItr = this.getShardIterator();
    }

    @Override
    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer consumer) throws InterruptedException {
        return this.run(consumer, this.maxNumberOfRecordsPerFetch);
    }

    public RecordPublisher.RecordPublisherRunResult run(RecordPublisher.RecordBatchConsumer consumer, int maxNumberOfRecords) throws InterruptedException {
        if (this.nextShardItr == null) {
            return RecordPublisher.RecordPublisherRunResult.COMPLETE;
        }
        this.metricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecords);
        GetRecordsResult result = this.getRecords(this.nextShardItr, maxNumberOfRecords);
        RecordBatch recordBatch = new RecordBatch(result.getRecords(), this.subscribedShard, result.getMillisBehindLatest());
        SequenceNumber latestSequenceNumber = consumer.accept(recordBatch);
        this.nextStartingPosition = this.getNextStartingPosition(latestSequenceNumber);
        this.nextShardItr = result.getNextShardIterator();
        long adjustmentEndTimeNanos = this.adjustRunLoopFrequency(this.processingStartTimeNanos, System.nanoTime());
        long runLoopTimeNanos = adjustmentEndTimeNanos - this.processingStartTimeNanos;
        this.processingStartTimeNanos = adjustmentEndTimeNanos;
        this.metricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
        return this.nextShardItr == null ? RecordPublisher.RecordPublisherRunResult.COMPLETE : RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
    }

    private StartingPosition getNextStartingPosition(SequenceNumber latestSequenceNumber) {
        if (SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(latestSequenceNumber)) {
            Preconditions.checkState((this.nextStartingPosition.getShardIteratorType() == ShardIteratorType.AT_TIMESTAMP ? 1 : 0) != 0);
            return this.nextStartingPosition;
        }
        return StartingPosition.continueFromSequenceNumber(latestSequenceNumber);
    }

    private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
        GetRecordsResult getRecordsResult = null;
        while (getRecordsResult == null) {
            try {
                getRecordsResult = this.kinesisProxy.getRecords(shardItr, maxNumberOfRecords);
            }
            catch (InterruptedException | ExpiredIteratorException eiEx) {
                LOG.warn("Encountered an unexpected expired iterator {} for shard {}; refreshing the iterator ...", (Object)shardItr, (Object)this.subscribedShard);
                shardItr = this.getShardIterator();
                if (this.fetchIntervalMillis == 0L) continue;
                Thread.sleep(this.fetchIntervalMillis);
            }
        }
        return getRecordsResult;
    }

    @Nullable
    private String getShardIterator() throws InterruptedException {
        return this.kinesisProxy.getShardIterator(this.subscribedShard, this.nextStartingPosition.getShardIteratorType().toString(), this.nextStartingPosition.getStartingMarker());
    }

    private long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) throws InterruptedException {
        long processingTimeNanos;
        long sleepTimeMillis;
        long endTimeNanos = processingEndTimeNanos;
        if (this.fetchIntervalMillis != 0L && (sleepTimeMillis = this.fetchIntervalMillis - (processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos) / 1000000L) > 0L) {
            Thread.sleep(sleepTimeMillis);
            endTimeNanos = System.nanoTime();
            this.metricsReporter.setSleepTimeMillis(sleepTimeMillis);
        }
        return endTimeNanos;
    }
}

