package org.apache.pekko.kafka.internal;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.internal.DeferredProducer;
import org.apache.pekko.kafka.internal.TransactionalProducerStage;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.util.Try;

/* compiled from: TransactionalProducerStage.scala */
@ScalaSignature(bytes = "\u0006\u0005\teb\u0001B\u0015+\rUB\u0001B\u001b\u0001\u0003\u0002\u0003\u0006Ia\u001b\u0005\t]\u0002\u0011\t\u0011)A\u0005_\"A!\u0010\u0001B\u0001B\u0003%1\u0010C\u0004\u0002\u0004\u0001!\t!!\u0002\t\u0013\u0005=\u0001A1A\u0005\n\u0005E\u0001\u0002CA\u0011\u0001\u0001\u0006I!a\u0005\t\u0013\u0005\r\u0002A1A\u0005\n\u0005\u0015\u0002\u0002CA\u001c\u0001\u0001\u0006I!a\n\t\u0013\u0005e\u0002\u00011A\u0005\n\u0005m\u0002\"CA&\u0001\u0001\u0007I\u0011BA'\u0011!\tI\u0006\u0001Q!\n\u0005u\u0002\"CA.\u0001\u0001\u0007I\u0011BA/\u0011%\t)\u0007\u0001a\u0001\n\u0013\t9\u0007\u0003\u0005\u0002l\u0001\u0001\u000b\u0015BA0\u0011%\ti\u0007\u0001a\u0001\n\u0013\ty\u0007C\u0005\u0002x\u0001\u0001\r\u0011\"\u0003\u0002z!A\u0011Q\u0010\u0001!B\u0013\t\t\bC\u0004\u0002��\u0001!\t&!!\t\u000f\u0005E\u0005\u0001\"\u0011\u0002\u0014\"9\u0011Q\u0013\u0001\u0005R\u0005M\u0005bBAL\u0001\u0011%\u00111\u0013\u0005\b\u00033\u0003A\u0011KAN\u0011%\t\t\u000bAI\u0001\n#\t\u0019\u000bC\u0004\u0002:\u0002!\t&a%\t\u000f\u0005m\u0006\u0001\"\u0015\u0002\u0014\"9\u0011Q\u0018\u0001\u0005R\u0005}\u0006bBAc\u0001\u0011%\u0011q\u0019\u0005\n\u0003#\u0004\u0011\u0013!C\u0005\u0003GC\u0011\"a5\u0001#\u0003%I!a)\t\u000f\u0005U\u0007\u0001\"\u0003\u0002X\"9\u0011Q\u001c\u0001\u0005\n\u0005}\u0007bBAv\u0001\u0011E\u0013Q\u001e\u0005\b\u0003c\u0004A\u0011IAJ\u0011\u001d\t\u0019\u0010\u0001C!\u0003kDqA!\u0004\u0001\t\u0013\u0011y\u0001C\u0005\u0003\u001e\u0001\u0011\r\u0011\"\u0003\u0003 
!A!1\u0006\u0001!\u0002\u0013\u0011\t\u0003C\u0004\u0003.\u0001!I!a%\t\u000f\t=\u0002\u0001\"\u0003\u0002\u0014\"9!\u0011\u0007\u0001\u0005\n\tM\"a\b+sC:\u001c\u0018m\u0019;j_:\fG\u000e\u0015:pIV\u001cWM]*uC\u001e,Gj\\4jG*\u00111\u0006L\u0001\tS:$XM\u001d8bY*\u0011QFL\u0001\u0006W\u000647.\u0019\u0006\u0003_A\nQ\u0001]3lW>T!!\r\u001a\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0019\u0014aA8sO\u000e\u0001Q\u0003\u0002\u001c>\u00156\u001bB\u0001A\u001c_CB9\u0001(O\u001eJ\u0019>[V\"\u0001\u0016\n\u0005iR#!\u0007#fM\u0006,H\u000e\u001e)s_\u0012,8-\u001a:Ti\u0006<W\rT8hS\u000e\u0004\"\u0001P\u001f\r\u0001\u0011)a\b\u0001b\u0001\u007f\t\t1*\u0005\u0002A\rB\u0011\u0011\tR\u0007\u0002\u0005*\t1)A\u0003tG\u0006d\u0017-\u0003\u0002F\u0005\n9aj\u001c;iS:<\u0007CA!H\u0013\tA%IA\u0002B]f\u0004\"\u0001\u0010&\u0005\u000b-\u0003!\u0019A \u0003\u0003Y\u0003\"\u0001P'\u0005\u000b9\u0003!\u0019A \u0003\u0003A\u0003R\u0001\u0015-<\u00132s!!\u0015,\u000f\u0005I+fBA*U\u001b\u0005\u0001\u0014BA\u00181\u0013\tic&\u0003\u0002XY\u0005y\u0001K]8ek\u000e,'/T3tg\u0006<W-\u0003\u0002Z5\nAQI\u001c<fY>\u0004XM\u0003\u0002XYA)\u0001\u000bX\u001eJ\u0019&\u0011QL\u0017\u0002\b%\u0016\u001cX\u000f\u001c;t!\tAt,\u0003\u0002aU\tq1\u000b^1hK&#Gj\\4hS:<\u0007C\u00012h\u001d\t\u0019WM\u0004\u0002RI&\u00111\u0006L\u0005\u0003M*\nQ\u0002\u0015:pIV\u001cWM]*uC\u001e,\u0017B\u00015j\u0005]\u0001&o\u001c3vG\u0016\u00148i\\7qY\u0016$\u0018n\u001c8Ti\u0006$XM\u0003\u0002gU\u0005)1\u000f^1hKB)\u0001\b\\\u001eJ\u0019&\u0011QN\u000b\u0002\u001b)J\fgn]1di&|g.\u00197Qe>$WoY3s'R\fw-Z\u0001\u0010iJ\fgn]1di&|g.\u00197JIB\u0011\u0001o\u001e\b\u0003cV\u0004\"A\u001d\"\u000e\u0003MT!\u0001\u001e\u001b\u0002\rq\u0012xn\u001c;?\u0013\t1()\u0001\u0004Qe\u0016$WMZ\u0005\u0003qf\u0014aa\u0015;sS:<'B\u0001<C\u0003MIg\u000e[3sSR,G-\u0011;ue&\u0014W\u000f^3t!\tax0D\u0001~\u0015\tqh&\u0001\u0004tiJ,\u0017-\\\u0005\u0004\u0003\u0003i(AC!uiJL'-\u001e;fg\u00061A(\u001b8jiz\"\u0002\"a\u0002\u0002\n\u0005-\u0011Q\u0002\t\u0006q\u000
1Y\u0014\n\u0014\u0005\u0006U\u0012\u0001\ra\u001b\u0005\u0006]\u0012\u0001\ra\u001c\u0005\u0006u\u0012\u0001\ra_\u0001\u0013G>lW.\u001b;TG\",G-\u001e7fe.+\u00170\u0006\u0002\u0002\u0014A!\u0011QCA\u0010\u001b\t\t9B\u0003\u0003\u0002\u001a\u0005m\u0011\u0001\u00027b]\u001eT!!!\b\u0002\t)\fg/Y\u0005\u0004q\u0006]\u0011aE2p[6LGoU2iK\u0012,H.\u001a:LKf\u0004\u0013\u0001F7fgN\fw-\u001a#sC&t\u0017J\u001c;feZ\fG.\u0006\u0002\u0002(A!\u0011\u0011FA\u001a\u001b\t\tYC\u0003\u0003\u0002.\u0005=\u0012\u0001\u00033ve\u0006$\u0018n\u001c8\u000b\u0007\u0005E\")\u0001\u0006d_:\u001cWO\u001d:f]RLA!!\u000e\u0002,\tqa)\u001b8ji\u0016$UO]1uS>t\u0017!F7fgN\fw-\u001a#sC&t\u0017J\u001c;feZ\fG\u000eI\u0001\rE\u0006$8\r[(gMN,Go]\u000b\u0003\u0003{\u0001B!a\u0010\u0002F9\u0019\u0001(!\u0011\n\u0007\u0005\r#&\u0001\u000eUe\u0006t7/Y2uS>t\u0017\r\u001c)s_\u0012,8-\u001a:Ti\u0006<W-\u0003\u0003\u0002H\u0005%#\u0001\u0005+sC:\u001c\u0018m\u0019;j_:\u0014\u0015\r^2i\u0015\r\t\u0019EK\u0001\u0011E\u0006$8\r[(gMN,Go]0%KF$B!a\u0014\u0002VA\u0019\u0011)!\u0015\n\u0007\u0005M#I\u0001\u0003V]&$\b\"CA,\u0015\u0005\u0005\t\u0019AA\u001f\u0003\rAH%M\u0001\u000eE\u0006$8\r[(gMN,Go\u001d\u0011\u0002\u001f\u0011,W.\u00198e'V\u001c\b/\u001a8eK\u0012,\"!a\u0018\u0011\u0007\u0005\u000b\t'C\u0002\u0002d\t\u0013qAQ8pY\u0016\fg.A\neK6\fg\u000eZ*vgB,g\u000eZ3e?\u0012*\u0017\u000f\u0006\u0003\u0002P\u0005%\u0004\"CA,\u001b\u0005\u0005\t\u0019AA0\u0003A!W-\\1oIN+8\u000f]3oI\u0016$\u0007%\u0001\u0007gSJ\u001cH/T3tg\u0006<W-\u0006\u0002\u0002rA!\u0011)a\u001dP\u0013\r\t)H\u0011\u0002\u0007\u001fB$\u0018n\u001c8\u0002!\u0019L'o\u001d;NKN\u001c\u0018mZ3`I\u0015\fH\u0003BA(\u0003wB\u0011\"a\u0016\u0011\u0003\u0003\u0005\r!!\u001d\u0002\u001b\u0019L'o\u001d;NKN\u001c\u0018mZ3!\u0003%awnZ*pkJ\u001cW-\u0006\u0002\u0002\u0004B\"\u0011QQAG!\u0015\u0001\u0018qQAF\u0013\r\tI)\u001f\u0002\u0006\u00072\f7o\u001d\t\u0004y\u00055EACAH%\u0005\u0005\t\u0011!B\u0001\u007f\t\u0019q\fJ\u0019\u0002\u0011A\u0014Xm\u0015;beR$\"!a\u0014\u00
02!A\u0014x\u000eZ;dKJ\f5o]5h]\u0016$\u0017a\u00059s_\u0012,8-\u001a$jeN$X*Z:tC\u001e,\u0017\u0001\u0004:fgVlW\rR3nC:$G\u0003BA(\u0003;C\u0011\"a(\u0017!\u0003\u0005\r!a\u0018\u0002\u0013Q\u0014\u0018\u0010V8Qk2d\u0017A\u0006:fgVlW\rR3nC:$G\u0005Z3gCVdG\u000fJ\u0019\u0016\u0005\u0005\u0015&\u0006BA0\u0003O[#!!+\u0011\t\u0005-\u0016QW\u0007\u0003\u0003[SA!a,\u00022\u0006IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0003g\u0013\u0015AC1o]>$\u0018\r^5p]&!\u0011qWAW\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u000egV\u001c\b/\u001a8e\t\u0016l\u0017M\u001c3\u0002!%t\u0017\u000e^5bY&s\u0007*\u00198eY\u0016\u0014\u0018aB8o)&lWM\u001d\u000b\u0005\u0003\u001f\n\t\r\u0003\u0004\u0002Dj\u0001\rAR\u0001\ti&lWM]&fs\u00061R.Y=cK\u000e{W.\\5u)J\fgn]1di&|g\u000e\u0006\u0004\u0002P\u0005%\u0017Q\u001a\u0005\n\u0003\u0017\\\u0002\u0013!a\u0001\u0003?\n1CY3hS:tUm\u001e+sC:\u001c\u0018m\u0019;j_:D\u0011\"a4\u001c!\u0003\u0005\r!a\u0018\u0002?\u0005\u0014wN\u001d;F[B$\u0018\u0010\u0016:b]N\f7\r^5p]>s7i\\7qY\u0016$X-\u0001\u0011nCf\u0014WmQ8n[&$HK]1og\u0006\u001cG/[8oI\u0011,g-Y;mi\u0012\n\u0014\u0001I7bs\n,7i\\7nSR$&/\u00198tC\u000e$\u0018n\u001c8%I\u00164\u0017-\u001e7uII\n\u0011\u0003]1sg\u00164\u0015N]:u\u001b\u0016\u001c8/Y4f)\u0011\ty&!7\t\r\u0005mg\u00041\u0001P\u0003\ri7oZ\u0001\u001dO\u0016tWM]1uK\u0012$&/\u00198tC\u000e$\u0018n\u001c8bY\u000e{gNZ5h)\u0011\t\t/!;\u0011\r\u0005\r\u0018Q]\u001eJ\u001b\u0005a\u0013bAAtY\t\u0001\u0002K]8ek\u000e,'oU3ui&twm\u001d\u0005\u0007\u00037|\u0002\u0019A(\u0002\u0011A|7\u000f^*f]\u0012$B!a\u0014\u0002p\"1\u00111\u001c\u0011A\u0002=\u000b1c\u001c8D_6\u0004H.\u001a;j_:\u001cVoY2fgN\f1c\u001c8D_6\u0004H.\u001a;j_:4\u0015-\u001b7ve\u0016$B!a\u0014\u0002x\"9\u0011\u0011 
\u0012A\u0002\u0005m\u0018AA3y!\u0011\tiPa\u0002\u000f\t\u0005}(1\u0001\b\u0004e\n\u0005\u0011\"A\"\n\u0007\t\u0015!)A\u0004qC\u000e\\\u0017mZ3\n\t\t%!1\u0002\u0002\n)\"\u0014xn^1cY\u0016T1A!\u0002C\u0003E\u0019w.\\7jiR\u0013\u0018M\\:bGRLwN\u001c\u000b\u0007\u0003\u001f\u0012\tBa\u0007\t\u000f\tM1\u00051\u0001\u0003\u0016\u0005)!-\u0019;dQB!\u0011q\bB\f\u0013\u0011\u0011I\"!\u0013\u000319{g.Z7qif$&/\u00198tC\u000e$\u0018n\u001c8CCR\u001c\u0007\u000eC\u0004\u0002L\u000e\u0002\r!a\u0018\u0002+=t\u0017J\u001c;fe:\fGnQ8n[&$\u0018iY6DEV\u0011!\u0011\u0005\t\u0007\u0005G\u00119#a\u0014\u000e\u0005\t\u0015\"B\u00016~\u0013\u0011\u0011IC!\n\u0003\u001b\u0005\u001b\u0018P\\2DC2d'-Y2l\u0003Yyg.\u00138uKJt\u0017\r\\\"p[6LG/Q2l\u0007\n\u0004\u0013\u0001E5oSR$&/\u00198tC\u000e$\u0018n\u001c8t\u0003A\u0011WmZ5o)J\fgn]1di&|g.\u0001\tbE>\u0014H\u000f\u0016:b]N\f7\r^5p]R!\u0011q\nB\u001b\u0011\u0019\u00119\u0004\u000ba\u0001_\u00061!/Z1t_:\u0004")
/* loaded from: input_file:org/apache/pekko/kafka/internal/TransactionalProducerStageLogic.class */
public final class TransactionalProducerStageLogic<K, V, P> extends DefaultProducerStageLogic<K, V, P, ProducerMessage.Envelope<K, V, P>, ProducerMessage.Results<K, V, P>> {
    // Owning stage. The mangled public name looks compiler-generated so the nested in-handler can reach it
    // (assumption - confirm against the Scala source).
    public final TransactionalProducerStage<K, V, P> org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage;
    // Base transactional id passed to the constructor; used as-is or as the prefix of a generated "transactional.id".
    private final String transactionalId;
    // Timer key under which commit attempts are scheduled; set to "commit" in the constructor.
    private final String commitSchedulerKey;
    // Delay before re-trying a commit while sent messages are still awaiting confirmation; set in the constructor.
    private final FiniteDuration messageDrainInterval;
    // Offsets accumulated for the current transaction; replaced by an empty batch after each commit.
    private TransactionalProducerStage.TransactionBatch batchOffsets;
    // True after suspendDemand() ran and until resumeDemand(...) runs again.
    private boolean demandSuspended;
    // First envelope received before a producer was assigned; cleared once it has been produced.
    private Option<ProducerMessage.Envelope<K, V, P>> firstMessage;
    // Async callback that re-arms the commit timer after a batch's internal commit completes.
    private final AsyncCallback<BoxedUnit> onInternalCommitAckCb;

    /** Returns the timer key used for scheduling commit attempts. */
    private String commitSchedulerKey() {
        return this.commitSchedulerKey;
    }

    /** Returns the delay used when a commit must wait for outstanding confirmations. */
    private FiniteDuration messageDrainInterval() {
        return this.messageDrainInterval;
    }

    /** Returns the offset batch of the transaction currently being built. */
    private TransactionalProducerStage.TransactionBatch batchOffsets() {
        return this.batchOffsets;
    }

    /** Replaces the current offset batch. */
    private void batchOffsets_$eq(TransactionalProducerStage.TransactionBatch transactionBatch) {
        this.batchOffsets = transactionBatch;
    }

    /** Returns whether demand is currently flagged as suspended by this logic. */
    private boolean demandSuspended() {
        return this.demandSuspended;
    }

    /** Sets the demand-suspended flag. */
    private void demandSuspended_$eq(boolean z) {
        this.demandSuspended = z;
    }

    /** Returns the stashed first envelope, if any. */
    private Option<ProducerMessage.Envelope<K, V, P>> firstMessage() {
        return this.firstMessage;
    }

    /** Stores (or clears) the stashed first envelope. */
    private void firstMessage_$eq(Option<ProducerMessage.Envelope<K, V, P>> option) {
        this.firstMessage = option;
    }

    /**
     * Returns the class used as the log source: the stage class
     * ({@code TransactionalProducerStage}), not this logic class.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public Class<?> logSource() {
        return TransactionalProducerStage.class;
    }

    /**
     * Start-up hook: only resumes demand, passing {@code true} to {@code resumeDemand}.
     * Note that this override does not call {@code super.preStart()}.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void preStart() {
        resumeDemand(true);
    }

    /**
     * Callback overriding {@code DefaultProducerStageLogic}/{@code DeferredProducer}; presumably invoked once
     * the producer has been assigned (the caller is not visible here - confirm).
     *
     * <p>The order of the steps matters: transactions are initialised and one is begun before the stashed
     * first message is produced, and only then is demand resumed and the commit timer armed.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic, org.apache.pekko.kafka.internal.DeferredProducer
    public void producerAssigned() {
        // Defined in the superclass; presumably swaps in the regular producing in-handler - confirm.
        producingInHandler();
        initTransactions();
        beginTransaction();
        // Throws IllegalStateException if no first message was stashed.
        produceFirstMessage();
        resumeDemand(true);
        // Arm the first commit attempt after the configured EOS commit interval.
        scheduleOnce(commitSchedulerKey(), producerSettings().eosCommitInterval());
    }

    /**
     * Produces the stashed first message and clears the stash.
     *
     * @throws IllegalStateException if no first message has been stashed
     */
    private void produceFirstMessage() {
        // Keep the static type as Option: assigning it directly to Some does not type-check in Java.
        final Option<ProducerMessage.Envelope<K, V, P>> stashed = firstMessage();
        if (!(stashed instanceof Some)) {
            throw new IllegalStateException("Should never attempt to produce first message if it does not exist.");
        }
        produce(stashed.get());
        // Option.empty() yields the None singleton with the element type this field needs.
        firstMessage_$eq(Option.empty());
    }

    /**
     * Delegates to the superclass implementation and then clears the local demand-suspended flag.
     *
     * @param z forwarded unchanged to {@code super.resumeDemand}; its meaning is defined there
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void resumeDemand(boolean z) {
        super.resumeDemand(z);
        demandSuspended_$eq(false);
    }

    /**
     * Returns {@code true}; by its name this appears to be the compiler-generated default value
     * for the first parameter of {@code resumeDemand}.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public boolean resumeDemand$default$1() {
        return true;
    }

    /**
     * Suspends demand at most once per suspension period: the superclass is only asked to suspend
     * when the local flag is not already set. The flag is set unconditionally afterwards.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void suspendDemand() {
        final boolean needsSuspending = !demandSuspended();
        if (needsSuspending) {
            super.suspendDemand();
        }
        demandSuspended_$eq(true);
    }

    /**
     * Installs the initial handler on the stage inlet. Every pushed element is grabbed from the inlet
     * and handed to the first-message parser; the parser's boolean result is ignored here.
     *
     * <p>The handler is an anonymous subclass of the inherited inner class {@code DefaultInHandler}.
     * The enclosing logic is reached through {@code TransactionalProducerStageLogic.this}, replacing
     * the hand-rolled {@code $outer} field and initializer block, which were not valid Java.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void initialInHandler() {
        setHandler(this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.in(), new DefaultInHandler() { // from class: org.apache.pekko.kafka.internal.TransactionalProducerStageLogic$$anon$1
            @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic.DefaultInHandler
            public void onPush() {
                final TransactionalProducerStageLogic<K, V, P> logic = TransactionalProducerStageLogic.this;
                logic.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$parseFirstMessage((ProducerMessage.Envelope) logic.grab(logic.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.in()));
            }
        });
    }

    /**
     * Timer callback. Attempts a commit (beginning a new transaction afterwards, without aborting
     * empty transactions) when the fired key equals the commit scheduler key. Other keys are ignored.
     *
     * @param obj the key of the timer that fired; may be {@code null}
     */
    public void onTimer(Object obj) {
        final String commitKey = commitSchedulerKey();
        // Null-safe comparison: a null timer key only matches a null commit key.
        final boolean isCommitTimer = (obj == null) ? commitKey == null : obj.equals(commitKey);
        if (isCommitTimer) {
            maybeCommitTransaction(true, false);
        }
    }

    /**
     * Decides what to do with the current transaction based on the batch state and the number of
     * sends still awaiting confirmation:
     * <ul>
     *   <li>non-empty batch, nothing outstanding: commit it;</li>
     *   <li>empty batch, nothing outstanding, abort requested: abort the transaction;</li>
     *   <li>sends outstanding: suspend demand and retry after the message drain interval;</li>
     *   <li>otherwise: retry after the EOS commit interval.</li>
     * </ul>
     *
     * @param beginNewTransaction passed on to {@code commitTransaction}
     * @param abortEmptyTransactionOnComplete whether an empty transaction should be aborted
     */
    private void maybeCommitTransaction(boolean beginNewTransaction, boolean abortEmptyTransactionOnComplete) {
        final int outstanding = awaitingConfirmationValue();
        final TransactionalProducerStage.TransactionBatch current = batchOffsets();
        final boolean nothingOutstanding = outstanding == 0;

        if (nothingOutstanding && current instanceof TransactionalProducerStage.NonemptyTransactionBatch) {
            commitTransaction((TransactionalProducerStage.NonemptyTransactionBatch) current, beginNewTransaction);
            return;
        }
        if (nothingOutstanding && abortEmptyTransactionOnComplete
                && current instanceof TransactionalProducerStage.EmptyTransactionBatch) {
            abortTransaction("Transaction is empty and stage is completing");
            return;
        }
        if (outstanding > 0) {
            // Stop pulling new elements and poll again shortly until confirmations have drained.
            suspendDemand();
            scheduleOnce(commitSchedulerKey(), messageDrainInterval());
            return;
        }
        scheduleOnce(commitSchedulerKey(), producerSettings().eosCommitInterval());
    }

    /**
     * Returns {@code true}; by its name this appears to be the compiler-generated default for the
     * first parameter of {@code maybeCommitTransaction}.
     */
    private boolean maybeCommitTransaction$default$1() {
        return true;
    }

    /**
     * Returns {@code false}; by its name this appears to be the compiler-generated default for the
     * second parameter of {@code maybeCommitTransaction}.
     */
    private boolean maybeCommitTransaction$default$2() {
        return false;
    }

    /**
     * Handles an element that arrives through the initial in-handler, depending on the producer
     * assignment lifecycle.
     *
     * <p>If a producer is already assigned, nothing is done. If none is assigned, the envelope is
     * stashed as the first message, producer resolution is started with the transactional settings
     * derived from it, and demand is suspended.
     *
     * @param envelope the element that was pushed
     * @return {@code true} if a producer is already assigned, {@code false} if the envelope was stashed
     * @throws IllegalStateException if a first message is already stashed, or if the element arrives
     *     while the producer creation request is in flight
     * @throws MatchError if the lifecycle state is none of the three known states
     */
    public boolean org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$parseFirstMessage(ProducerMessage.Envelope<K, V, P> envelope) {
        final DeferredProducer.ProducerAssignmentLifecycle lifecycle = producerAssignmentLifecycle();

        if (DeferredProducer$Assigned$.MODULE$.equals(lifecycle)) {
            return true;
        }

        if (DeferredProducer$Unassigned$.MODULE$.equals(lifecycle)) {
            if (firstMessage().nonEmpty()) {
                throw new IllegalStateException("Cannot reapply first message");
            }
            // Stash before resolving the producer so the message is available once it is assigned.
            firstMessage_$eq(new Some<>(envelope));
            resolveProducer(generatedTransactionalConfig(envelope));
            suspendDemand();
            return false;
        }

        if (DeferredProducer$AsyncCreateRequestSent$.MODULE$.equals(lifecycle)) {
            throw new IllegalStateException("Should never receive new messages while in producer assignment state '" + DeferredProducer$AsyncCreateRequestSent$.MODULE$ + "'");
        }
        throw new MatchError(lifecycle);
    }

    /**
     * Builds the producer settings for transactional use from the stage settings.
     *
     * <p>The transactional id is the configured one. The exception is when the envelope's pass-through
     * is a {@code PartitionOffsetCommittedMarker} from a partitioned source: then the id becomes
     * {@code <transactionalId>-<groupId>-<topic>-<partition>} and is logged at debug level.
     * Idempotence is enabled and in-flight requests per connection are limited to 1.
     *
     * @param envelope the envelope whose pass-through decides the transactional id
     * @return the stage settings with the three transactional properties applied
     */
    private ProducerSettings<K, V> generatedTransactionalConfig(ProducerMessage.Envelope<K, V, P> envelope) {
        final P passThrough = envelope.passThrough();
        String effectiveId = this.transactionalId;

        if (passThrough instanceof ConsumerMessage.PartitionOffsetCommittedMarker) {
            final ConsumerMessage.PartitionOffsetCommittedMarker marker = (ConsumerMessage.PartitionOffsetCommittedMarker) passThrough;
            if (marker.fromPartitionedSource()) {
                final ConsumerMessage.GroupTopicPartition gtp = marker.key();
                effectiveId = this.transactionalId + "-" + gtp.groupId() + "-" + gtp.topic() + "-" + gtp.partition();
                log().debug("Generated transactional id from partitioned source '{}'", effectiveId);
            }
        }

        return this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage.settings().withProperties(
                (Seq<Tuple2<String, String>>) ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.idempotence"), Boolean.toString(true)),
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("transactional.id"), effectiveId),
                        Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("max.in.flight.requests.per.connection"), Integer.toString(1))
                }));
    }

    /**
     * Hook overriding {@code DefaultProducerStageLogic.postSend}. Folds the envelope's pass-through
     * into the current offset batch when it is a {@code PartitionOffsetCommittedMarker};
     * other pass-through values are ignored.
     *
     * @param envelope the envelope whose pass-through is inspected
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic
    public void postSend(ProducerMessage.Envelope<K, V, P> envelope) {
        final P marker = envelope.passThrough();
        if (!(marker instanceof ConsumerMessage.PartitionOffsetCommittedMarker)) {
            return;
        }
        final TransactionalProducerStage.TransactionBatch extended =
                batchOffsets().updated((ConsumerMessage.PartitionOffsetCommittedMarker) marker);
        batchOffsets_$eq(extended);
    }

    /**
     * Completion hook: cancels the pending commit timer and makes one final commit attempt
     * (no new transaction is begun; an empty transaction is aborted) before delegating to the
     * superclass.
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic, org.apache.pekko.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionSuccess() {
        log().debug("Committing final transaction before shutdown");
        cancelTimer(commitSchedulerKey());
        // beginNewTransaction = false, abortEmptyTransactionOnComplete = true
        maybeCommitTransaction(false, true);
        super.onCompletionSuccess();
    }

    /**
     * Failure hook: aborts the current transaction, notifies the current offset batch that
     * committing failed, then delegates to the superclass.
     *
     * @param th the failure, forwarded unchanged to the superclass
     */
    @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic, org.apache.pekko.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionFailure(Throwable th) {
        // Only reaches the Kafka producer if one is assigned (see abortTransaction).
        abortTransaction("Stage failure");
        batchOffsets().committingFailed();
        super.onCompletionFailure(th);
    }

    /**
     * Commits the given batch: sends its offsets to the transaction for the batch's consumer group,
     * commits the producer transaction, resets the current batch to an empty one and registers a
     * completion hook on the batch's internal commit that triggers the commit-ack callback.
     *
     * @param batch the non-empty batch whose offsets are committed
     * @param beginNewTransaction when {@code true}, a new transaction is begun and demand is resumed afterwards
     */
    private void commitTransaction(TransactionalProducerStage.NonemptyTransactionBatch batch, boolean beginNewTransaction) {
        final String consumerGroup = batch.group();

        log().debug("Committing transaction for transactional id '{}' consumer group '{}' with offsets: {}", this.transactionalId, consumerGroup, batch.offsets());
        producer().sendOffsetsToTransaction(
                package$JavaConverters$.MODULE$.MapHasAsJava(batch.offsetMap()).asJava(),
                new ConsumerGroupMetadata(consumerGroup));
        producer().commitTransaction();
        log().debug("Committed transaction for transactional id '{}' consumer group '{}' with offsets: {}", this.transactionalId, consumerGroup, batch.offsets());

        // Retained from the original: touches the TransactionBatch companion before creating the empty batch.
        TransactionalProducerStage$TransactionBatch$ batchCompanion = TransactionalProducerStage$TransactionBatch$.MODULE$;
        batchOffsets_$eq(new TransactionalProducerStage.EmptyTransactionBatch());

        batch.internalCommit().onComplete(result -> {
            $anonfun$commitTransaction$1(this, result);
            return BoxedUnit.UNIT;
        }, materializer().executionContext());

        if (!beginNewTransaction) {
            return;
        }
        beginTransaction();
        resumeDemand(true);
    }

    /** Returns the async callback invoked when a batch's internal commit has completed. */
    private AsyncCallback<BoxedUnit> onInternalCommitAckCb() {
        return this.onInternalCommitAckCb;
    }

    /** Logs at debug level and calls {@code initTransactions()} on the producer. */
    private void initTransactions() {
        log().debug("Initializing transactions");
        producer().initTransactions();
    }

    /** Logs at debug level and calls {@code beginTransaction()} on the producer. */
    private void beginTransaction() {
        log().debug("Beginning new transaction");
        producer().beginTransaction();
    }

    /**
     * Logs the abort reason and aborts the producer transaction, but only when the producer
     * assignment lifecycle is {@code Assigned}; otherwise only the log line is emitted.
     *
     * @param reason text included in the debug log message
     */
    private void abortTransaction(String reason) {
        log().debug("Aborting transaction: {}", reason);
        final DeferredProducer.ProducerAssignmentLifecycle lifecycle = producerAssignmentLifecycle();
        final boolean producerAssigned = lifecycle != null && lifecycle.equals(DeferredProducer$Assigned$.MODULE$);
        if (!producerAssigned) {
            return;
        }
        producer().abortTransaction();
    }

    /**
     * Synthetic lambda body used as the completion hook in {@code commitTransaction}: invokes the
     * internal-commit-ack async callback. The completion result {@code r4} is ignored, so the
     * callback fires whatever the outcome.
     */
    public static final /* synthetic */ void $anonfun$commitTransaction$1(TransactionalProducerStageLogic transactionalProducerStageLogic, Try r4) {
        transactionalProducerStageLogic.onInternalCommitAckCb().invoke(BoxedUnit.UNIT);
    }

    /**
     * Synthetic lambda body of the internal-commit-ack async callback: re-arms the commit timer
     * with the configured EOS commit interval. The {@code boxedUnit} argument is unused.
     */
    public static final /* synthetic */ void $anonfun$onInternalCommitAckCb$1(TransactionalProducerStageLogic transactionalProducerStageLogic, BoxedUnit boxedUnit) {
        transactionalProducerStageLogic.scheduleOnce(transactionalProducerStageLogic.commitSchedulerKey(), transactionalProducerStageLogic.producerSettings().eosCommitInterval());
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the stage logic for a transactional producer stage.
     *
     * <p>Initial state: commit timer key {@code "commit"}, a 10 millisecond message drain interval,
     * an empty offset batch, demand not suspended, and no stashed first message.
     *
     * @param transactionalProducerStage the owning stage; also passed to the superclass
     * @param str the base transactional id
     * @param attributes stream attributes, passed to the superclass
     */
    public TransactionalProducerStageLogic(TransactionalProducerStage<K, V, P> transactionalProducerStage, String str, Attributes attributes) {
        super(transactionalProducerStage, attributes);
        this.org$apache$pekko$kafka$internal$TransactionalProducerStageLogic$$stage = transactionalProducerStage;
        this.transactionalId = str;
        this.commitSchedulerKey = "commit";
        // Same value as Scala's `10.milliseconds`, without going through the `package` object,
        // whose name is a reserved word in Java and cannot be referenced.
        this.messageDrainInterval = new FiniteDuration(10L, java.util.concurrent.TimeUnit.MILLISECONDS);
        // Retained from the original: touches the TransactionBatch companion before creating the empty batch.
        TransactionalProducerStage$TransactionBatch$ transactionalProducerStage$TransactionBatch$ = TransactionalProducerStage$TransactionBatch$.MODULE$;
        this.batchOffsets = new TransactionalProducerStage.EmptyTransactionBatch();
        this.demandSuspended = false;
        // Option.empty() yields the None singleton with the element type this field needs.
        this.firstMessage = Option.empty();
        this.onInternalCommitAckCb = getAsyncCallback(boxedUnit -> {
            $anonfun$onInternalCommitAckCb$1(this, boxedUnit);
            return BoxedUnit.UNIT;
        });
    }
}
