package org.apache.flink.streaming.connectors.kinesis.internals;

import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.class */
public class ShardConsumer<T> implements Runnable {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
    /** 2 * 1024 * 1024 bytes (2 MiB). */
    private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2097152;
    /** Converts the raw bytes of a fetched record into a value of type {@code T}. */
    private final KinesisDeserializationSchema<T> deserializer;
    /** Proxy used to obtain shard iterators and to fetch records. */
    private final KinesisProxyInterface kinesis;
    /** Index handed back to {@link #fetcherRef} whenever a record is emitted for this shard. */
    private final int subscribedShardStateIndex;
    /** The fetcher that supplies the consumer configuration and receives deserialized records. */
    private final KinesisDataFetcher<T> fetcherRef;
    /** The shard this consumer reads from. */
    private final StreamShardHandle subscribedShard;
    /**
     * Initialised from {@code ConsumerConfigConstants.SHARD_GETRECORDS_MAX} (default 10000).
     * NOTE(review): not final, so presumably re-tuned while running; the code doing so is not visible here.
     */
    private int maxNumberOfRecordsPerFetch;
    /**
     * Read from {@code ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS} (default 200).
     * A value of 0 disables the sleeps guarded by this field.
     */
    private final long fetchIntervalMillis;
    /** Read from {@code ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS} (default false). */
    private final boolean useAdaptiveReads;
    /** Sink for the per-shard metrics this consumer reports. */
    private final ShardMetricsReporter shardMetricsReporter;
    /** Starts as the sequence number given to the constructor; overwritten after each emitted record. */
    private SequenceNumber lastSequenceNum;
    /** Non-null only when the consumer was started from the AT_TIMESTAMP sentinel. */
    private Date initTimestamp;

    /**
     * Creates a consumer for a single subscribed shard.
     *
     * <p>Fetch settings (max records per fetch, fetch interval, adaptive reads) are read from the
     * fetcher's consumer configuration. When {@code sequenceNumber} is the AT_TIMESTAMP sentinel,
     * the initial timestamp is parsed from the configuration: first with the configured date
     * format, and if that fails, as a decimal number of seconds since the epoch.
     *
     * @param kinesisDataFetcher fetcher that owns this consumer; must not be null
     * @param num index of the subscribed shard's state in the fetcher; must not be null
     * @param streamShardHandle shard to consume; must not be null
     * @param sequenceNumber position to start reading from; must not be null and must not be the
     *     shard-ending sentinel
     * @param kinesisProxyInterface proxy used to talk to Kinesis
     * @param shardMetricsReporter metrics sink; must not be null
     * @param kinesisDeserializationSchema schema used to deserialize record payloads
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if the shard has already been fully read, or if the
     *     initial timestamp / date format configuration is missing or invalid
     */
    public ShardConsumer(KinesisDataFetcher<T> kinesisDataFetcher, Integer num, StreamShardHandle streamShardHandle, SequenceNumber sequenceNumber, KinesisProxyInterface kinesisProxyInterface, ShardMetricsReporter shardMetricsReporter, KinesisDeserializationSchema<T> kinesisDeserializationSchema) {
        // Argument validation, in the same order as the parameters are checked upstream.
        this.fetcherRef = Preconditions.checkNotNull(kinesisDataFetcher);
        this.subscribedShardStateIndex = Preconditions.checkNotNull(num);
        this.subscribedShard = Preconditions.checkNotNull(streamShardHandle);
        this.lastSequenceNum = Preconditions.checkNotNull(sequenceNumber);
        this.shardMetricsReporter = Preconditions.checkNotNull(shardMetricsReporter);
        Preconditions.checkArgument(!sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()), "Should not start a ShardConsumer if the shard has already been completely read.");

        this.deserializer = kinesisDeserializationSchema;
        final Properties config = kinesisDataFetcher.getConsumerConfiguration();
        this.kinesis = kinesisProxyInterface;

        // Fetch tuning knobs, each with its built-in default.
        this.maxNumberOfRecordsPerFetch = Integer.parseInt(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000"));
        this.fetchIntervalMillis = Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "200"));
        this.useAdaptiveReads = Boolean.parseBoolean(config.getProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "false"));

        if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
            final String rawTimestamp = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
            Date parsedTimestamp;
            try {
                final String pattern = config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
                parsedTimestamp = new SimpleDateFormat(pattern).parse(rawTimestamp);
            } catch (IllegalArgumentException | NullPointerException e) {
                // Invalid date pattern or missing timestamp property.
                throw new IllegalArgumentException(e);
            } catch (ParseException notADate) {
                // Not in the configured date format: fall back to fractional seconds since the epoch.
                parsedTimestamp = new Date((long) (Double.parseDouble(rawTimestamp) * 1000.0d));
            }
            this.initTimestamp = parsedTimestamp;
        } else {
            this.initTimestamp = null;
        }
    }

    /**
     * Resolves a shard iterator for the given position, dispatching on whether the position is a
     * sentinel or a real sequence number.
     *
     * @param sequenceNumber position to obtain an iterator for
     * @return the shard iterator, which may be {@code null} for some sentinel positions
     * @throws Exception if obtaining the iterator fails
     */
    protected String getShardIterator(SequenceNumber sequenceNumber) throws Exception {
        if (SentinelSequenceNumber.isSentinelSequenceNumber(sequenceNumber)) {
            return getShardIteratorForSentinel(sequenceNumber);
        }
        return getShardIteratorForRealSequenceNumber(sequenceNumber);
    }

    /**
     * Resolves a shard iterator for one of the sentinel positions.
     *
     * <ul>
     *   <li>LATEST: a {@code LATEST} iterator, or {@code null} if the shard is closed</li>
     *   <li>EARLIEST: a {@code TRIM_HORIZON} iterator</li>
     *   <li>SHARD_ENDING: {@code null}</li>
     *   <li>AT_TIMESTAMP: an {@code AT_TIMESTAMP} iterator starting at the initial timestamp</li>
     * </ul>
     *
     * @param sequenceNumber the sentinel position
     * @return the shard iterator, or {@code null} as described above
     * @throws InterruptedException if interrupted while requesting the iterator
     * @throws RuntimeException if {@code sequenceNumber} is not one of the sentinels listed above
     */
    protected String getShardIteratorForSentinel(SequenceNumber sequenceNumber) throws InterruptedException {
        if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
            if (this.subscribedShard.isClosed()) {
                return null;
            }
            return this.kinesis.getShardIterator(this.subscribedShard, ShardIteratorType.LATEST.toString(), null);
        }
        if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
            return this.kinesis.getShardIterator(this.subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
        }
        if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
            return null;
        }
        if (sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
            return this.kinesis.getShardIterator(this.subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), this.initTimestamp);
        }
        throw new RuntimeException("Unknown sentinel type: " + sequenceNumber);
    }

    /**
     * Resolves a shard iterator for a concrete (non-sentinel) sequence number.
     *
     * <p>Aggregated positions are delegated to
     * {@link #getShardIteratorForAggregatedSequenceNumber(SequenceNumber)}; otherwise an
     * {@code AFTER_SEQUENCE_NUMBER} iterator is requested.
     *
     * @param sequenceNumber the last position that was read
     * @return the shard iterator to continue reading with
     * @throws Exception if obtaining the iterator fails
     */
    protected String getShardIteratorForRealSequenceNumber(SequenceNumber sequenceNumber) throws Exception {
        if (sequenceNumber.isAggregated()) {
            return getShardIteratorForAggregatedSequenceNumber(sequenceNumber);
        }
        final String iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString();
        return this.kinesis.getShardIterator(this.subscribedShard, iteratorType, sequenceNumber.getSequenceNumber());
    }

    /**
     * Resolves a shard iterator when the last read position lies inside an aggregated record.
     *
     * <p>Re-fetches the single record at the given sequence number, deaggregates it, emits every
     * sub-record whose sub-sequence number is greater than the one already read, and returns the
     * iterator that follows the re-fetched record.
     *
     * @param sequenceNumber the aggregated position that was last read
     * @return the next shard iterator reported by the fetch
     * @throws Exception if fetching, deaggregating or emitting fails
     */
    protected String getShardIteratorForAggregatedSequenceNumber(SequenceNumber sequenceNumber) throws Exception {
        final String iteratorAtRecord = this.kinesis.getShardIterator(this.subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), sequenceNumber.getSequenceNumber());
        // Fetch exactly one record: the aggregated record we stopped in.
        final GetRecordsResult fetched = getRecords(iteratorAtRecord, 1);

        final List<Record> rawRecords = fetched.getRecords();
        final String startingHashKey = this.subscribedShard.getShard().getHashKeyRange().getStartingHashKey();
        final String endingHashKey = this.subscribedShard.getShard().getHashKeyRange().getEndingHashKey();
        final List<UserRecord> subRecords = deaggregateRecords(rawRecords, startingHashKey, endingHashKey);

        final long lastReadSubSequence = sequenceNumber.getSubSequenceNumber();
        for (UserRecord subRecord : subRecords) {
            if (subRecord.getSubSequenceNumber() <= lastReadSubSequence) {
                // Already consumed before the restart.
                continue;
            }
            deserializeRecordForCollectionAndUpdateState(subRecord);
        }
        return fetched.getNextShardIterator();
    }

    /*
     * NOTE(review): this method was NOT recovered by the decompiler. The body below is a
     * placeholder that unconditionally throws UnsupportedOperationException; it is not the
     * original logic. Restore the method from the original sources or bytecode before use.
     *
     * Original decompiler diagnostics, kept for reference:
     *
     * JADX WARN: Code restructure failed: missing block: B:14:0x0018, code lost:
     *
     *     r9.fetcherRef.updateState(r9.subscribedShardStateIndex, org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
     */
    @Override // java.lang.Runnable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            Method dump skipped, instructions count: 272
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler in place of the real method body.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run():void");
    }

    /**
     * Sleeps for whatever remains of the configured fetch interval after processing.
     *
     * <p>Nothing happens when the fetch interval is 0 or when processing already took at least
     * as long as the interval. When a sleep does happen, its duration is reported to the metrics
     * reporter.
     *
     * @param j processing start time, in nanoseconds
     * @param j2 processing end time, in nanoseconds
     * @return {@code System.nanoTime()} taken right after sleeping, or {@code j2} unchanged if no
     *     sleep was needed
     * @throws InterruptedException if interrupted while sleeping
     */
    protected long adjustRunLoopFrequency(long j, long j2) throws InterruptedException {
        if (this.fetchIntervalMillis == 0) {
            return j2;
        }
        final long processingMillis = (j2 - j) / 1000000;
        final long sleepMillis = this.fetchIntervalMillis - processingMillis;
        if (sleepMillis <= 0) {
            return j2;
        }
        Thread.sleep(sleepMillis);
        final long afterSleepNanos = System.nanoTime();
        this.shardMetricsReporter.setSleepTimeMillis(sleepMillis);
        return afterSleepNanos;
    }

    /**
     * Computes how many records to request on the next fetch when adaptive reads are enabled.
     *
     * <p>The result is the number of average-sized records that fit into
     * {@code 2097152 / loopFrequencyHz} bytes, clamped to the range [1, 10000]. The average
     * record size, loop frequency and bytes-per-read are reported to the metrics reporter.
     *
     * @param j duration of the last run loop, in nanoseconds
     * @param i number of records in the last batch
     * @param j2 total size of the last batch, in bytes
     * @param i2 current maximum number of records per fetch
     * @return the adapted maximum, or {@code i2} unchanged when adaptive reads are disabled or
     *     either {@code i} or {@code j} is 0
     */
    private int adaptRecordsToRead(long j, int i, long j2, int i2) {
        if (!this.useAdaptiveReads || i == 0 || j == 0) {
            return i2;
        }
        final long averageRecordSizeBytes = j2 / i;
        final double loopFrequencyHz = 1_000_000_000.0d / j;
        // 2097152 = 2 * 1024 * 1024 bytes.
        final double bytesPerRead = 2097152.0d / loopFrequencyHz;
        final int unclamped = (int) (bytesPerRead / averageRecordSizeBytes);
        final int adapted = Math.max(1, Math.min(unclamped, 10000));

        this.shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
        this.shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz);
        this.shardMetricsReporter.setBytesPerRead(bytesPerRead);
        return adapted;
    }

    /**
     * Reports whether the current thread has not been interrupted.
     *
     * <p>Uses {@link Thread#interrupted()}, which also clears the thread's interrupt flag.
     *
     * @return {@code false} if the current thread's interrupt flag was set, {@code true} otherwise
     */
    private boolean isRunning() {
        final boolean wasInterrupted = Thread.interrupted();
        return !wasInterrupted;
    }

    /**
     * Deserializes one user record, hands it to the fetcher together with its position, and
     * remembers that position as the last one read.
     *
     * @param userRecord the (possibly deaggregated) record to process
     * @throws IOException if deserialization fails
     */
    private void deserializeRecordForCollectionAndUpdateState(UserRecord userRecord) throws IOException {
        // Copy the remaining bytes of the record's buffer into a plain array for the deserializer.
        final ByteBuffer payload = userRecord.getData();
        final byte[] payloadBytes = new byte[payload.remaining()];
        payload.get(payloadBytes);

        final long approxArrivalTimestamp = userRecord.getApproximateArrivalTimestamp().getTime();
        final T value = this.deserializer.deserialize(payloadBytes, userRecord.getPartitionKey(), userRecord.getSequenceNumber(), approxArrivalTimestamp, this.subscribedShard.getStreamName(), this.subscribedShard.getShard().getShardId());

        // Aggregated records additionally carry a sub-sequence number.
        final SequenceNumber position;
        if (userRecord.isAggregated()) {
            position = new SequenceNumber(userRecord.getSequenceNumber(), userRecord.getSubSequenceNumber());
        } else {
            position = new SequenceNumber(userRecord.getSequenceNumber());
        }

        this.fetcherRef.emitRecordAndUpdateState(value, approxArrivalTimestamp, this.subscribedShardStateIndex, position);
        this.lastSequenceNum = position;
    }

    /**
     * Fetches records with the given iterator, retrying with a refreshed iterator whenever the
     * current one has expired.
     *
     * <p>On a successful fetch, the reported "millis behind latest" value (if present) is
     * forwarded to the metrics reporter. After an expired iterator, a new iterator is obtained
     * for the last read position and, if a fetch interval is configured, the thread sleeps for
     * that interval before retrying.
     *
     * @param str shard iterator to fetch with
     * @param i maximum number of records to fetch
     * @return the fetch result
     * @throws Exception if fetching or refreshing the iterator fails
     */
    private GetRecordsResult getRecords(String str, int i) throws Exception {
        String shardIterator = str;
        GetRecordsResult result = null;
        do {
            try {
                result = this.kinesis.getRecords(shardIterator, i);
                final Long millisBehindLatest = result.getMillisBehindLatest();
                if (millisBehindLatest != null) {
                    this.shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
                }
            } catch (ExpiredIteratorException e) {
                LOG.warn("Encountered an unexpected expired iterator {} for shard {}; refreshing the iterator ...", shardIterator, this.subscribedShard);
                shardIterator = getShardIterator(this.lastSequenceNum);
                // Back off for one fetch interval before retrying with the refreshed iterator.
                if (this.fetchIntervalMillis != 0) {
                    Thread.sleep(this.fetchIntervalMillis);
                }
            }
        } while (result == null);
        return result;
    }

    /**
     * Deaggregates the given records via {@code UserRecord.deaggregate}, passing the hash-key
     * range bounds as {@link BigInteger} values.
     *
     * @param list records as returned by a fetch
     * @param str starting hash key of the range, as a decimal string
     * @param str2 ending hash key of the range, as a decimal string
     * @return the deaggregated user records
     * @throws NumberFormatException if either hash key is not a valid decimal integer
     */
    protected static List<UserRecord> deaggregateRecords(List<Record> list, String str, String str2) {
        final BigInteger startingHashKey = new BigInteger(str);
        final BigInteger endingHashKey = new BigInteger(str2);
        return UserRecord.deaggregate(list, startingHashKey, endingHashKey);
    }
}
