package org.apache.pekko.kafka.internal;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.internal.DefaultProducerStageLogic;
import org.apache.pekko.kafka.internal.DeferredProducer;
import org.apache.pekko.kafka.internal.TransactionalProducerStage;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: TransactionalProducerStage.scala */
/* loaded from: input_file:org/apache/pekko/kafka/internal/TransactionalProducerStageLogic.class */
public final class TransactionalProducerStageLogic<K, V, P> extends DefaultProducerStageLogic<K, V, P, ProducerMessage.Envelope<K, V, P>, ProducerMessage.Results<K, V, P>> {
    public final TransactionalProducerStage<K, V, P> org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage;
    private final String transactionalId;
    private final String commitSchedulerKey;
    private final FiniteDuration messageDrainInterval;
    private TransactionalProducerStage.TransactionBatch batchOffsets;
    private boolean demandSuspended;
    private Option<ProducerMessage.Envelope<K, V, P>> firstMessage;
    private final AsyncCallback<BoxedUnit> onInternalCommitAckCb;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public TransactionalProducerStageLogic(TransactionalProducerStage<K, V, P> transactionalProducerStage, String str, Attributes attributes) {
        super(transactionalProducerStage, attributes);
        this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage = transactionalProducerStage;
        this.transactionalId = str;
        this.commitSchedulerKey = "commit";
        this.messageDrainInterval = new package.DurationInt(package$.MODULE$.DurationInt(10)).milliseconds();
        this.batchOffsets = TransactionalProducerStage$TransactionBatch$.MODULE$.empty();
        this.demandSuspended = false;
        this.firstMessage = None$.MODULE$;
        this.onInternalCommitAckCb = getAsyncCallback(boxedUnit -> {
            scheduleOnce(this.commitSchedulerKey, producerSettings().eosCommitInterval());
        });
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public Class<?> logSource() {
        return TransactionalProducerStage.class;
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void preStart() {
        resumeDemand(resumeDemand$default$1());
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic, org.apache.pekko.kafka.internal.DeferredProducer
    public void producerAssigned() {
        producingInHandler();
        initTransactions();
        beginTransaction();
        produceFirstMessage();
        resumeDemand(resumeDemand$default$1());
        scheduleOnce(this.commitSchedulerKey, producerSettings().eosCommitInterval());
    }

    private void produceFirstMessage() {
        Some some = this.firstMessage;
        if (!(some instanceof Some)) {
            throw new IllegalStateException("Should never attempt to produce first message if it does not exist.");
        }
        produce((ProducerMessage.Envelope) some.value());
        this.firstMessage = None$.MODULE$;
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void resumeDemand(boolean z) {
        super.resumeDemand(z);
        this.demandSuspended = false;
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public boolean resumeDemand$default$1() {
        return true;
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void suspendDemand() {
        if (!this.demandSuspended) {
            super.suspendDemand();
        }
        this.demandSuspended = true;
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void initialInHandler() {
        setHandler(this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.in(), new DefaultProducerStageLogic.DefaultInHandler(this) { // from class: org.apache.pekko.kafka.internal.TransactionalProducerStageLogic$$anon$1
            private final /* synthetic */ TransactionalProducerStageLogic $outer;

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(this);
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic.DefaultInHandler
            public void onPush() {
                this.$outer.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$parseFirstMessage((ProducerMessage.Envelope) this.$outer.protected$grab(this.$outer.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.in()));
            }
        });
    }

    public void onTimer(Object obj) {
        String str = this.commitSchedulerKey;
        if (obj == null) {
            if (str != null) {
                return;
            }
        } else if (!obj.equals(str)) {
            return;
        }
        maybeCommitTransaction(maybeCommitTransaction$default$1(), maybeCommitTransaction$default$2());
    }

    private void maybeCommitTransaction(boolean z, boolean z2) {
        int awaitingConfirmationValue = awaitingConfirmationValue();
        TransactionalProducerStage.TransactionBatch transactionBatch = this.batchOffsets;
        if (transactionBatch instanceof TransactionalProducerStage.NonemptyTransactionBatch) {
            TransactionalProducerStage.NonemptyTransactionBatch nonemptyTransactionBatch = (TransactionalProducerStage.NonemptyTransactionBatch) transactionBatch;
            if (awaitingConfirmationValue == 0) {
                commitTransaction(nonemptyTransactionBatch, z);
                return;
            }
        }
        if ((transactionBatch instanceof TransactionalProducerStage.EmptyTransactionBatch) && awaitingConfirmationValue == 0 && z2) {
            abortTransaction("Transaction is empty and stage is completing");
        } else if (awaitingConfirmationValue <= 0) {
            scheduleOnce(this.commitSchedulerKey, producerSettings().eosCommitInterval());
        } else {
            suspendDemand();
            scheduleOnce(this.commitSchedulerKey, this.messageDrainInterval);
        }
    }

    private boolean maybeCommitTransaction$default$1() {
        return true;
    }

    private boolean maybeCommitTransaction$default$2() {
        return false;
    }

    public boolean org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$parseFirstMessage(ProducerMessage.Envelope<K, V, P> envelope) {
        DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle = producerAssignmentLifecycle();
        if (DeferredProducer$Assigned$.MODULE$.equals(producerAssignmentLifecycle)) {
            return true;
        }
        if (!DeferredProducer$Unassigned$.MODULE$.equals(producerAssignmentLifecycle)) {
            if (DeferredProducer$AsyncCreateRequestSent$.MODULE$.equals(producerAssignmentLifecycle)) {
                throw new IllegalStateException(new StringBuilder(71).append("Should never receive new messages while in producer assignment state '").append(DeferredProducer$AsyncCreateRequestSent$.MODULE$).append("'").toString());
            }
            throw new MatchError(producerAssignmentLifecycle);
        }
        if (this.firstMessage.nonEmpty()) {
            throw new IllegalStateException("Cannot reapply first message");
        }
        this.firstMessage = Some$.MODULE$.apply(envelope);
        resolveProducer(generatedTransactionalConfig(envelope));
        suspendDemand();
        return false;
    }

    private ProducerSettings<K, V> generatedTransactionalConfig(ProducerMessage.Envelope<K, V, P> envelope) {
        String str;
        P passThrough = envelope.passThrough();
        if (passThrough instanceof ConsumerMessage.PartitionOffsetCommittedMarker) {
            ConsumerMessage.PartitionOffsetCommittedMarker partitionOffsetCommittedMarker = (ConsumerMessage.PartitionOffsetCommittedMarker) passThrough;
            if (partitionOffsetCommittedMarker.fromPartitionedSource()) {
                ConsumerMessage.GroupTopicPartition key = partitionOffsetCommittedMarker.key();
                String sb = new StringBuilder(3).append(this.transactionalId).append("-").append(key.groupId()).append("-").append(key.topic()).append("-").append(key.partition()).toString();
                log().debug("Generated transactional id from partitioned source '{}'", sb);
                str = sb;
                return this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.settings().withProperties((Seq<Tuple2<String, String>>) ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("enable.idempotence"), BoxesRunTime.boxToBoolean(true).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("transactional.id"), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("max.in.flight.requests.per.connection"), BoxesRunTime.boxToInteger(1).toString())}));
            }
        }
        str = this.transactionalId;
        return this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.settings().withProperties((Seq<Tuple2<String, String>>) ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("enable.idempotence"), BoxesRunTime.boxToBoolean(true).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("transactional.id"), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("max.in.flight.requests.per.connection"), BoxesRunTime.boxToInteger(1).toString())}));
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void postSend(ProducerMessage.Envelope<K, V, P> envelope) {
        P passThrough = envelope.passThrough();
        if (passThrough instanceof ConsumerMessage.PartitionOffsetCommittedMarker) {
            this.batchOffsets = this.batchOffsets.updated((ConsumerMessage.PartitionOffsetCommittedMarker) passThrough);
        }
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic, org.apache.pekko.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionSuccess() {
        log().debug("Committing final transaction before shutdown");
        cancelTimer(this.commitSchedulerKey);
        maybeCommitTransaction(false, true);
        super.onCompletionSuccess();
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic, org.apache.pekko.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionFailure(Throwable th) {
        abortTransaction("Stage failure");
        this.batchOffsets.committingFailed();
        super.onCompletionFailure(th);
    }

    private void commitTransaction(TransactionalProducerStage.NonemptyTransactionBatch nonemptyTransactionBatch, boolean z) {
        String group = nonemptyTransactionBatch.group();
        log().debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}", this.transactionalId, group, nonemptyTransactionBatch.offsets());
        producer().sendOffsetsToTransaction(package$JavaConverters$.MODULE$.MapHasAsJava(nonemptyTransactionBatch.offsetMap()).asJava(), new ConsumerGroupMetadata(group));
        producer().commitTransaction();
        log().debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", this.transactionalId, group, nonemptyTransactionBatch.offsets());
        this.batchOffsets = TransactionalProducerStage$TransactionBatch$.MODULE$.empty();
        nonemptyTransactionBatch.internalCommit().onComplete(r4 -> {
            this.onInternalCommitAckCb.invoke(BoxedUnit.UNIT);
        }, materializer().executionContext());
        if (z) {
            beginTransaction();
            resumeDemand(resumeDemand$default$1());
        }
    }

    private void initTransactions() {
        log().debug("Initializing transactions");
        producer().initTransactions();
    }

    private void beginTransaction() {
        log().debug("Beginning new transaction");
        producer().beginTransaction();
    }

    private void abortTransaction(String str) {
        log().debug("Aborting transaction: {}", str);
        DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle = producerAssignmentLifecycle();
        DeferredProducer$Assigned$ deferredProducer$Assigned$ = DeferredProducer$Assigned$.MODULE$;
        if (producerAssignmentLifecycle == null) {
            if (deferredProducer$Assigned$ != null) {
                return;
            }
        } else if (!producerAssignmentLifecycle.equals(deferredProducer$Assigned$)) {
            return;
        }
        producer().abortTransaction();
    }

    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public <T> T protected$grab(Inlet<T> inlet) {
        return (T) grab(inlet);
    }
}
