/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.math.BigInteger;
import java.util.List;
import java.util.ListIterator;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.cloudwatch.model.StandardUnit;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsCache;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.StreamConfig;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskType;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.lib.worker.ThrottlingReporter;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.Log;
import org.apache.flink.kinesis.shaded.org.apache.commons.logging.LogFactory;

class ProcessTask
implements ITask {
    private static final Log LOG = LogFactory.getLog(ProcessTask.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
    private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
    private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
    private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
    private static final int MAX_CONSECUTIVE_THROTTLES = 5;
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final KinesisDataFetcher dataFetcher;
    private final TaskType taskType = TaskType.PROCESS;
    private final StreamConfig streamConfig;
    private final long backoffTimeMillis;
    private final Shard shard;
    private final ThrottlingReporter throttlingReporter;
    private final GetRecordsCache getRecordsCache;

    public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, GetRecordsCache getRecordsCache) {
        this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, new ThrottlingReporter(5, shardInfo.getShardId()), getRecordsCache);
    }

    public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.dataFetcher = dataFetcher;
        this.streamConfig = streamConfig;
        this.backoffTimeMillis = backoffTimeMillis;
        this.throttlingReporter = throttlingReporter;
        IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
        this.getRecordsCache = getRecordsCache;
        this.shard = !skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended ? ((IKinesisProxyExtended)kinesisProxy).getShard(this.shardInfo.getShardId()) : null;
        if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) {
            LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records in the event of resharding will not be dropped during deaggregation of Amazon Kinesis records.");
        }
    }

    @Override
    public TaskResult call() {
        long startTimeMillis = System.currentTimeMillis();
        IMetricsScope scope = MetricsHelper.getMetricsScope();
        scope.addDimension("ShardId", this.shardInfo.getShardId());
        scope.addData(RECORDS_PROCESSED_METRIC, 0.0, StandardUnit.Count, MetricsLevel.SUMMARY);
        scope.addData(DATA_BYTES_PROCESSED_METRIC, 0.0, StandardUnit.Bytes, MetricsLevel.SUMMARY);
        RuntimeException exception = null;
        try {
            if (this.dataFetcher.isShardEndReached()) {
                LOG.info("Reached end of shard " + this.shardInfo.getShardId() + ". Found childShards: " + this.dataFetcher.getChildShards());
                return new TaskResult(null, true, this.dataFetcher.getChildShards());
            }
            ProcessRecordsInput processRecordsInput = this.getRecordsResult();
            this.throttlingReporter.success();
            List<Record> records = processRecordsInput.getRecords();
            if (!records.isEmpty()) {
                scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
            } else {
                this.handleNoRecords(startTimeMillis);
            }
            records = this.deaggregateRecords(records);
            this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(this.filterAndGetMaxExtendedSequenceNumber(scope, records, this.recordProcessorCheckpointer.getLastCheckpointValue(), this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
            if (this.shouldCallProcessRecords(records)) {
                this.callProcessRecords(processRecordsInput, records);
            }
        }
        catch (ProvisionedThroughputExceededException pte) {
            this.throttlingReporter.throttled();
            exception = pte;
            this.backoff();
        }
        catch (RuntimeException e) {
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Caught exception: ", e);
            exception = e;
            this.backoff();
        }
        return new TaskResult(exception);
    }

    private void backoff() {
        try {
            Thread.sleep(this.backoffTimeMillis);
        }
        catch (InterruptedException ie) {
            LOG.debug(this.shardInfo.getShardId() + ": Sleep was interrupted", ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void callProcessRecords(ProcessRecordsInput input, List<Record> records) {
        LOG.debug("Calling application processRecords() with " + records.size() + " records from " + this.shardInfo.getShardId());
        ProcessRecordsInput processRecordsInput = new ProcessRecordsInput().withRecords(records).withCheckpointer(this.recordProcessorCheckpointer).withMillisBehindLatest(input.getMillisBehindLatest());
        long recordProcessorStartTimeMillis = System.currentTimeMillis();
        try {
            this.recordProcessor.processRecords(processRecordsInput);
        }
        catch (Exception e) {
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Application processRecords() threw an exception when processing shard ", e);
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Skipping over the following data records: " + records);
        }
        finally {
            MetricsHelper.addLatencyPerShard(this.shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
        }
    }

    private boolean shouldCallProcessRecords(List<Record> records) {
        return !records.isEmpty() || this.streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList();
    }

    private List<Record> deaggregateRecords(List<Record> records) {
        if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
            if (this.shard != null) {
                return UserRecord.deaggregate(records, new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()), new BigInteger(this.shard.getHashKeyRange().getEndingHashKey()));
            }
            return UserRecord.deaggregate(records);
        }
        return records;
    }

    private void handleNoRecords(long startTimeMillis) {
        LOG.debug("Kinesis didn't return any records for shard " + this.shardInfo.getShardId());
        long sleepTimeMillis = this.streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - startTimeMillis);
        if (sleepTimeMillis > 0L) {
            sleepTimeMillis = Math.max(sleepTimeMillis, this.streamConfig.getIdleTimeInMilliseconds());
            try {
                LOG.debug("Sleeping for " + sleepTimeMillis + " ms since there were no new records in shard " + this.shardInfo.getShardId());
                Thread.sleep(sleepTimeMillis);
            }
            catch (InterruptedException e) {
                LOG.debug("ShardId " + this.shardInfo.getShardId() + ": Sleep was interrupted");
            }
        }
    }

    @Override
    public TaskType getTaskType() {
        return this.taskType;
    }

    private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope scope, List<Record> records, ExtendedSequenceNumber lastCheckpointValue, ExtendedSequenceNumber lastLargestPermittedCheckpointValue) {
        ExtendedSequenceNumber largestExtendedSequenceNumber = lastLargestPermittedCheckpointValue;
        ListIterator<Record> recordIterator = records.listIterator();
        while (recordIterator.hasNext()) {
            Record record = recordIterator.next();
            ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(record.getSequenceNumber(), record instanceof UserRecord ? Long.valueOf(((UserRecord)record).getSubSequenceNumber()) : null);
            if (extendedSequenceNumber.compareTo(lastCheckpointValue) <= 0) {
                recordIterator.remove();
                LOG.debug("removing record with ESN " + extendedSequenceNumber + " because the ESN is <= checkpoint (" + lastCheckpointValue + ")");
                continue;
            }
            if (largestExtendedSequenceNumber == null || largestExtendedSequenceNumber.compareTo(extendedSequenceNumber) < 0) {
                largestExtendedSequenceNumber = extendedSequenceNumber;
            }
            scope.addData(DATA_BYTES_PROCESSED_METRIC, record.getData().limit(), StandardUnit.Bytes, MetricsLevel.SUMMARY);
        }
        return largestExtendedSequenceNumber;
    }

    private ProcessRecordsInput getRecordsResult() {
        try {
            return this.getRecordsResultAndRecordMillisBehindLatest();
        }
        catch (ExpiredIteratorException e) {
            LOG.info("ShardId " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", e);
            MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1.0, StandardUnit.Count, MetricsLevel.SUMMARY);
            this.dataFetcher.advanceIteratorTo(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber(), this.streamConfig.getInitialPositionInStream());
            try {
                return this.getRecordsResultAndRecordMillisBehindLatest();
            }
            catch (ExpiredIteratorException ex) {
                String msg = "Shard " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException with a fresh iterator.";
                LOG.error(msg, ex);
                throw ex;
            }
        }
    }

    private ProcessRecordsInput getRecordsResultAndRecordMillisBehindLatest() {
        ProcessRecordsInput processRecordsInput = this.getRecordsCache.getNextResult();
        if (processRecordsInput.getMillisBehindLatest() != null) {
            MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.getMillisBehindLatest().longValue(), StandardUnit.Milliseconds, MetricsLevel.SUMMARY);
        }
        return processRecordsInput;
    }
}

