package org.apache.flink.streaming.connectors.kinesis.internals;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that consumes a single subscribed shard.
 *
 * <p>While the owning {@link KinesisDataFetcher} is running and the executing thread has not been
 * interrupted, the consumer repeatedly runs its {@link RecordPublisher}. Each record handed to the
 * consumer is deserialized and emitted through the fetcher together with its sequence number. The
 * sequence number of the last emitted record is remembered and returned to the publisher after
 * every batch.
 *
 * @param <T> type of the elements produced by the deserialization schema
 */
@Internal
public class ShardConsumer<T> implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);

    private final KinesisDeserializationSchema<T> deserializer;
    private final int subscribedShardStateIndex;
    private final KinesisDataFetcher<T> fetcherRef;
    private final StreamShardHandle subscribedShard;
    private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;

    /** Sequence number of the most recently emitted record; updated after every emitted record. */
    private SequenceNumber lastSequenceNum;

    private final RecordPublisher recordPublisher;

    /** Base type for the exceptions raised by a {@link ShardConsumer}. */
    abstract static class ShardConsumerException extends RuntimeException {
        private static final long serialVersionUID = 7732343624482321663L;

        public ShardConsumerException(String str) {
            super(str);
        }
    }

    /**
     * Raised by {@link ShardConsumer#run()} when the record publisher reports {@code CANCELLED}
     * while the consumer itself is still supposed to be running.
     */
    static class ShardConsumerCancelledException extends ShardConsumerException {
        private static final long serialVersionUID = 2707399313569728649L;

        public ShardConsumerCancelledException(String str) {
            super(str);
        }
    }

    /**
     * Creates a consumer for one subscribed shard.
     *
     * @param kinesisDataFetcher fetcher that receives emitted records, state updates and errors;
     *     must not be null
     * @param recordPublisher publisher that is run to obtain record batches; must not be null
     * @param num index of the subscribed shard's state, passed back to the fetcher on every state
     *     update; must not be null
     * @param streamShardHandle the shard being consumed; must not be null
     * @param sequenceNumber initial value of the last-seen sequence number; must not be null and
     *     must not be the shard-ending sentinel
     * @param shardConsumerMetricsReporter metrics reporter, unregistered when {@link #run()}
     *     finishes; must not be null
     * @param kinesisDeserializationSchema schema used to deserialize every record (not validated
     *     here)
     * @throws NullPointerException if any of the arguments documented as non-null is null
     * @throws IllegalArgumentException if {@code sequenceNumber} equals the shard-ending sentinel
     */
    public ShardConsumer(KinesisDataFetcher<T> kinesisDataFetcher, RecordPublisher recordPublisher, Integer num, StreamShardHandle streamShardHandle, SequenceNumber sequenceNumber, ShardConsumerMetricsReporter shardConsumerMetricsReporter, KinesisDeserializationSchema<T> kinesisDeserializationSchema) {
        // checkNotNull is generic, so neither casts nor explicit unboxing are required.
        this.fetcherRef = Preconditions.checkNotNull(kinesisDataFetcher);
        this.recordPublisher = Preconditions.checkNotNull(recordPublisher);
        this.subscribedShardStateIndex = Preconditions.checkNotNull(num);
        this.subscribedShard = Preconditions.checkNotNull(streamShardHandle);
        this.shardConsumerMetricsReporter = Preconditions.checkNotNull(shardConsumerMetricsReporter);
        this.lastSequenceNum = Preconditions.checkNotNull(sequenceNumber);
        Preconditions.checkArgument(
                !sequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
                "Should not start a ShardConsumer if the shard has already been completely read.");
        this.deserializer = kinesisDeserializationSchema;
    }

    /**
     * Runs the record publisher in a loop until the shard is completely read, the consumer stops
     * running, or an error occurs.
     *
     * <ul>
     *   <li>On {@code COMPLETE} the shard-ending sentinel is stored as the shard's state and the
     *       loop ends.
     *   <li>On {@code CANCELLED} while the consumer is still running, the cancellation is logged
     *       and a {@link ShardConsumerCancelledException} is raised.
     *   <li>Any {@link Throwable} raised inside the loop (including the cancellation exception) is
     *       handed to {@link KinesisDataFetcher#stopWithError(Throwable)} instead of propagating.
     *   <li>The metrics reporter is always unregistered before this method returns.
     * </ul>
     */
    @Override
    public void run() {
        try {
            while (isRunning()) {
                // NOTE(review): the body of this callback (synthetic lambda$run$0) was not present
                // in the decompiled output. It is reconstructed from the otherwise-unused private
                // helpers of this class and the upstream connector - verify against the class file.
                final RecordPublisher.RecordPublisherRunResult result =
                        recordPublisher.run(
                                batch -> {
                                    for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
                                        if (filterDeaggregatedRecord(userRecord)) {
                                            deserializeRecordForCollectionAndUpdateState(userRecord);
                                        }
                                    }

                                    shardConsumerMetricsReporter.setAverageRecordSizeBytes(
                                            batch.getAverageRecordSizeBytes());
                                    shardConsumerMetricsReporter.setNumberOfAggregatedRecords(
                                            batch.getAggregatedRecordSize());
                                    shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(
                                            batch.getDeaggregatedRecordSize());
                                    final Long millisBehindLatest = batch.getMillisBehindLatest();
                                    if (millisBehindLatest != null) {
                                        shardConsumerMetricsReporter.setMillisBehindLatest(
                                                millisBehindLatest);
                                    }

                                    return lastSequenceNum;
                                });

                if (result == RecordPublisher.RecordPublisherRunResult.COMPLETE) {
                    fetcherRef.updateState(
                            subscribedShardStateIndex,
                            SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                    // The shard has been read to its end; nothing more to consume.
                    break;
                } else if (isRunning()
                        && result == RecordPublisher.RecordPublisherRunResult.CANCELLED) {
                    final String errorMessage =
                            "Shard consumer cancelled: " + subscribedShard.getShard().getShardId();
                    LOG.info(errorMessage);
                    throw new ShardConsumerCancelledException(errorMessage);
                }
            }
        } catch (Throwable t) {
            fetcherRef.stopWithError(t);
        } finally {
            shardConsumerMetricsReporter.unregister();
        }
    }

    /**
     * Tells whether consumption should continue.
     *
     * <p>Note that {@link Thread#interrupted()} clears the interrupt flag of the calling thread.
     *
     * @return {@code true} if the calling thread was not interrupted and the fetcher is running
     */
    private boolean isRunning() {
        return !Thread.interrupted() && this.fetcherRef.isRunning();
    }

    /**
     * Deserializes one record, emits it through the fetcher and remembers its sequence number.
     *
     * @param userRecord the record to deserialize and emit
     * @throws RuntimeException wrapping the {@link IOException} if deserialization fails
     */
    private void deserializeRecordForCollectionAndUpdateState(UserRecord userRecord) {
        // Copy the remaining bytes of the record's buffer into a standalone array.
        final ByteBuffer recordData = userRecord.getData();
        final byte[] dataBytes = new byte[recordData.remaining()];
        recordData.get(dataBytes);

        final long approxArrivalTimestamp = userRecord.getApproximateArrivalTimestamp().getTime();

        try {
            final T value =
                    this.deserializer.deserialize(
                            dataBytes,
                            userRecord.getPartitionKey(),
                            userRecord.getSequenceNumber(),
                            approxArrivalTimestamp,
                            this.subscribedShard.getStreamName(),
                            this.subscribedShard.getShard().getShardId());

            // Aggregated records additionally carry their sub-sequence number.
            final SequenceNumber collectedSequenceNumber;
            if (userRecord.isAggregated()) {
                collectedSequenceNumber =
                        new SequenceNumber(
                                userRecord.getSequenceNumber(), userRecord.getSubSequenceNumber());
            } else {
                collectedSequenceNumber = new SequenceNumber(userRecord.getSequenceNumber());
            }

            this.fetcherRef.emitRecordAndUpdateState(
                    value,
                    approxArrivalTimestamp,
                    this.subscribedShardStateIndex,
                    collectedSequenceNumber);
            this.lastSequenceNum = collectedSequenceNumber;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Decides whether a de-aggregated record should be processed.
     *
     * <p>A record is rejected only when the last-seen sequence number is aggregated, the record
     * has the same sequence number, and its sub-sequence number is not greater than the last-seen
     * sub-sequence number.
     *
     * @param userRecord the record to check
     * @return {@code true} if the record should be processed, {@code false} if it is rejected
     */
    private boolean filterDeaggregatedRecord(UserRecord userRecord) {
        if (!this.lastSequenceNum.isAggregated()) {
            return true;
        }
        return !userRecord.getSequenceNumber().equals(this.lastSequenceNum.getSequenceNumber())
                || userRecord.getSubSequenceNumber() > this.lastSequenceNum.getSubSequenceNumber();
    }
}
