package org.apache.pekko.stream.connectors.kinesis.impl;

import java.util.concurrent.Semaphore;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.connectors.kinesis.CommittableRecord;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some$;
import scala.collection.IterableOnceOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import software.amazon.kinesis.lifecycle.ShutdownReason;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* compiled from: ShardProcessor.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/ShardProcessor.class */
public class ShardProcessor implements ShardRecordProcessor {
    private final Function1<CommittableRecord, BoxedUnit> processRecord;
    public CommittableRecord.ShardProcessorData org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shardData;
    public RecordProcessorCheckpointer org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$checkpointer;
    public final Semaphore org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$lastRecordSemaphore = new Semaphore(1);
    public Option<ShutdownReason> org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shutdown = None$.MODULE$;

    /* compiled from: ShardProcessor.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/kinesis/impl/ShardProcessor$InternalCommittableRecord.class */
    public final class InternalCommittableRecord extends CommittableRecord {
        private final boolean lastRecord;
        private final /* synthetic */ ShardProcessor $outer;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public InternalCommittableRecord(ShardProcessor shardProcessor, KinesisClientRecord kinesisClientRecord, CommittableRecord.BatchData batchData, boolean z) {
            super(kinesisClientRecord, batchData, shardProcessor.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shardData);
            this.lastRecord = z;
            if (shardProcessor == null) {
                throw new NullPointerException();
            }
            this.$outer = shardProcessor;
        }

        private KinesisClientRecord record$accessor() {
            return super.record();
        }

        private CommittableRecord.BatchData batchData$accessor() {
            return super.batchData();
        }

        private void checkpoint() {
            this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$checkpointer.checkpoint(sequenceNumber(), subSequenceNumber());
            if (this.lastRecord) {
                this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$lastRecordSemaphore.release();
            }
        }

        @Override // org.apache.pekko.stream.connectors.kinesis.CommittableRecord
        public Option<ShutdownReason> shutdownReason() {
            return this.$outer.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shutdown;
        }

        @Override // org.apache.pekko.stream.connectors.kinesis.CommittableRecord
        public void forceCheckpoint() {
            checkpoint();
        }

        public final /* synthetic */ ShardProcessor org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$InternalCommittableRecord$$$outer() {
            return this.$outer;
        }
    }

    public ShardProcessor(Function1<CommittableRecord, BoxedUnit> function1) {
        this.processRecord = function1;
    }

    public void initialize(InitializationInput initializationInput) {
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shardData = new CommittableRecord.ShardProcessorData(initializationInput.shardId(), initializationInput.extendedSequenceNumber(), initializationInput.pendingCheckpointSequenceNumber());
    }

    public void processRecords(ProcessRecordsInput processRecordsInput) {
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$checkpointer = processRecordsInput.checkpointer();
        CommittableRecord.BatchData batchData = new CommittableRecord.BatchData(processRecordsInput.cacheEntryTime(), processRecordsInput.cacheExitTime(), processRecordsInput.isAtShardEnd(), Predef$.MODULE$.Long2long(processRecordsInput.millisBehindLatest()));
        if (batchData.isAtShardEnd()) {
            this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$lastRecordSemaphore.acquire();
        }
        int size = processRecordsInput.records().size();
        ((IterableOnceOps) package$JavaConverters$.MODULE$.ListHasAsScala(processRecordsInput.records()).asScala().zipWithIndex()).foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            this.processRecord.apply(new InternalCommittableRecord(this, (KinesisClientRecord) tuple2._1(), batchData, processRecordsInput.isAtShardEnd() && BoxesRunTime.unboxToInt(tuple2._2()) + 1 == size));
        });
    }

    public void leaseLost(LeaseLostInput leaseLostInput) {
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shutdown = Some$.MODULE$.apply(ShutdownReason.LEASE_LOST);
    }

    public void shardEnded(ShardEndedInput shardEndedInput) {
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$checkpointer = shardEndedInput.checkpointer();
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shutdown = Some$.MODULE$.apply(ShutdownReason.SHARD_END);
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$lastRecordSemaphore.acquire();
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$checkpointer.checkpoint();
    }

    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$checkpointer = shutdownRequestedInput.checkpointer();
        this.org$apache$pekko$stream$connectors$kinesis$impl$ShardProcessor$$shutdown = Some$.MODULE$.apply(ShutdownReason.REQUESTED);
    }
}
