/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardConsumerMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ShardConsumer<T>
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
    private final KinesisDeserializationSchema<T> deserializer;
    private final int subscribedShardStateIndex;
    private final KinesisDataFetcher<T> fetcherRef;
    private final StreamShardHandle subscribedShard;
    private final ShardConsumerMetricsReporter shardConsumerMetricsReporter;
    private SequenceNumber lastSequenceNum;
    private final RecordPublisher recordPublisher;

    public ShardConsumer(KinesisDataFetcher<T> fetcherRef, RecordPublisher recordPublisher, Integer subscribedShardStateIndex, StreamShardHandle subscribedShard, SequenceNumber lastSequenceNum, ShardConsumerMetricsReporter shardConsumerMetricsReporter, KinesisDeserializationSchema<T> shardDeserializer) {
        this.fetcherRef = (KinesisDataFetcher)Preconditions.checkNotNull(fetcherRef);
        this.recordPublisher = (RecordPublisher)Preconditions.checkNotNull((Object)recordPublisher);
        this.subscribedShardStateIndex = (Integer)Preconditions.checkNotNull((Object)subscribedShardStateIndex);
        this.subscribedShard = (StreamShardHandle)Preconditions.checkNotNull((Object)subscribedShard);
        this.shardConsumerMetricsReporter = (ShardConsumerMetricsReporter)Preconditions.checkNotNull((Object)shardConsumerMetricsReporter);
        this.lastSequenceNum = (SequenceNumber)Preconditions.checkNotNull((Object)lastSequenceNum);
        Preconditions.checkArgument((!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()) ? 1 : 0) != 0, (Object)"Should not start a ShardConsumer if the shard has already been completely read.");
        this.deserializer = shardDeserializer;
    }

    @Override
    public void run() {
        try {
            while (this.isRunning()) {
                RecordPublisher.RecordPublisherRunResult result = this.recordPublisher.run(batch -> {
                    if (!batch.getDeaggregatedRecords().isEmpty()) {
                        LOG.debug("stream: {}, shard: {}, millis behind latest: {}, batch size: {}", new Object[]{this.subscribedShard.getStreamName(), this.subscribedShard.getShard().getShardId(), batch.getMillisBehindLatest(), batch.getDeaggregatedRecordSize()});
                    }
                    for (UserRecord userRecord : batch.getDeaggregatedRecords()) {
                        if (!this.filterDeaggregatedRecord(userRecord)) continue;
                        this.deserializeRecordForCollectionAndUpdateState(userRecord);
                    }
                    this.shardConsumerMetricsReporter.setAverageRecordSizeBytes(batch.getAverageRecordSizeBytes());
                    this.shardConsumerMetricsReporter.setNumberOfAggregatedRecords(batch.getAggregatedRecordSize());
                    this.shardConsumerMetricsReporter.setNumberOfDeaggregatedRecords(batch.getDeaggregatedRecordSize());
                    Optional.ofNullable(batch.getMillisBehindLatest()).ifPresent(this.shardConsumerMetricsReporter::setMillisBehindLatest);
                    return this.lastSequenceNum;
                });
                if (result == RecordPublisher.RecordPublisherRunResult.COMPLETE) {
                    this.fetcherRef.updateState(this.subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
                    break;
                }
                if (result != RecordPublisher.RecordPublisherRunResult.CANCELLED) continue;
                String errorMessage = "Shard consumer cancelled: " + this.subscribedShard.getShard().getShardId();
                LOG.info(errorMessage);
                throw new ShardConsumerCancelledException(errorMessage);
            }
        }
        catch (Throwable t) {
            this.fetcherRef.stopWithError(t);
        }
    }

    private boolean isRunning() {
        return !Thread.interrupted() && this.fetcherRef.isRunning();
    }

    private void deserializeRecordForCollectionAndUpdateState(UserRecord record) {
        T value;
        ByteBuffer recordData = record.getData();
        byte[] dataBytes = new byte[recordData.remaining()];
        recordData.get(dataBytes);
        long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
        try {
            value = this.deserializer.deserialize(dataBytes, record.getPartitionKey(), record.getSequenceNumber(), approxArrivalTimestamp, this.subscribedShard.getStreamName(), this.subscribedShard.getShard().getShardId());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        SequenceNumber collectedSequenceNumber = record.isAggregated() ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()) : new SequenceNumber(record.getSequenceNumber());
        this.fetcherRef.emitRecordAndUpdateState(value, approxArrivalTimestamp, this.subscribedShardStateIndex, collectedSequenceNumber);
        this.lastSequenceNum = collectedSequenceNumber;
    }

    private boolean filterDeaggregatedRecord(UserRecord record) {
        if (!this.lastSequenceNum.isAggregated()) {
            return true;
        }
        return !record.getSequenceNumber().equals(this.lastSequenceNum.getSequenceNumber()) || record.getSubSequenceNumber() > this.lastSequenceNum.getSubSequenceNumber();
    }

    static class ShardConsumerCancelledException
    extends ShardConsumerException {
        private static final long serialVersionUID = 2707399313569728649L;

        public ShardConsumerCancelledException(String message) {
            super(message);
        }
    }

    static abstract class ShardConsumerException
    extends RuntimeException {
        private static final long serialVersionUID = 7732343624482321663L;

        public ShardConsumerException(String message) {
            super(message);
        }
    }
}

