package akka.kafka.internal;

import akka.kafka.ConsumerMessage;
import akka.kafka.ProducerMessage;
import akka.kafka.internal.TransactionalProducerStage;
import akka.stream.Attributes;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.OutHandler;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.concurrent.duration.Cpackage;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Try;

/* compiled from: TransactionalProducerStage.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dh\u0001\u0002\u0011\"\r!B\u0001B\u0016\u0001\u0003\u0002\u0003\u0006Ia\u001a\u0005\tU\u0002\u0011\t\u0011)A\u0005W\"Aq\u000f\u0001B\u0001B\u0003%\u0001\u0010\u0003\u0005}\u0001\t\u0005\t\u0015!\u0003~\u0011\u001d\tY\u0001\u0001C\u0001\u0003\u001bA\u0011\"!\u0007\u0001\u0005\u0004%I!a\u0007\t\u0011\u00055\u0002\u0001)A\u0005\u0003;A\u0011\"a\f\u0001\u0005\u0004%I!!\r\t\u000f\u0005M\u0002\u0001)A\u0005{\"I\u0011Q\u0007\u0001A\u0002\u0013%\u0011q\u0007\u0005\n\u0003\u000f\u0002\u0001\u0019!C\u0005\u0003\u0013B\u0001\"!\u0016\u0001A\u0003&\u0011\u0011\b\u0005\n\u0003/\u0002\u0001\u0019!C\u0005\u00033B\u0011\"!\u0019\u0001\u0001\u0004%I!a\u0019\t\u0011\u0005\u001d\u0004\u0001)Q\u0005\u00037Bq!!\u001b\u0001\t\u0003\nY\u0007C\u0004\u0002n\u0001!I!a\u001c\t\u0013\u0005U\u0004!%A\u0005\n\u0005]\u0004bBAG\u0001\u0011%\u00111\u000e\u0005\b\u0003\u001f\u0003A\u0011KAI\u0011\u001d\t9\n\u0001C\u0005\u00033C\u0011\"a(\u0001#\u0003%I!a\u001e\t\u000f\u0005\u0005\u0006\u0001\"\u0011\u0002$\"9\u0011\u0011\u0016\u0001\u0005B\u0005-\u0004bBAV\u0001\u0011\u0005\u0013Q\u0016\u0005\b\u0003\u000b\u0004A\u0011BAd\u0011%\t)\u000e\u0001b\u0001\n\u0003\t9\u000e\u0003\u0005\u0002`\u0002\u0001\u000b\u0011BAm\u0011\u001d\t\t\u000f\u0001C\u0005\u0003WBq!a9\u0001\t\u0013\tY\u0007C\u0004\u0002f\u0002!I!a\u001b\u0003?Q\u0013\u0018M\\:bGRLwN\\1m!J|G-^2feN#\u0018mZ3M_\u001eL7M\u0003\u0002#G\u0005A\u0011N\u001c;fe:\fGN\u0003\u0002%K\u0005)1.\u00194lC*\ta%\u0001\u0003bW.\f7\u0001A\u000b\u0005SAj\u0004iE\u0003\u0001UM[F\rE\u0004,Y9btH\u0011)\u000e\u0003\u0005J!!L\u0011\u00033\u0011+g-Y;miB\u0013x\u000eZ;dKJ\u001cF/Y4f\u0019><\u0017n\u0019\t\u0003_Ab\u0001\u0001B\u00032\u0001\t\u0007!GA\u0001L#\t\u0019\u0014\b\u0005\u00025o5\tQGC\u00017\u0003\u0015\u00198-\u00197b\u0013\tATGA\u0004O_RD\u0017N\\4\u0011\u0005QR\u0014BA\u001e6\u0005\r\te.\u001f\t\u0003_u\"QA\u0010\u0001C\u0002I\u0012\u0011A\u0016\t\u0003_\u0001#Q!\u0011\u0001C\u0002I\u0012\u0011\u0001\u0015\t\u0006\u00076sCh\u0010\b\u0003\t.s!!\u0012&\u000f\u0005\u0019KU\"A$\u000b\u0005!;\u0013A\u0002\u001fs_>$h(C\u0001'\u0013\t!S%\u0003\u0002MG\u0005y\u0001K]8ek\u000e,'/T3tg\u0006<W-\u0003\u0002O\u001f\nAQI\u001c<fY>\u0004XM\u0003\u0002MGA)1)\u0015\u0018=\u007f%\u0011!k\u0014\u0002\b%\u0016\u001cX\u000f\u001c;t!\t!\u0016,D\u0001V\u0015\t1v+A\u0003ti\u0006<WM\u0003\u0002YK\u000511\u000f\u001e:fC6L!AW+\u0003\u0019M#\u0018mZ3M_\u001e<\u0017N\\4\u0011\u000bq\u000bg\u0006P \u000f\u0005u{fB\u0001#_\u0013\t\u00113%\u0003\u0002aC\u0005i\u0001K]8ek\u000e,'o\u0015;bO\u0016L!AY2\u0003\u001f5+7o]1hK\u000e\u000bG\u000e\u001c2bG.T!\u0001Y\u0011\u0011\u0005q+\u0017B\u00014d\u0005]\u0001&o\u001c3vG\u0016\u00148i\\7qY\u0016$\u0018n\u001c8Ti\u0006$X\rE\u0003,Q:bt(\u0003\u0002jC\tQBK]1og\u0006\u001cG/[8oC2\u0004&o\u001c3vG\u0016\u00148\u000b^1hK\u0006A\u0001O]8ek\u000e,'\u000f\u0005\u0003mk:bT\"A7\u000b\u0005)t'BA8q\u0003\u001d\u0019G.[3oiNT!\u0001J9\u000b\u0005I\u001c\u0018AB1qC\u000eDWMC\u0001u\u0003\ry'oZ\u0005\u0003m6\u0014\u0001\u0002\u0015:pIV\u001cWM]\u0001\u0014S:DWM]5uK\u0012\fE\u000f\u001e:jEV$Xm\u001d\t\u0003sjl\u0011aV\u0005\u0003w^\u0013!\"\u0011;ue&\u0014W\u000f^3t\u00039\u0019w.\\7ji&sG/\u001a:wC2\u00042A`A\u0004\u001b\u0005y(\u0002BA\u0001\u0003\u0007\t\u0001\u0002Z;sCRLwN\u001c\u0006\u0004\u0003\u000b)\u0014AC2p]\u000e,(O]3oi&\u0019\u0011\u0011B@\u0003\u001d\u0019Kg.\u001b;f\tV\u0014\u0018\r^5p]\u00061A(\u001b8jiz\"\"\"a\u0004\u0002\u0012\u0005M\u0011QCA\f!\u0015Y\u0003A\f\u001f@\u0011\u00151V\u00011\u0001h\u0011\u0015QW\u00011\u0001l\u0011\u00159X\u00011\u0001y\u0011\u0015aX\u00011\u0001~\u0003I\u0019w.\\7jiN\u001b\u0007.\u001a3vY\u0016\u00148*Z=\u0016\u0005\u0005u\u0001\u0003BA\u0010\u0003Si!!!\t\u000b\t\u0005\r\u0012QE\u0001\u0005Y\u0006twM\u0003\u0002\u0002(\u0005!!.\u0019<b\u0013\u0011\tY#!\t\u0003\rM#(/\u001b8h\u0003M\u0019w.\\7jiN\u001b\u0007.\u001a3vY\u0016\u00148*Z=!\u0003QiWm]:bO\u0016$%/Y5o\u0013:$XM\u001d<bYV\tQ0A\u000bnKN\u001c\u0018mZ3Ee\u0006Lg.\u00138uKJ4\u0018\r\u001c\u0011\u0002\u0019\t\fGo\u00195PM\u001a\u001cX\r^:\u0016\u0005\u0005e\u0002\u0003BA\u001e\u0003\u0003r1aKA\u001f\u0013\r\ty$I\u0001\u001b)J\fgn]1di&|g.\u00197Qe>$WoY3s'R\fw-Z\u0005\u0005\u0003\u0007\n)E\u0001\tUe\u0006t7/Y2uS>t')\u0019;dQ*\u0019\u0011qH\u0011\u0002!\t\fGo\u00195PM\u001a\u001cX\r^:`I\u0015\fH\u0003BA&\u0003#\u00022\u0001NA'\u0013\r\ty%\u000e\u0002\u0005+:LG\u000fC\u0005\u0002T-\t\t\u00111\u0001\u0002:\u0005\u0019\u0001\u0010J\u0019\u0002\u001b\t\fGo\u00195PM\u001a\u001cX\r^:!\u0003=!W-\\1oIN+8\u000f]3oI\u0016$WCAA.!\r!\u0014QL\u0005\u0004\u0003?*$a\u0002\"p_2,\u0017M\\\u0001\u0014I\u0016l\u0017M\u001c3TkN\u0004XM\u001c3fI~#S-\u001d\u000b\u0005\u0003\u0017\n)\u0007C\u0005\u0002T9\t\t\u00111\u0001\u0002\\\u0005\u0001B-Z7b]\u0012\u001cVo\u001d9f]\u0012,G\rI\u0001\taJ,7\u000b^1siR\u0011\u00111J\u0001\re\u0016\u001cX/\\3EK6\fg\u000e\u001a\u000b\u0005\u0003\u0017\n\t\bC\u0005\u0002tE\u0001\n\u00111\u0001\u0002\\\u0005IAO]=U_B+H\u000e\\\u0001\u0017e\u0016\u001cX/\\3EK6\fg\u000e\u001a\u0013eK\u001a\fW\u000f\u001c;%cU\u0011\u0011\u0011\u0010\u0016\u0005\u00037\nYh\u000b\u0002\u0002~A!\u0011qPAE\u001b\t\t\tI\u0003\u0003\u0002\u0004\u0006\u0015\u0015!C;oG\",7m[3e\u0015\r\t9)N\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAF\u0003\u0003\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00035\u0019Xo\u001d9f]\u0012$U-\\1oI\u00069qN\u001c+j[\u0016\u0014H\u0003BA&\u0003'Ca!!&\u0015\u0001\u0004I\u0014\u0001\u0003;j[\u0016\u00148*Z=\u0002-5\f\u0017PY3D_6l\u0017\u000e\u001e+sC:\u001c\u0018m\u0019;j_:$B!a\u0013\u0002\u001c\"I\u0011QT\u000b\u0011\u0002\u0003\u0007\u00111L\u0001\u0014E\u0016<\u0017N\u001c(foR\u0013\u0018M\\:bGRLwN\\\u0001![\u0006L(-Z\"p[6LG\u000f\u0016:b]N\f7\r^5p]\u0012\"WMZ1vYR$\u0013'\u0001\u0005q_N$8+\u001a8e)\u0011\tY%!*\t\r\u0005\u001dv\u00031\u0001C\u0003\ri7oZ\u0001\u0014_:\u001cu.\u001c9mKRLwN\\*vG\u000e,7o]\u0001\u0014_:\u001cu.\u001c9mKRLwN\u001c$bS2,(/\u001a\u000b\u0005\u0003\u0017\ny\u000bC\u0004\u00022f\u0001\r!a-\u0002\u0005\u0015D\b\u0003BA[\u0003\u007fsA!a.\u0002<:\u0019a)!/\n\u0003YJ1!!06\u0003\u001d\u0001\u0018mY6bO\u0016LA!!1\u0002D\nIA\u000b\u001b:po\u0006\u0014G.\u001a\u0006\u0004\u0003{+\u0014!E2p[6LG\u000f\u0016:b]N\f7\r^5p]R1\u00111JAe\u0003'Dq!a3\u001b\u0001\u0004\ti-A\u0003cCR\u001c\u0007\u000e\u0005\u0003\u0002<\u0005=\u0017\u0002BAi\u0003\u000b\u0012\u0001DT8oK6\u0004H/\u001f+sC:\u001c\u0018m\u0019;j_:\u0014\u0015\r^2i\u0011\u001d\tiJ\u0007a\u0001\u00037\nQc\u001c8J]R,'O\\1m\u0007>lW.\u001b;BG.\u001c%-\u0006\u0002\u0002ZB)A+a7\u0002L%\u0019\u0011Q\\+\u0003\u001b\u0005\u001b\u0018P\\2DC2d'-Y2l\u0003Yyg.\u00138uKJt\u0017\r\\\"p[6LG/Q2l\u0007\n\u0004\u0013\u0001E5oSR$&/\u00198tC\u000e$\u0018n\u001c8t\u0003A\u0011WmZ5o)J\fgn]1di&|g.\u0001\tbE>\u0014H\u000f\u0016:b]N\f7\r^5p]\u0002")
/* loaded from: input_file:BOOT-INF/lib/akka-stream-kafka_2.12-1.0.5.jar:akka/kafka/internal/TransactionalProducerStageLogic.class */
public final class TransactionalProducerStageLogic<K, V, P> extends DefaultProducerStageLogic<K, V, P, ProducerMessage.Envelope<K, V, P>, ProducerMessage.Results<K, V, P>> {
    public final TransactionalProducerStage<K, V, P> akka$kafka$internal$TransactionalProducerStageLogic$$stage;
    private final Producer<K, V> producer;
    private final FiniteDuration commitInterval;
    private final String commitSchedulerKey;
    private final FiniteDuration messageDrainInterval;
    private TransactionalProducerStage.TransactionBatch batchOffsets;
    private boolean demandSuspended;
    private final AsyncCallback<BoxedUnit> onInternalCommitAckCb;

    private String commitSchedulerKey() {
        return this.commitSchedulerKey;
    }

    private FiniteDuration messageDrainInterval() {
        return this.messageDrainInterval;
    }

    private TransactionalProducerStage.TransactionBatch batchOffsets() {
        return this.batchOffsets;
    }

    private void batchOffsets_$eq(TransactionalProducerStage.TransactionBatch transactionBatch) {
        this.batchOffsets = transactionBatch;
    }

    private boolean demandSuspended() {
        return this.demandSuspended;
    }

    private void demandSuspended_$eq(boolean z) {
        this.demandSuspended = z;
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        initTransactions();
        beginTransaction();
        resumeDemand(false);
        scheduleOnce(commitSchedulerKey(), this.commitInterval);
    }

    private void resumeDemand(boolean z) {
        setHandler(this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.out(), new OutHandler(this) { // from class: akka.kafka.internal.TransactionalProducerStageLogic$$anon$1
            private final /* synthetic */ TransactionalProducerStageLogic $outer;

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() throws Exception {
                onDownstreamFinish();
            }

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                this.$outer.tryPull(this.$outer.akka$kafka$internal$TransactionalProducerStageLogic$$stage.in());
            }

            {
                if (this == 0) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
        demandSuspended_$eq(false);
        if (z && isAvailable(this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.out()) && !hasBeenPulled(this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.in())) {
            tryPull(this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.in());
        }
    }

    private boolean resumeDemand$default$1() {
        return true;
    }

    private void suspendDemand() {
        if (!demandSuspended()) {
            final TransactionalProducerStageLogic transactionalProducerStageLogic = null;
            setHandler(this.akka$kafka$internal$TransactionalProducerStageLogic$$stage.out(), new OutHandler(transactionalProducerStageLogic) { // from class: akka.kafka.internal.TransactionalProducerStageLogic$$anon$2
                @Override // akka.stream.stage.OutHandler
                public void onDownstreamFinish() throws Exception {
                    onDownstreamFinish();
                }

                @Override // akka.stream.stage.OutHandler
                public void onPull() {
                }

                {
                    OutHandler.$init$(this);
                }
            });
        }
        demandSuspended_$eq(true);
    }

    @Override // akka.stream.stage.TimerGraphStageLogic
    public void onTimer(Object obj) {
        String commitSchedulerKey = commitSchedulerKey();
        if (obj == null) {
            if (commitSchedulerKey != null) {
                return;
            }
        } else if (!obj.equals(commitSchedulerKey)) {
            return;
        }
        maybeCommitTransaction(maybeCommitTransaction$default$1());
    }

    private void maybeCommitTransaction(boolean z) {
        int i = awaitingConfirmation().get();
        TransactionalProducerStage.TransactionBatch batchOffsets = batchOffsets();
        if (batchOffsets instanceof TransactionalProducerStage.NonemptyTransactionBatch) {
            TransactionalProducerStage.NonemptyTransactionBatch nonemptyTransactionBatch = (TransactionalProducerStage.NonemptyTransactionBatch) batchOffsets;
            if (i == 0) {
                commitTransaction(nonemptyTransactionBatch, z);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (i <= 0) {
            scheduleOnce(commitSchedulerKey(), this.commitInterval);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            suspendDemand();
            scheduleOnce(commitSchedulerKey(), messageDrainInterval());
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    private boolean maybeCommitTransaction$default$1() {
        return true;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic
    public void postSend(ProducerMessage.Envelope<K, V, P> envelope) {
        P passThrough = envelope.passThrough();
        if (!(passThrough instanceof ConsumerMessage.PartitionOffsetCommittedMarker)) {
            throw new MatchError(passThrough);
        }
        batchOffsets_$eq(batchOffsets().updated((ConsumerMessage.PartitionOffsetCommittedMarker) passThrough));
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic, akka.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionSuccess() {
        log().debug("Committing final transaction before shutdown");
        cancelTimer(commitSchedulerKey());
        maybeCommitTransaction(false);
        super.onCompletionSuccess();
    }

    @Override // akka.kafka.internal.DefaultProducerStageLogic, akka.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionFailure(Throwable th) {
        log().debug("Aborting transaction due to stage failure");
        abortTransaction();
        batchOffsets().committingFailed();
        super.onCompletionFailure(th);
    }

    private void commitTransaction(TransactionalProducerStage.NonemptyTransactionBatch nonemptyTransactionBatch, boolean z) {
        String group = nonemptyTransactionBatch.group();
        log().debug("Committing transaction for consumer group '{}' with offsets: {}", group, nonemptyTransactionBatch.offsets());
        this.producer.sendOffsetsToTransaction((Map<TopicPartition, OffsetAndMetadata>) CollectionConverters$.MODULE$.mapAsJavaMapConverter(nonemptyTransactionBatch.offsetMap()).asJava(), group);
        this.producer.commitTransaction();
        log().debug("Committed transaction for consumer group '{}' with offsets: {}", group, nonemptyTransactionBatch.offsets());
        batchOffsets_$eq(TransactionalProducerStage$TransactionBatch$.MODULE$.empty());
        nonemptyTransactionBatch.internalCommit().onComplete(r4 -> {
            $anonfun$commitTransaction$1(this, r4);
            return BoxedUnit.UNIT;
        }, materializer().executionContext());
        if (z) {
            beginTransaction();
            resumeDemand(resumeDemand$default$1());
        }
    }

    public AsyncCallback<BoxedUnit> onInternalCommitAckCb() {
        return this.onInternalCommitAckCb;
    }

    private void initTransactions() {
        log().debug("Initializing transactions");
        this.producer.initTransactions();
    }

    private void beginTransaction() {
        log().debug("Beginning new transaction");
        this.producer.beginTransaction();
    }

    private void abortTransaction() {
        log().debug("Aborting transaction");
        this.producer.abortTransaction();
    }

    public static final /* synthetic */ void $anonfun$commitTransaction$1(TransactionalProducerStageLogic transactionalProducerStageLogic, Try r4) {
        transactionalProducerStageLogic.onInternalCommitAckCb().invoke(BoxedUnit.UNIT);
    }

    public static final /* synthetic */ void $anonfun$onInternalCommitAckCb$1(TransactionalProducerStageLogic transactionalProducerStageLogic, BoxedUnit boxedUnit) {
        transactionalProducerStageLogic.scheduleOnce(transactionalProducerStageLogic.commitSchedulerKey(), transactionalProducerStageLogic.commitInterval);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public TransactionalProducerStageLogic(TransactionalProducerStage<K, V, P> transactionalProducerStage, Producer<K, V> producer, Attributes attributes, FiniteDuration finiteDuration) {
        super(transactionalProducerStage, producer, attributes);
        this.akka$kafka$internal$TransactionalProducerStageLogic$$stage = transactionalProducerStage;
        this.producer = producer;
        this.commitInterval = finiteDuration;
        this.commitSchedulerKey = "commit";
        this.messageDrainInterval = new Cpackage.DurationInt(package$.MODULE$.DurationInt(10)).milliseconds();
        this.batchOffsets = TransactionalProducerStage$TransactionBatch$.MODULE$.empty();
        this.demandSuspended = false;
        this.onInternalCommitAckCb = getAsyncCallback(boxedUnit -> {
            $anonfun$onInternalCommitAckCb$1(this, boxedUnit);
            return BoxedUnit.UNIT;
        });
    }
}
