package org.apache.pekko.kafka.internal;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.CommitterSettings;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ConsumerMessage$CommittableOffsetBatch$;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.ProducerMessage.Envelope;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.internal.CommitTrigger;
import org.apache.pekko.kafka.internal.DeferredProducer;
import org.apache.pekko.stream.ActorAttributes;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Supervision;
import org.apache.pekko.stream.Supervision$;
import org.apache.pekko.stream.Supervision$Stop$;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.StringOps$;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Failure$;
import scala.util.Success;
import scala.util.Success$;
import scala.util.Try;

/* compiled from: CommittingProducerSinkStage.scala */
/* loaded from: input_file:org/apache/pekko/kafka/internal/CommittingProducerSinkStageLogic.class */
public final class CommittingProducerSinkStageLogic<K, V, IN extends ProducerMessage.Envelope<K, V, ConsumerMessage.Committable>> extends TimerGraphStageLogic implements CommitObservationLogic, StageIdLogging, DeferredProducer<K, V>, ExecutionContextProvider, DeferredProducer, ExecutionContextProvider {
    // Field offset of decider$lzy1, resolved once and passed to the LazyVals compare-and-swap calls in decider$lzyINIT1().
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(CommittingProducerSinkStageLogic.class.getDeclaredField("decider$lzy1"));
    // Offsets collected but not yet committed; commit(...) commits this batch and replaces it with an empty one.
    private ConsumerMessage.CommittableOffsetBatch offsetBatch;
    // State belonging to CommitObservationLogic; only touched here through the mangled accessor pair.
    private Map org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets;
    // Logger slot belonging to StageLogging.
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Instance id set in the constructor: the first five characters of a random UUID string.
    private String org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    // Logger slot belonging to StageIdLogging.
    private LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    // Kafka producer used by produce(...); assigned through producer_$eq (DeferredProducer).
    private Producer producer;
    // Producer lifecycle marker; initialised to DeferredProducer$Unassigned$ in the constructor.
    private DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle;
    // The stage this logic was created for (source of the inlet, committer settings and producer settings).
    public final CommittingProducerSinkStage<K, V, IN> org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage;
    // Attributes used to resolve the supervision decider; set to null once the decider has been computed.
    private Attributes inheritedAttributes;
    // Completed with Done when the stage completes normally and failed by closeAndFailStage(...).
    private final Promise streamCompletion;
    // Committer settings taken from the stage.
    private final CommitterSettings settings;
    // Backing slot of the lazily initialised decider: null, a LazyVals control state, or the resolved Function1.
    private volatile Object decider$lzy1;
    // Producer settings taken from the stage.
    private final ProducerSettings producerSettings;
    // Async callback that runs closeAndFailStage(...) with the supplied throwable.
    private final AsyncCallback closeAndFailStageCb;
    // Records handed to producer.send(...) whose completion has not been accounted for yet.
    private long awaitingProduceResult;
    // Envelopes accepted by produce(...) whose offset has not been accounted for yet
    // (decremented by the batch size on a commit result, on an ignored send failure, and by clearDeferredOffsets()).
    private long awaitingCommitResult;
    // Async callback receiving (record count, cause) for a failed send.
    public final AsyncCallback<Tuple2<Object, Throwable>> org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$sendFailureCb;
    // Async callback receiving the offset of a successfully sent single record.
    public final AsyncCallback<ConsumerMessage.Committable> org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$collectOffsetCb;
    // Async callback receiving (record count, offset) once all records of a multi-message were sent.
    public final AsyncCallback<Tuple2<Object, ConsumerMessage.Committable>> org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$collectOffsetMultiCb;
    // Async callback receiving (batch size, commit outcome) for a finished commit.
    private final AsyncCallback<Tuple2<Object, Try<Done>>> commitResultCB;
    // None until upstream finished (Some(Success)) or an emergency shutdown recorded the failure (Some(Failure)).
    public Option<Try<Done>> org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$upstreamCompletionState;

    /* compiled from: CommittingProducerSinkStage.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/CommittingProducerSinkStageLogic$SendCallback.class */
    /**
     * Kafka producer callback for an envelope that carries exactly one record.
     * A successful send forwards the envelope's offset to the stage's collect-offset
     * async callback; a failed send reports {@code (1, exception)} to the send-failure
     * async callback.
     */
    public final class SendCallback implements Callback {
        private final ConsumerMessage.Committable offset;
        private final /* synthetic */ CommittingProducerSinkStageLogic $outer;

        /**
         * @param committingProducerSinkStageLogic enclosing stage logic; must not be null
         * @param committable offset to hand back once the record has been sent
         * @throws NullPointerException if the enclosing logic is null
         */
        public SendCallback(CommittingProducerSinkStageLogic committingProducerSinkStageLogic, ConsumerMessage.Committable committable) {
            if (committingProducerSinkStageLogic == null) {
                throw new NullPointerException();
            }
            this.$outer = committingProducerSinkStageLogic;
            this.offset = committable;
        }

        /**
         * Routes the send outcome to the owning stage via its async callbacks.
         *
         * @param recordMetadata metadata of the sent record (unused)
         * @param exc the send failure, or null when the send succeeded
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                // One record was attempted, so the failure accounts for a count of 1.
                Object failedRecords = BoxesRunTime.boxToInteger(1);
                this.$outer.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$sendFailureCb.invoke(new Tuple2<>(failedRecords, exc));
                return;
            }
            this.$outer.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$collectOffsetCb.invoke(this.offset);
        }

        /** Returns the enclosing stage logic. */
        public final /* synthetic */ CommittingProducerSinkStageLogic org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$SendCallback$$$outer() {
            return this.$outer;
        }
    }

    /* compiled from: CommittingProducerSinkStage.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/CommittingProducerSinkStageLogic$SendMultiCallback.class */
    /**
     * Kafka producer callback shared by all records of one multi-record envelope.
     * It counts successful completions down from {@code count}; when the counter reaches
     * zero, {@code (count, offset)} is forwarded to the stage's multi collect-offset async
     * callback. Every failed completion reports {@code (count, exception)} to the
     * send-failure async callback.
     *
     * NOTE(review): each failing record reports the full {@code count}; if more than one
     * record of the same envelope fails, the receiver is notified several times for a
     * single envelope — confirm that the counters kept by the stage tolerate this.
     */
    public final class SendMultiCallback implements Callback {
        private final int count;
        private final ConsumerMessage.Committable offset;
        private final AtomicInteger counter;
        private final /* synthetic */ CommittingProducerSinkStageLogic $outer;

        /**
         * @param committingProducerSinkStageLogic enclosing stage logic; must not be null
         * @param i number of records sent with this callback
         * @param committable offset to hand back once all records have been sent
         * @throws NullPointerException if the enclosing logic is null
         */
        public SendMultiCallback(CommittingProducerSinkStageLogic committingProducerSinkStageLogic, int i, ConsumerMessage.Committable committable) {
            if (committingProducerSinkStageLogic == null) {
                throw new NullPointerException();
            }
            this.$outer = committingProducerSinkStageLogic;
            this.count = i;
            this.offset = committable;
            this.counter = new AtomicInteger(i);
        }

        /**
         * Accounts for one finished send.
         *
         * @param recordMetadata metadata of the sent record (unused)
         * @param exc the send failure, or null when the send succeeded
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc == null) {
                // Only the last successful completion releases the offset.
                if (this.counter.decrementAndGet() == 0) {
                    Object sentRecords = BoxesRunTime.boxToInteger(this.count);
                    this.$outer.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$collectOffsetMultiCb.invoke(new Tuple2<>(sentRecords, this.offset));
                }
                return;
            }
            Object attemptedRecords = BoxesRunTime.boxToInteger(this.count);
            this.$outer.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$sendFailureCb.invoke(new Tuple2<>(attemptedRecords, exc));
        }

        /** Returns the enclosing stage logic. */
        public final /* synthetic */ CommittingProducerSinkStageLogic org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$SendMultiCallback$$$outer() {
            return this.$outer;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the logic for one materialisation of the committing producer sink.
     *
     * Initialises the mixed-in traits, assigns a five-character instance id, registers the
     * async callbacks used by the Kafka producer callbacks and by commit completion, and
     * installs the inlet handler.
     *
     * @param committingProducerSinkStage the stage providing shape, inlet and settings
     * @param attributes attributes from which the supervision decider is later resolved
     */
    public CommittingProducerSinkStageLogic(CommittingProducerSinkStage<K, V, IN> committingProducerSinkStage, Attributes attributes) {
        super(committingProducerSinkStage.m116shape());
        this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage = committingProducerSinkStage;
        this.inheritedAttributes = attributes;
        CommitObservationLogic.$init$(this);
        StageLogging.$init$(this);
        org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(UUID.randomUUID().toString()), 5));
        StageIdLogging.$init$((StageIdLogging) this);
        producerAssignmentLifecycle_$eq(DeferredProducer$Unassigned$.MODULE$);
        this.streamCompletion = Promise$.MODULE$.apply();
        this.settings = committingProducerSinkStage.committerSettings();
        this.producerSettings = committingProducerSinkStage.producerSettings();
        this.closeAndFailStageCb = getAsyncCallback(th -> {
            org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(th);
        });
        this.awaitingProduceResult = 0L;
        this.awaitingCommitResult = 0L;
        // Send failure: the decider chooses between failing the stage and dropping the envelope.
        this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$sendFailureCb = getAsyncCallback(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            int unboxToInt = BoxesRunTime.unboxToInt(tuple2._1());
            Throwable th2 = (Throwable) tuple2._2();
            if (Supervision$Stop$.MODULE$.equals((Supervision.Directive) decider().apply(th2))) {
                org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(th2);
            } else {
                collectOffsetIgnore(unboxToInt, th2);
            }
        });
        // Single record sent successfully.
        this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$collectOffsetCb = getAsyncCallback(committable -> {
            this.awaitingProduceResult--;
            collectOffset(committable);
        });
        // All records of a multi-message sent successfully.
        this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$collectOffsetMultiCb = getAsyncCallback(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            int unboxToInt = BoxesRunTime.unboxToInt(tuple22._1());
            ConsumerMessage.Committable committable2 = (ConsumerMessage.Committable) tuple22._2();
            this.awaitingProduceResult -= unboxToInt;
            collectOffset(committable2);
        });
        // Commit finished: (batch size, outcome).
        this.commitResultCB = getAsyncCallback(tuple23 -> {
            if (tuple23 == null) {
                throw new MatchError(tuple23);
            }
            long unboxToLong = BoxesRunTime.unboxToLong(tuple23._1());
            Try<?> result = (Try<?>) tuple23._2();
            if (result instanceof Success) {
                this.awaitingCommitResult -= unboxToLong;
                checkForCompletion();
                return;
            }
            if (result instanceof Failure) {
                Throwable exception = ((Failure<?>) result).exception();
                this.awaitingCommitResult -= unboxToLong;
                if (Supervision$Stop$.MODULE$.equals((Supervision.Directive) decider().apply(exception))) {
                    log().error("committing failed with {}", exception);
                    org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(exception);
                    // The stage and its completion promise are already failed here. Running the
                    // completion check as well could complete or fail the promise a second
                    // time, which throws IllegalStateException on a completed Promise.
                    return;
                }
                log().warning("ignored commit failure {}", exception);
                checkForCompletion();
                return;
            }
            throw new MatchError(tuple23);
        });
        this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$upstreamCompletionState = None$.MODULE$;
        // The handler must act on this logic instance; InHandler is an interface, so it takes
        // no constructor argument and the enclosing instance is captured explicitly.
        final CommittingProducerSinkStageLogic<K, V, IN> logic = this;
        setHandler(committingProducerSinkStage.in(), new InHandler() { // from class: org.apache.pekko.kafka.internal.CommittingProducerSinkStageLogic$$anon$1
            @Override
            public void onPush() {
                logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$produce((ProducerMessage.Envelope) logic.protected$grab(logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.in()));
                logic.protected$tryPull(logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.in());
            }

            @Override
            public void onUpstreamFinish() {
                if (logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$awaitingCommitsBeforeShutdown()) {
                    // Nothing outstanding: complete right away.
                    logic.completeStage();
                    logic.streamCompletion().success(Done$.MODULE$);
                } else {
                    // Commit what is batched and keep the stage alive until results arrive.
                    logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$commit(CommitTrigger$UpstreamFinish$.MODULE$);
                    logic.protected$setKeepGoing(true);
                    logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$upstreamCompletionState = Some$.MODULE$.apply(Success$.MODULE$.apply(Done$.MODULE$));
                }
            }

            @Override
            public void onUpstreamFailure(Throwable th2) {
                if (logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$awaitingCommitsBeforeShutdown()) {
                    logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(th2);
                } else {
                    logic.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$emergencyShutdown(th2);
                }
            }
        });
        Statics.releaseFence();
    }

    /** Returns the batch of offsets collected so far and not yet handed to a commit. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public ConsumerMessage.CommittableOffsetBatch offsetBatch() {
        return this.offsetBatch;
    }

    /** Getter for the deferred-offsets map that CommitObservationLogic stores in this instance. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public Map org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets() {
        return this.org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets;
    }

    /** Replaces the current offset batch. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public void offsetBatch_$eq(ConsumerMessage.CommittableOffsetBatch committableOffsetBatch) {
        this.offsetBatch = committableOffsetBatch;
    }

    /** Setter for the deferred-offsets map that CommitObservationLogic stores in this instance. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public void org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets_$eq(Map map) {
        this.org$apache$pekko$kafka$internal$CommitObservationLogic$$deferredOffsets = map;
    }

    /**
     * Mixin forwarder for {@code CommitObservationLogic.updateBatch}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code updateBatch(committable)} here would resolve to this very method and
     * recurse without bound.
     *
     * @param committable the offset to add to the current batch
     * @return the result of the trait implementation; {@code collectOffset} commits the
     *         batch immediately when this is true
     */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public /* bridge */ /* synthetic */ boolean updateBatch(ConsumerMessage.Committable committable) {
        return CommitObservationLogic.super.updateBatch(committable);
    }

    /**
     * Mixin forwarder for {@code CommitObservationLogic.clearDeferredOffsets}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code clearDeferredOffsets()} here would resolve to this very method and recurse
     * without bound.
     *
     * @return the value of the trait implementation; the caller subtracts it from the
     *         outstanding-commit counter
     */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public /* bridge */ /* synthetic */ int clearDeferredOffsets() {
        return CommitObservationLogic.super.clearDeferredOffsets();
    }

    /** Getter for the logger slot that StageLogging stores in this instance. */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    /** Setter for the logger slot that StageLogging stores in this instance. */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Returns the instance id assigned in the constructor (first five characters of a random UUID). */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String org$apache$pekko$kafka$internal$InstanceId$$instanceId() {
        return this.org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    }

    /** Trait-field setter for the instance id; called once from the constructor. */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public void org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(String str) {
        this.org$apache$pekko$kafka$internal$InstanceId$$instanceId = str;
    }

    /**
     * Mixin forwarder for {@code InstanceId.id}.
     *
     * Delegates to the inherited default implementation. An unqualified call to
     * {@code id()} here would resolve to this very method and recurse without bound.
     * InstanceId is not listed as a direct superinterface of this class, so the call is
     * qualified through StageIdLogging.
     * NOTE(review): this assumes StageIdLogging extends InstanceId without overriding
     * {@code id()} — confirm against the trait definitions.
     *
     * @return the identifier provided by the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public /* bridge */ /* synthetic */ String id() {
        return StageIdLogging.super.id();
    }

    /** Getter for the logger slot that StageIdLogging stores in this instance. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log() {
        return this.org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    }

    /** Setter for the logger slot that StageIdLogging stores in this instance. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public void org$apache$pekko$kafka$internal$StageIdLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    /**
     * Mixin forwarder for {@code StageIdLogging.idLogPrefix}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code idLogPrefix()} here would resolve to this very method and recurse without
     * bound.
     *
     * @return the log prefix produced by the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ String idLogPrefix() {
        return StageIdLogging.super.idLogPrefix();
    }

    /**
     * Mixin forwarder for {@code StageIdLogging.log}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code log()} here would resolve to this very method and recurse without bound.
     *
     * @return the logging adapter provided by the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageIdLogging.super.log();
    }

    /** Returns the Kafka producer currently assigned to this logic (used by produce(...)). */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public Producer producer() {
        return this.producer;
    }

    /** Returns the producer lifecycle marker; it starts as DeferredProducer$Unassigned$ (set in the constructor). */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle() {
        return this.producerAssignmentLifecycle;
    }

    /** Stores the Kafka producer to use for sending records. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public void producer_$eq(Producer producer) {
        this.producer = producer;
    }

    /** Updates the producer lifecycle marker. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public void producerAssignmentLifecycle_$eq(DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle) {
        this.producerAssignmentLifecycle = producerAssignmentLifecycle;
    }

    /**
     * Mixin forwarder for {@code DeferredProducer.resolveProducer}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code resolveProducer(producerSettings)} here would resolve to this very method
     * and recurse without bound.
     *
     * @param producerSettings settings from which the producer is obtained
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public /* bridge */ /* synthetic */ void resolveProducer(ProducerSettings producerSettings) {
        DeferredProducer.super.resolveProducer(producerSettings);
    }

    /**
     * Mixin forwarder for {@code DeferredProducer.closeProducerImmediately}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code closeProducerImmediately()} here would resolve to this very method and
     * recurse without bound.
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public /* bridge */ /* synthetic */ void closeProducerImmediately() {
        DeferredProducer.super.closeProducerImmediately();
    }

    /**
     * Mixin forwarder for {@code DeferredProducer.closeProducer}.
     *
     * Delegates to the interface's default implementation. An unqualified call to
     * {@code closeProducer()} here would resolve to this very method and recurse without
     * bound.
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public /* bridge */ /* synthetic */ void closeProducer() {
        DeferredProducer.super.closeProducer();
    }

    /** Super accessor declared by StageIdLogging: returns the logger produced by StageLogging's own log implementation. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$super$log() {
        return StageLogging.log$(this);
    }

    /** Returns the promise that is completed with Done on normal completion and failed when the stage fails. */
    public final Promise<Done> streamCompletion() {
        return this.streamCompletion;
    }

    /** Returns the committer settings taken from the stage at construction time. */
    @Override // org.apache.pekko.kafka.internal.CommitObservationLogic
    public final CommitterSettings settings() {
        return this.settings;
    }

    /**
     * Returns the supervision decider, computing it on first use.
     *
     * The backing slot is read once: an already resolved function is returned directly,
     * the null-value marker maps to {@code null}, and any other state is handed to the
     * initialisation routine.
     */
    private Function1<Throwable, Supervision.Directive> decider() {
        final Object current = this.decider$lzy1;
        if (!(current instanceof Function1)) {
            return current == LazyVals$NullValue$.MODULE$ ? null : (Function1) decider$lzyINIT1();
        }
        return (Function1) current;
    }

    /**
     * Initialisation routine behind decider(): resolves the decider from the inherited
     * attributes (falling back to the stopping decider) and publishes it into decider$lzy1
     * using compare-and-swap on OFFSET$0.
     *
     * NOTE(review): the local variable types below are decompiler guesses (a Function1 is
     * assigned to a LazyVals$NullValue$ local); read them as plain Object references.
     *
     * @return the resolved decider; null when the slot holds a control state that is
     *         neither "evaluating" nor "waiting"
     */
    private Object decider$lzyINIT1() {
        while (true) {
            Object obj = this.decider$lzy1;
            if (obj == null) {
                // Slot is empty: try to claim the right to compute the value.
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    LazyVals$NullValue$ lazyVals$NullValue$ = null;
                    try {
                        // Use the SupervisionStrategy attribute's decider if present, otherwise the stopping decider.
                        LazyVals$NullValue$ lazyVals$NullValue$2 = (Function1) this.inheritedAttributes.get(ClassTag$.MODULE$.apply(ActorAttributes.SupervisionStrategy.class)).map(supervisionStrategy -> {
                            return supervisionStrategy.decider();
                        }).getOrElse(CommittingProducerSinkStageLogic::decider$lzyINIT1$$anonfun$2);
                        // A null result is stored as the NullValue marker so the slot never goes back to null.
                        lazyVals$NullValue$ = lazyVals$NullValue$2 == null ? LazyVals$NullValue$.MODULE$ : lazyVals$NullValue$2;
                        // The attributes are not needed once the decider has been resolved.
                        this.inheritedAttributes = null;
                        return lazyVals$NullValue$2;
                    } finally {
                        // Publish the result; if the slot no longer holds "evaluating", another thread
                        // installed a Waiting object that has to be replaced and released.
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, lazyVals$NullValue$)) {
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.decider$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, lazyVals$NullValue$);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                // Anything that is not a control state is the finished value.
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    // Someone else is computing: register a Waiting object, then loop again.
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    // Wait until the computing thread counts the Waiting object down, then re-read the slot.
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    /** Returns the execution context of this stage's materializer. */
    @Override // org.apache.pekko.kafka.internal.ExecutionContextProvider
    public ExecutionContext getExecutionContext() {
        return materializer().executionContext();
    }

    /** Returns the stage class (not this logic class) as the log source. */
    public Class<?> logSource() {
        return CommittingProducerSinkStage.class;
    }

    /** Returns the producer settings taken from the stage at construction time. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public ProducerSettings<K, V> producerSettings() {
        return this.producerSettings;
    }

    /** Returns the async callback that closes the producer and fails the stage with the given throwable. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public AsyncCallback<Throwable> closeAndFailStageCb() {
        return this.closeAndFailStageCb;
    }

    /**
     * Closes the producer immediately, fails the stage and fails the completion promise.
     *
     * The promise is failed with {@code tryFailure}: this method can be reached more than
     * once for the same stage (the commit-result handler calls it and then runs the
     * completion check, which may call it again), and {@code Promise.failure} throws
     * IllegalStateException on an already completed promise. The first recorded outcome
     * is kept.
     *
     * @param th the cause to fail the stage and the completion promise with
     */
    public void org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(Throwable th) {
        closeProducerImmediately();
        failStage(th);
        streamCompletion().tryFailure(th);
    }

    /** Runs the superclass start hook, then starts resolving the producer from the stage's producer settings. */
    public void preStart() {
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.preStart();
        resolveProducer(this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.producerSettings());
    }

    /**
     * DeferredProducer hook: requests the first element from upstream and schedules the
     * first interval commit.
     * NOTE(review): presumably invoked once resolveProducer has assigned a producer — confirm in DeferredProducer.
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public void producerAssigned() {
        tryPull(this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.in());
        scheduleCommit();
        log().debug("CommittingProducerSink initialized");
    }

    /**
     * Handles one incoming envelope.
     *
     * <ul>
     *   <li>Message: sends the record and lets a SendCallback report the outcome.</li>
     *   <li>MultiMessage without records: nothing to send, the offset is collected directly.</li>
     *   <li>MultiMessage with records: sends every record with one shared SendMultiCallback.</li>
     *   <li>PassThroughMessage: nothing to send, the offset is collected directly.</li>
     * </ul>
     * Every envelope increases the outstanding-commit counter by one; every record handed
     * to the producer increases the outstanding-produce counter by one.
     *
     * @param envelope the element grabbed from the inlet
     * @throws MatchError if the envelope is none of the known envelope types
     */
    public void org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$produce(ProducerMessage.Envelope<K, V, ConsumerMessage.Committable> envelope) {
        if (envelope instanceof ProducerMessage.Message) {
            ProducerMessage.Message single = (ProducerMessage.Message) envelope;
            this.awaitingProduceResult++;
            this.awaitingCommitResult++;
            producer().send(single.record(), new SendCallback(this, (ConsumerMessage.Committable) single.passThrough()));
        } else if (envelope instanceof ProducerMessage.MultiMessage) {
            ProducerMessage.MultiMessage multi = (ProducerMessage.MultiMessage) envelope;
            if (multi.records().isEmpty()) {
                this.awaitingCommitResult++;
                collectOffset((ConsumerMessage.Committable) multi.passThrough());
            } else {
                int recordCount = multi.records().size();
                this.awaitingProduceResult += recordCount;
                this.awaitingCommitResult++;
                // One callback instance is shared by all records of the envelope.
                SendMultiCallback sharedCallback = new SendMultiCallback(this, recordCount, (ConsumerMessage.Committable) multi.passThrough());
                multi.records().foreach(record -> producer().send(record, sharedCallback));
            }
        } else if (envelope instanceof ProducerMessage.PassThroughMessage) {
            this.awaitingCommitResult++;
            collectOffset((ConsumerMessage.Committable) ((ProducerMessage.PassThroughMessage) envelope).passThrough());
        } else {
            throw new MatchError(envelope);
        }
    }

    /**
     * Accounts for an envelope whose send failed and whose failure is being ignored:
     * logs a warning and releases the envelope from both outstanding counters without
     * collecting its offset.
     *
     * @param i number of records to remove from the outstanding-produce counter
     * @param th the ignored send failure
     */
    private void collectOffsetIgnore(int i, Throwable th) {
        log().warning("ignoring send failure {}", th);
        this.awaitingProduceResult = this.awaitingProduceResult - i;
        this.awaitingCommitResult = this.awaitingCommitResult - 1;
    }

    /** Arms the one-shot commit timer with the committer settings' maximum interval. */
    private void scheduleCommit() {
        String commitTimerKey = CommittingProducerSinkStage$.MODULE$.CommitNow();
        scheduleOnce(
            commitTimerKey,
            this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.committerSettings().maxInterval());
    }

    /**
     * Timer hook: the commit timer triggers an interval commit; any other key is logged
     * as unexpected.
     *
     * @param obj the key of the timer that fired
     */
    public void onTimer(Object obj) {
        String expectedKey = CommittingProducerSinkStage$.MODULE$.CommitNow();
        // Null-safe equality between the expected key and the key that fired.
        boolean isCommitTimer = expectedKey == null ? obj == null : expectedKey.equals(obj);
        if (isCommitTimer) {
            org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$commit(CommitTrigger$Interval$.MODULE$);
            return;
        }
        log().warning("unexpected timer [{}]", obj);
    }

    /**
     * Adds an offset to the current batch and commits when either the batch update asks
     * for it, or upstream is closed and no produce results are outstanding.
     *
     * @param committable the offset of an envelope that needs no further producing
     */
    private void collectOffset(ConsumerMessage.Committable committable) {
        if (updateBatch(committable)) {
            org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$commit(CommitTrigger$BatchSize$.MODULE$);
            return;
        }
        boolean upstreamClosed = isClosed(this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.in());
        if (upstreamClosed && this.awaitingProduceResult == 0) {
            org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$commit(CommitTrigger$UpstreamClosed$.MODULE$);
        }
    }

    /**
     * Commits the current offset batch if it is non-empty and re-arms the commit timer.
     *
     * The commit outcome is delivered together with the batch size to the commit-result
     * async callback; the batch is replaced by an empty one right after the commit has
     * been started. The timer is re-armed whether or not anything was committed.
     *
     * @param triggerdBy what caused this commit attempt (only used for logging)
     */
    public void org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$commit(CommitTrigger.TriggerdBy triggerdBy) {
        ConsumerMessage.CommittableOffsetBatch pending = offsetBatch();
        if (pending.batchSize() != 0) {
            log().debug("commit triggered by {} (awaitingProduceResult={} awaitingCommitResult={})", triggerdBy, BoxesRunTime.boxToLong(this.awaitingProduceResult), BoxesRunTime.boxToLong(this.awaitingCommitResult));
            long committedCount = pending.batchSize();
            pending.commitInternal().onComplete(outcome -> {
                this.commitResultCB.invoke(new Tuple2<>(BoxesRunTime.boxToLong(committedCount), outcome));
            }, materializer().executionContext());
            offsetBatch_$eq(ConsumerMessage$CommittableOffsetBatch$.MODULE$.empty());
        }
        scheduleCommit();
    }

    /**
     * Shuts the stage down after an upstream failure while commits are still outstanding:
     * triggers an emergency commit of the current batch, records the failure as the
     * upstream completion state, empties the batch and fails the stage.
     *
     * @param th the upstream failure
     */
    public void org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$emergencyShutdown(Throwable th) {
        log().debug("Emergency shutdown triggered by {} (awaitingProduceResult={} awaitingCommitResult={})", th, BoxesRunTime.boxToLong(this.awaitingProduceResult), BoxesRunTime.boxToLong(this.awaitingCommitResult));
        // The emergency commit must see the batch before it is replaced below.
        ConsumerMessage.CommittableOffsetBatch pending = offsetBatch();
        pending.tellCommitEmergency();
        offsetBatch_$eq(ConsumerMessage$CommittableOffsetBatch$.MODULE$.empty());
        this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$upstreamCompletionState = Some$.MODULE$.apply(Failure$.MODULE$.apply(th));
        org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(th);
    }

    /**
     * Drops the deferred offsets from the outstanding-commit counter and reports whether
     * the counter has reached zero.
     *
     * Despite the name, a {@code true} result means nothing is outstanding and the
     * stage may shut down.
     *
     * @return true when the outstanding-commit counter is exactly zero after the adjustment
     */
    public boolean org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$awaitingCommitsBeforeShutdown() {
        long outstandingBefore = this.awaitingCommitResult;
        int droppedDeferred = clearDeferredOffsets();
        this.awaitingCommitResult = outstandingBefore - droppedDeferred;
        return this.awaitingCommitResult == 0;
    }

    /**
     * Finishes the stage once upstream is closed and no commits are outstanding.
     *
     * The recorded upstream completion state decides how: success completes the stage and
     * the completion promise, failure fails both with the recorded cause, and a missing
     * state fails both with an IllegalStateException.
     *
     * @throws MatchError if the recorded state is neither Some(Success), Some(Failure) nor None
     */
    private void checkForCompletion() {
        if (!isClosed(this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$stage.in())) {
            return;
        }
        if (!org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$awaitingCommitsBeforeShutdown()) {
            log().debug("checkForCompletion awaitingProduceResult={} awaitingCommitResult={}", BoxesRunTime.boxToLong(this.awaitingProduceResult), BoxesRunTime.boxToLong(this.awaitingCommitResult));
            return;
        }
        Option<Try<Done>> recorded = this.org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$upstreamCompletionState;
        if (recorded instanceof Some) {
            Try<Done> outcome = ((Some<Try<Done>>) recorded).value();
            if (outcome instanceof Success) {
                completeStage();
                streamCompletion().success(Done$.MODULE$);
                return;
            }
            if (outcome instanceof Failure) {
                org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(((Failure<Done>) outcome).exception());
                return;
            }
        }
        if (None$.MODULE$.equals(recorded)) {
            org$apache$pekko$kafka$internal$CommittingProducerSinkStageLogic$$closeAndFailStage(new IllegalStateException("Stage completed, but there is no info about status"));
            return;
        }
        throw new MatchError(recorded);
    }

    /** Stop hook: logs, closes the producer, then runs the superclass stop hook. */
    public void postStop() {
        log().debug("CommittingProducerSink stopped");
        closeProducer();
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.postStop();
    }

    /** Public accessor for grab(...), used by the anonymous inlet handler created in the constructor. */
    public <T> T protected$grab(Inlet<T> inlet) {
        return (T) grab(inlet);
    }

    /** Public accessor for tryPull(...), used by the anonymous inlet handler created in the constructor. */
    public <T> void protected$tryPull(Inlet<T> inlet) {
        tryPull(inlet);
    }

    /** Public accessor for setKeepGoing(...), used by the anonymous inlet handler created in the constructor. */
    public void protected$setKeepGoing(boolean z) {
        setKeepGoing(z);
    }

    /** Fallback used by decider$lzyINIT1() when no SupervisionStrategy attribute is present: the stopping decider. */
    private static final Function1 decider$lzyINIT1$$anonfun$2() {
        return Supervision$.MODULE$.stoppingDecider();
    }
}
