package org.apache.samza.system.kinesis.consumer;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import java.util.List;
import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.class */
public class KinesisRecordProcessor implements IRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class.getName());
    static final long POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS = 1000;
    private final SystemStreamPartition ssp;
    private String shardId;
    private KinesisRecordProcessorListener listener;
    private IRecordProcessorCheckpointer checkpointer;
    private ExtendedSequenceNumber initSeqNumber;
    private volatile ExtendedSequenceNumber lastProcessedRecordSeqNumber;
    private volatile ExtendedSequenceNumber lastCheckpointedRecordSeqNumber;
    private boolean shutdownRequested = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisRecordProcessor(SystemStreamPartition systemStreamPartition, KinesisRecordProcessorListener kinesisRecordProcessorListener) {
        this.ssp = systemStreamPartition;
        this.listener = kinesisRecordProcessorListener;
    }

    public void initialize(InitializationInput initializationInput) {
        Validate.isTrue(this.listener != null, "There is no listener set for the processor.");
        this.initSeqNumber = initializationInput.getExtendedSequenceNumber();
        this.shardId = initializationInput.getShardId();
        LOG.info("Initialization done for {} with sequence {}", this, initializationInput.getExtendedSequenceNumber().getSequenceNumber());
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        Validate.isTrue(!this.shutdownRequested, String.format("KCL returned records after shutdown is called on the processor %s.", this));
        this.checkpointer = processRecordsInput.getCheckpointer();
        List<Record> records = processRecordsInput.getRecords();
        if (records.isEmpty()) {
            return;
        }
        this.lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
        this.listener.onReceiveRecords(this.ssp, records, processRecordsInput.getMillisBehindLatest().longValue());
    }

    public void checkpoint(String str) {
        ExtendedSequenceNumber extendedSequenceNumber = new ExtendedSequenceNumber(str);
        if (this.initSeqNumber.compareTo(extendedSequenceNumber) > 0) {
            LOG.warn("Samza called checkpoint with seqNumber {} smaller than initial seqNumber {} for {}. Ignoring it!", new Object[]{str, this.initSeqNumber, this});
            return;
        }
        if (this.checkpointer == null) {
            LOG.warn("Ignoring checkpointing for {} with seqNumber {} because of re-assignment.", this, str);
            return;
        }
        try {
            this.checkpointer.checkpoint(str);
            this.lastCheckpointedRecordSeqNumber = extendedSequenceNumber;
        } catch (ThrottlingException e) {
            throw new SamzaException(String.format("Checkpointing %s with seqNumber %s failed with exception. Checkpoint interval is too aggressive for the provisioned throughput of the dynamoDB table where the checkpoints are stored. Either reduce the checkpoint interval -or- increase the throughput of dynamoDB table.", this, str));
        } catch (InvalidStateException e2) {
            String format = String.format("Checkpointing %s with seqNumber %s failed with exception.", this, str);
            LOG.error(format, e2);
            throw new SamzaException(format, e2);
        } catch (ShutdownException e3) {
            LOG.warn(String.format("Checkpointing %s with seqNumber %s failed with exception. Dropping the checkpoint.", this, str), e3);
        }
    }

    public void shutdown(ShutdownInput shutdownInput) {
        LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());
        Validate.isTrue(!this.shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
        this.shutdownRequested = true;
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                LOG.info("Waiting for all the records for {} to be processed.", this);
                while (this.lastProcessedRecordSeqNumber != null && !this.lastProcessedRecordSeqNumber.equals(this.lastCheckpointedRecordSeqNumber)) {
                    Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
                }
                LOG.info("Final checkpoint for {} before shutting down.", this);
                shutdownInput.getCheckpointer().checkpoint();
            } catch (Exception e) {
                LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
            }
        }
        this.listener.onShutdown(this.ssp);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getShardId() {
        return this.shardId;
    }

    public String toString() {
        return String.format("KinesisRecordProcessor: ssp %s shard %s hashCode %s", this.ssp, this.shardId, Integer.valueOf(hashCode()));
    }
}
