package org.apache.pekko.kafka.internal;

import java.util.UUID;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.ProducerMessage$MultiResult$;
import org.apache.pekko.kafka.ProducerMessage$MultiResultPart$;
import org.apache.pekko.kafka.ProducerMessage$PassThroughResult$;
import org.apache.pekko.kafka.ProducerMessage$Result$;
import org.apache.pekko.kafka.ProducerMessage.Envelope;
import org.apache.pekko.kafka.ProducerMessage.Results;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.internal.DeferredProducer;
import org.apache.pekko.kafka.internal.ProducerStage;
import org.apache.pekko.stream.ActorAttributes;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Supervision;
import org.apache.pekko.stream.Supervision$;
import org.apache.pekko.stream.Supervision$Stop$;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.collection.BuildFrom$;
import scala.collection.StringOps$;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Failure$;
import scala.util.Success;
import scala.util.Success$;
import scala.util.Try;

/* compiled from: DefaultProducerStage.scala */
/* loaded from: input_file:org/apache/pekko/kafka/internal/DefaultProducerStageLogic.class */
public class DefaultProducerStageLogic<K, V, P, IN extends ProducerMessage.Envelope<K, V, P>, OUT extends ProducerMessage.Results<K, V, P>> extends TimerGraphStageLogic implements StageIdLogging, DeferredProducer<K, V>, ProducerStage.ProducerCompletionState, ExecutionContextProvider, ProducerStage.ProducerCompletionState, ExecutionContextProvider {
    // Offset of the decider$lzy1 field as reported by LazyVals; passed to every objCAS call in decider$lzyINIT1().
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(DefaultProducerStageLogic.class.getDeclaredField("decider$lzy1"));
    // Backing field for the StageLogging `_log` accessor pair.
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Instance id; the constructor sets it to the first five characters of a random UUID string.
    private String org$apache$pekko$kafka$internal$InstanceId$$instanceId;
    // Backing field for the StageIdLogging `_log` accessor pair.
    private LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log;
    // Kafka producer exposed through producer()/producer_$eq(); not assigned by the constructor.
    private Producer producer;
    // Producer assignment state; the constructor initialises it to DeferredProducer$Unassigned$.
    private DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle;
    // The stage this logic was created for; supplies in(), out() and settings().
    public final ProducerStage<K, V, P, IN, OUT> org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage;
    // Attributes handed to the constructor; read while computing the decider and then set to null.
    private Attributes inheritedAttributes;
    // Holder for the lazily computed decider; also carries the LazyVals control states while it is being initialised.
    private volatile Object decider$lzy1;
    // Count of sends whose callback has not yet been confirmed: incremented in produce(), decremented by the confirm callback.
    private int awaitingConfirmation;
    // None until upstream finishes or fails; then Some(Success(Done)) or Some(Failure(cause)) (set by DefaultInHandler).
    public Option<Try<Done>> org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$completionState;
    // Copy of producerStage.settings() captured in the constructor.
    private final ProducerSettings producerSettings;
    // Async callback that decrements awaitingConfirmation and then re-checks for completion.
    public final AsyncCallback<BoxedUnit> org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$confirmAndCheckForCompletionCB;
    // Async callback that closes the producer immediately and fails the stage with the supplied throwable.
    private final AsyncCallback closeAndFailStageCb;

    /* compiled from: DefaultProducerStage.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/DefaultProducerStageLogic$CallbackBase.class */
    /**
     * Common Kafka {@link Callback} for sends issued by this stage logic.
     *
     * <p>On success the subclass completes its promise through {@link #emitElement} and the
     * confirmation callback is invoked. On failure the enclosing logic's decider is consulted:
     * a {@code Stop} directive invokes the close-and-fail callback, any other directive fails
     * the promise and then invokes the confirmation callback.
     */
    public abstract class CallbackBase implements Callback {
        private final Promise<?> promise;
        private final /* synthetic */ DefaultProducerStageLogic $outer;

        /**
         * @param defaultProducerStageLogic enclosing stage logic; must not be {@code null}
         * @param promise promise failed when a send error is not answered with {@code Stop}
         * @throws NullPointerException if {@code defaultProducerStageLogic} is {@code null}
         */
        public CallbackBase(DefaultProducerStageLogic defaultProducerStageLogic, Promise<?> promise) {
            this.promise = promise;
            if (defaultProducerStageLogic == null) {
                throw new NullPointerException();
            }
            this.$outer = defaultProducerStageLogic;
        }

        /** Completes the subclass-specific promise for a successfully sent record. */
        public abstract void emitElement(RecordMetadata recordMetadata);

        /**
         * Invoked by the Kafka producer once a send has finished.
         *
         * @param recordMetadata metadata of the sent record (used only when {@code exc} is {@code null})
         * @param exc the send failure, or {@code null} on success
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            final DefaultProducerStageLogic logic = this.$outer;
            if (exc != null) {
                Supervision.Directive directive = (Supervision.Directive) logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$decider().apply(exc);
                if (Supervision$Stop$.MODULE$.equals(directive)) {
                    // Stop: hand the error to the stage; no confirmation is signalled on this path.
                    logic.closeAndFailStageCb().invoke(exc);
                    return;
                }
                this.promise.failure(exc);
            } else {
                emitElement(recordMetadata);
            }
            // Both the success path and the non-Stop failure path confirm the outstanding send.
            logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$confirmAndCheckForCompletionCB.invoke(BoxedUnit.UNIT);
        }

        /** Synthetic accessor for the enclosing stage logic. */
        public final /* synthetic */ DefaultProducerStageLogic org$apache$pekko$kafka$internal$DefaultProducerStageLogic$CallbackBase$$$outer() {
            return this.$outer;
        }
    }

    /* compiled from: DefaultProducerStage.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/DefaultProducerStageLogic$DefaultInHandler.class */
    /**
     * Inlet handler that forwards every grabbed element to {@code produce} and records how
     * upstream terminated before asking the enclosing logic to check for completion.
     */
    public class DefaultInHandler implements InHandler {
        private final /* synthetic */ DefaultProducerStageLogic $outer;

        /**
         * @param defaultProducerStageLogic enclosing stage logic; must not be {@code null}
         * @throws NullPointerException if {@code defaultProducerStageLogic} is {@code null}
         */
        public DefaultInHandler(DefaultProducerStageLogic defaultProducerStageLogic) {
            if (defaultProducerStageLogic == null) {
                throw new NullPointerException();
            }
            this.$outer = defaultProducerStageLogic;
        }

        /** Grabs the element from the stage inlet and passes it to {@code produce}. */
        public void onPush() {
            final DefaultProducerStageLogic logic = this.$outer;
            Object element = logic.protected$grab(logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.in());
            logic.produce((ProducerMessage.Envelope) element);
        }

        /** Records a successful completion state, then checks whether the stage can finish. */
        public void onUpstreamFinish() {
            final DefaultProducerStageLogic logic = this.$outer;
            logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$completionState = Some$.MODULE$.apply(Success$.MODULE$.apply(Done$.MODULE$));
            logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$checkForCompletion();
        }

        /** Records the upstream failure as completion state, then checks whether the stage can finish. */
        public void onUpstreamFailure(Throwable th) {
            final DefaultProducerStageLogic logic = this.$outer;
            logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$completionState = Some$.MODULE$.apply(Failure$.MODULE$.apply(th));
            logic.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$checkForCompletion();
        }

        /** Synthetic accessor for the enclosing stage logic. */
        public final /* synthetic */ DefaultProducerStageLogic org$apache$pekko$kafka$internal$DefaultProducerStageLogic$DefaultInHandler$$$outer() {
            return this.$outer;
        }
    }

    /* compiled from: DefaultProducerStage.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/DefaultProducerStageLogic$SendCallback.class */
    /**
     * Callback for a single {@code ProducerMessage.Message}: on success it completes the promise
     * with a {@code ProducerMessage.Result} built from the record metadata and the message.
     */
    public final class SendCallback extends CallbackBase {
        private final ProducerMessage.Message<K, V, P> msg;
        private final Promise<ProducerMessage.Result<K, V, P>> promise;
        private final /* synthetic */ DefaultProducerStageLogic $outer;

        /**
         * @param defaultProducerStageLogic enclosing stage logic; must not be {@code null}
         * @param message the message whose record is being sent
         * @param promise promise completed with the send result
         * @throws NullPointerException if {@code defaultProducerStageLogic} is {@code null}
         */
        public SendCallback(DefaultProducerStageLogic defaultProducerStageLogic, ProducerMessage.Message<K, V, P> message, Promise<ProducerMessage.Result<K, V, P>> promise) {
            super(defaultProducerStageLogic, promise);
            // The super constructor has already rejected null; the check is kept as in the original.
            if (defaultProducerStageLogic == null) {
                throw new NullPointerException();
            }
            this.$outer = defaultProducerStageLogic;
            this.promise = promise;
            this.msg = message;
        }

        /** Completes the promise with a result pairing {@code recordMetadata} with the message. */
        @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic.CallbackBase
        public void emitElement(RecordMetadata recordMetadata) {
            ProducerMessage.Result<K, V, P> result = ProducerMessage$Result$.MODULE$.apply(recordMetadata, this.msg);
            this.promise.success(result);
        }

        /** Synthetic accessor for the enclosing stage logic. */
        public final /* synthetic */ DefaultProducerStageLogic org$apache$pekko$kafka$internal$DefaultProducerStageLogic$SendCallback$$$outer() {
            return this.$outer;
        }
    }

    /* compiled from: DefaultProducerStage.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/DefaultProducerStageLogic$SendMultiCallback.class */
    /**
     * Callback for one record of a {@code ProducerMessage.MultiMessage}: on success it completes
     * the promise with a {@code ProducerMessage.MultiResultPart} for that record.
     */
    public final class SendMultiCallback extends CallbackBase {
        private final ProducerRecord<K, V> msg;
        private final Promise<ProducerMessage.MultiResultPart<K, V>> promise;
        private final /* synthetic */ DefaultProducerStageLogic $outer;

        /**
         * @param defaultProducerStageLogic enclosing stage logic; must not be {@code null}
         * @param producerRecord the record being sent
         * @param promise promise completed with the part result for this record
         * @throws NullPointerException if {@code defaultProducerStageLogic} is {@code null}
         */
        public SendMultiCallback(DefaultProducerStageLogic defaultProducerStageLogic, ProducerRecord<K, V> producerRecord, Promise<ProducerMessage.MultiResultPart<K, V>> promise) {
            super(defaultProducerStageLogic, promise);
            // The super constructor has already rejected null; the check is kept as in the original.
            if (defaultProducerStageLogic == null) {
                throw new NullPointerException();
            }
            this.$outer = defaultProducerStageLogic;
            this.promise = promise;
            this.msg = producerRecord;
        }

        /** Completes the promise with a part result pairing {@code recordMetadata} with the record. */
        @Override // org.apache.pekko.kafka.internal.DefaultProducerStageLogic.CallbackBase
        public void emitElement(RecordMetadata recordMetadata) {
            ProducerMessage.MultiResultPart<K, V> part = ProducerMessage$MultiResultPart$.MODULE$.apply(recordMetadata, this.msg);
            this.promise.success(part);
        }

        /** Synthetic accessor for the enclosing stage logic. */
        public final /* synthetic */ DefaultProducerStageLogic org$apache$pekko$kafka$internal$DefaultProducerStageLogic$SendMultiCallback$$$outer() {
            return this.$outer;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the logic for {@code producerStage}.
     *
     * <p>Initialises the mixed-in logging/instance-id state, marks the producer as unassigned,
     * creates the two async callbacks, and installs the demand-suspending out handler and the
     * initial in handler.
     *
     * @param producerStage the stage providing shape, ports and settings
     * @param attributes inherited attributes, later consulted for a supervision strategy
     */
    public DefaultProducerStageLogic(ProducerStage<K, V, P, IN, OUT> producerStage, Attributes attributes) {
        super(producerStage.m215shape());
        this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage = producerStage;
        this.inheritedAttributes = attributes;
        // Trait initialisers run in a fixed order; the instance id is set between the two logging traits.
        StageLogging.$init$(this);
        // Instance id = first five characters of a random UUID string.
        org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(StringOps$.MODULE$.take$extension(Predef$.MODULE$.augmentString(UUID.randomUUID().toString()), 5));
        StageIdLogging.$init$((StageIdLogging) this);
        producerAssignmentLifecycle_$eq(DeferredProducer$Unassigned$.MODULE$);
        this.awaitingConfirmation = 0;
        this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$completionState = None$.MODULE$;
        this.producerSettings = producerStage.settings();
        // Invoked by send callbacks: one fewer outstanding send, then re-evaluate completion.
        this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$confirmAndCheckForCompletionCB = getAsyncCallback(boxedUnit -> {
            this.awaitingConfirmation--;
            org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$checkForCompletion();
        });
        // Invoked by send callbacks when the decider answers Stop: close the producer, then fail the stage.
        this.closeAndFailStageCb = getAsyncCallback(th -> {
            closeProducerImmediately();
            failStage(th);
        });
        // Start with an out handler that ignores pulls; resumeDemand() replaces it later.
        suspendDemandOutHandler();
        initialInHandler();
        Statics.releaseFence();
    }

    /** Returns the logging adapter currently stored for the {@code StageLogging} trait. */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        final LoggingAdapter stored = this.org$apache$pekko$stream$stage$StageLogging$$_log;
        return stored;
    }

    /**
     * Stores the logging adapter for the {@code StageLogging} trait.
     *
     * @param loggingAdapter the adapter to keep
     */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(final LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Returns the instance id stored by the constructor. */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public String org$apache$pekko$kafka$internal$InstanceId$$instanceId() {
        final String instanceId = this.org$apache$pekko$kafka$internal$InstanceId$$instanceId;
        return instanceId;
    }

    /**
     * Stores the instance id; called once from the constructor.
     *
     * @param str the id value to keep
     */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public void org$apache$pekko$kafka$internal$InstanceId$_setter_$org$apache$pekko$kafka$internal$InstanceId$$instanceId_$eq(final String str) {
        this.org$apache$pekko$kafka$internal$InstanceId$$instanceId = str;
    }

    /**
     * Forwards to the trait-provided {@code id()} implementation.
     *
     * <p>The decompiled form called {@code id()} on itself, which recurses without bound when
     * read as Java; the forwarder must target the inherited interface implementation instead.
     * NOTE(review): assumes {@code InstanceId.id()} is a default method inherited through
     * {@code StageIdLogging}, a direct superinterface of this class — confirm.
     *
     * @return the id supplied by the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.InstanceId
    public /* bridge */ /* synthetic */ String id() {
        return StageIdLogging.super.id();
    }

    /** Returns the logging adapter currently stored for the {@code StageIdLogging} trait. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$_log() {
        final LoggingAdapter stored = this.org$apache$pekko$kafka$internal$StageIdLogging$$_log;
        return stored;
    }

    /**
     * Stores the logging adapter for the {@code StageIdLogging} trait.
     *
     * @param loggingAdapter the adapter to keep
     */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public void org$apache$pekko$kafka$internal$StageIdLogging$$_log_$eq(final LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$kafka$internal$StageIdLogging$$_log = loggingAdapter;
    }

    /**
     * Forwards to the {@code StageIdLogging} implementation of {@code idLogPrefix()}.
     *
     * <p>The decompiled form called {@code idLogPrefix()} on itself, which recurses without
     * bound when read as Java; the forwarder must target the interface implementation.
     * NOTE(review): assumes {@code StageIdLogging.idLogPrefix()} is a default method — confirm.
     *
     * @return the log prefix supplied by the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ String idLogPrefix() {
        return StageIdLogging.super.idLogPrefix();
    }

    /**
     * Forwards to the {@code StageIdLogging} implementation of {@code log()}.
     *
     * <p>The decompiled form called {@code log()} on itself, which recurses without bound when
     * read as Java; the forwarder must target the interface implementation.
     * NOTE(review): assumes {@code StageIdLogging.log()} is a default method — confirm.
     *
     * @return the logging adapter supplied by the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageIdLogging.super.log();
    }

    /** Returns the Kafka producer currently stored in this logic (may not be assigned yet). */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public Producer producer() {
        final Producer current = this.producer;
        return current;
    }

    /** Returns the current producer assignment state. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle() {
        final DeferredProducer.ProducerAssignmentLifecycle state = this.producerAssignmentLifecycle;
        return state;
    }

    /**
     * Stores the Kafka producer used by {@code produce}.
     *
     * @param producer the producer to keep
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public void producer_$eq(final Producer producer) {
        this.producer = producer;
    }

    /**
     * Stores the producer assignment state.
     *
     * @param producerAssignmentLifecycle the new state
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public void producerAssignmentLifecycle_$eq(final DeferredProducer.ProducerAssignmentLifecycle producerAssignmentLifecycle) {
        this.producerAssignmentLifecycle = producerAssignmentLifecycle;
    }

    /**
     * Forwards to the {@code DeferredProducer} implementation of {@code resolveProducer}.
     *
     * <p>The decompiled form called {@code resolveProducer} on itself, which recurses without
     * bound when read as Java; the forwarder must target the interface implementation.
     * NOTE(review): assumes {@code DeferredProducer.resolveProducer} is a default method — confirm.
     *
     * @param producerSettings settings handed to the trait implementation
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public /* bridge */ /* synthetic */ void resolveProducer(ProducerSettings producerSettings) {
        DeferredProducer.super.resolveProducer(producerSettings);
    }

    /**
     * Forwards to the {@code DeferredProducer} implementation of {@code closeProducerImmediately}.
     *
     * <p>The decompiled form called {@code closeProducerImmediately()} on itself, which recurses
     * without bound when read as Java; the forwarder must target the interface implementation.
     * NOTE(review): assumes {@code DeferredProducer.closeProducerImmediately} is a default
     * method — confirm.
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public /* bridge */ /* synthetic */ void closeProducerImmediately() {
        DeferredProducer.super.closeProducerImmediately();
    }

    /**
     * Forwards to the {@code DeferredProducer} implementation of {@code closeProducer}.
     *
     * <p>The decompiled form called {@code closeProducer()} on itself, which recurses without
     * bound when read as Java; the forwarder must target the interface implementation.
     * NOTE(review): assumes {@code DeferredProducer.closeProducer} is a default method — confirm.
     */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public /* bridge */ /* synthetic */ void closeProducer() {
        DeferredProducer.super.closeProducer();
    }

    /** Super-accessor: returns the result of the {@code StageLogging.log$} static forwarder for this instance. */
    @Override // org.apache.pekko.kafka.internal.StageIdLogging
    public /* synthetic */ LoggingAdapter org$apache$pekko$kafka$internal$StageIdLogging$$super$log() {
        final LoggingAdapter inherited = StageLogging.log$(this);
        return inherited;
    }

    /**
     * Returns the lazily initialised supervision decider.
     *
     * <p>Reads the holder field once: an already stored {@code Function1} is returned directly,
     * the null-value sentinel yields {@code null}, and any other content delegates to the
     * initialiser.
     *
     * @return the decider, or {@code null} when the null-value sentinel is stored
     */
    public Function1<Throwable, Supervision.Directive> org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$decider() {
        final Object cached = this.decider$lzy1;
        if (!(cached instanceof Function1)) {
            return cached == LazyVals$NullValue$.MODULE$ ? null : (Function1) decider$lzyINIT1();
        }
        return (Function1) cached;
    }

    /**
     * Initialiser for the lazy decider, coordinated through compare-and-swap on the
     * {@code decider$lzy1} holder field.
     *
     * <p>The caller that swaps {@code null} for the Evaluating marker computes the value: the
     * decider of the {@code SupervisionStrategy} attribute if one is present in
     * {@code inheritedAttributes}, otherwise the stopping decider. Other callers loop until they
     * observe a stored value.
     *
     * NOTE(review): the decompiler typed the two result locals as {@code LazyVals$NullValue$}
     * although they are assigned a {@code Function1} — confirm before recompiling.
     *
     * @return the computed or already stored value; {@code null} when the holder contains a
     *     control state that is neither Evaluating nor Waiting
     */
    private Object decider$lzyINIT1() {
        while (true) {
            Object obj = this.decider$lzy1;
            if (obj == null) {
                // Not initialised yet: try to claim the computation.
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    LazyVals$NullValue$ lazyVals$NullValue$ = null;
                    try {
                        LazyVals$NullValue$ lazyVals$NullValue$2 = (Function1) this.inheritedAttributes.get(ClassTag$.MODULE$.apply(ActorAttributes.SupervisionStrategy.class)).map(supervisionStrategy -> {
                            return supervisionStrategy.decider();
                        }).getOrElse(DefaultProducerStageLogic::decider$lzyINIT1$$anonfun$2);
                        // A null result is published as the NullValue sentinel.
                        lazyVals$NullValue$ = lazyVals$NullValue$2 == null ? LazyVals$NullValue$.MODULE$ : lazyVals$NullValue$2;
                        // The attributes are not needed once the decider has been computed.
                        this.inheritedAttributes = null;
                        return lazyVals$NullValue$2;
                    } finally {
                        // Publish the result; if the Evaluating marker was replaced in the meantime,
                        // the holder contains a Waiting object that must be swapped out and released.
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, lazyVals$NullValue$)) {
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.decider$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, lazyVals$NullValue$);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                // Anything that is not a control state is the finished value.
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    // Another caller is computing: try to install a Waiting object, then loop again.
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    // Block until the computing caller counts the Waiting object down, then re-read.
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    /** Returns the execution context of this logic's materializer. */
    @Override // org.apache.pekko.kafka.internal.ExecutionContextProvider
    public ExecutionContext getExecutionContext() {
        final ExecutionContext context = materializer().executionContext();
        return context;
    }

    /** Returns the class used as the log source: {@code DefaultProducerStage}. */
    public Class<?> logSource() {
        final Class<?> source = DefaultProducerStage.class;
        return source;
    }

    /** Returns the producer settings captured from the stage in the constructor. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public final ProducerSettings<K, V> producerSettings() {
        final ProducerSettings<K, V> settings = this.producerSettings;
        return settings;
    }

    /** Returns the number of sends whose callback has not yet been confirmed. */
    public int awaitingConfirmationValue() {
        final int outstanding = this.awaitingConfirmation;
        return outstanding;
    }

    /** Runs the inherited {@code preStart} and then resolves the producer from the stage's settings. */
    public void preStart() {
        // Inherited start-up logic first (GraphStageLogic.preStart in the original).
        super.preStart();
        resolveProducer(org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.settings());
    }

    /**
     * Finishes the stage once the inlet is closed and no send is awaiting confirmation.
     *
     * <p>With a recorded {@code Success} it calls {@link #onCompletionSuccess()}, with a recorded
     * {@code Failure} it calls {@link #onCompletionFailure(Throwable)}, and with no recorded
     * state it fails the stage with an {@code IllegalStateException}. Does nothing while the
     * inlet is still open or confirmations are outstanding.
     *
     * <p>The locals are typed as {@code Object} and narrowed by {@code instanceof}: the
     * decompiled declarations ({@code Some} assigned from an {@code Option}, {@code Failure}
     * assigned from a {@code Try}) were incompatible-type assignments.
     *
     * @throws MatchError if the completion state matches none of the expected shapes
     */
    public void org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$checkForCompletion() {
        if (!isClosed(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.in()) || this.awaitingConfirmation != 0) {
            return;
        }
        final Object state = this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$completionState;
        if (state instanceof Some) {
            final Object outcome = ((Some) state).value();
            if (outcome instanceof Success) {
                onCompletionSuccess();
                return;
            }
            if (outcome instanceof Failure) {
                onCompletionFailure(((Failure) outcome).exception());
                return;
            }
        }
        // Anything other than None at this point is an unmatched shape.
        if (!None$.MODULE$.equals(state)) {
            throw new MatchError(state);
        }
        failStage(new IllegalStateException("Stage completed, but there is no info about status"));
    }

    /** Completes the stage; called from the completion check when upstream finished successfully. */
    @Override // org.apache.pekko.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionSuccess() {
        this.completeStage();
    }

    /**
     * Fails the stage; called from the completion check when upstream failed.
     *
     * @param th the failure to fail the stage with
     */
    @Override // org.apache.pekko.kafka.internal.ProducerStage.ProducerCompletionState
    public void onCompletionFailure(final Throwable th) {
        this.failStage(th);
    }

    /** Returns the async callback that closes the producer immediately and fails the stage. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public AsyncCallback<Throwable> closeAndFailStageCb() {
        final AsyncCallback<Throwable> callback = this.closeAndFailStageCb;
        return callback;
    }

    /**
     * Hook called by {@code produce} after an envelope's records have been handed to the
     * producer (or, for pass-through messages, before the result is pushed).
     *
     * @param envelope the envelope that was just processed
     */
    public void postSend(final ProducerMessage.Envelope<K, V, P> envelope) {
        // Does nothing in this class.
    }

    /** Resumes demand, using the default value of {@code resumeDemand}'s argument. */
    @Override // org.apache.pekko.kafka.internal.DeferredProducer
    public void producerAssigned() {
        final boolean tryToPull = resumeDemand$default$1();
        resumeDemand(tryToPull);
    }

    /**
     * Installs an out handler that pulls the stage inlet whenever downstream pulls.
     *
     * <p>The handler reaches the enclosing logic through {@code DefaultProducerStageLogic.this}.
     * The decompiled form passed {@code this} to the anonymous {@code OutHandler} and stored the
     * handler itself as {@code $outer}, so {@code onPull} addressed the wrong object.
     *
     * @param z when {@code true}, also pulls the inlet right away if the outlet is already
     *     available and the inlet has not been pulled yet
     */
    public void resumeDemand(boolean z) {
        log().debug("Resume demand");
        setHandler(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.out(), new OutHandler() { // from class: org.apache.pekko.kafka.internal.DefaultProducerStageLogic$$anon$1
            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                // Propagate downstream demand to the inlet of the enclosing stage logic.
                DefaultProducerStageLogic.this.protected$tryPull(DefaultProducerStageLogic.this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.in());
            }
        });
        // Demand may have arrived while suspended; serve it now if requested.
        if (z && isAvailable(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.out()) && !hasBeenPulled(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.in())) {
            tryPull(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.in());
        }
    }

    /** Default value of {@code resumeDemand}'s argument: {@code true}. */
    public boolean resumeDemand$default$1() {
        final boolean defaultTryToPull = true;
        return defaultTryToPull;
    }

    /** Logs the suspension at debug level and installs the out handler that ignores pulls. */
    public void suspendDemand() {
        this.log().debug("Suspend demand");
        this.suspendDemandOutHandler();
    }

    /** Installs an out handler whose {@code onPull} does nothing, so downstream demand is not forwarded. */
    private void suspendDemandOutHandler() {
        final OutHandler ignoringPulls = new OutHandler() { // from class: org.apache.pekko.kafka.internal.DefaultProducerStageLogic$$anon$2
            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                // Intentionally empty: demand is suspended.
            }
        };
        setHandler(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.out(), ignoringPulls);
    }

    /** Installs the in handler used at construction time, which is the producing handler. */
    public void initialInHandler() {
        this.producingInHandler();
    }

    /** Installs a new {@code DefaultInHandler} on the stage inlet. */
    public void producingInHandler() {
        final DefaultInHandler handler = new DefaultInHandler(this);
        setHandler(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.in(), handler);
    }

    /**
     * Sends the records carried by {@code envelope} and pushes a future of the result downstream.
     *
     * <ul>
     *   <li>{@code Message}: one send; the pushed future is completed by a {@code SendCallback}.</li>
     *   <li>{@code MultiMessage}: one send per record; the pushed future combines all part
     *       futures into a {@code MultiResult}.</li>
     *   <li>{@code PassThroughMessage}: nothing is sent; an already completed future is pushed.</li>
     * </ul>
     *
     * <p>{@code awaitingConfirmation} is incremented before each send, and {@code postSend} is
     * called before the push in every branch.
     *
     * @param envelope the element grabbed from the inlet
     * @throws MatchError if {@code envelope} is none of the three supported kinds
     */
    public void produce(ProducerMessage.Envelope<K, V, P> envelope) {
        if (envelope instanceof ProducerMessage.Message) {
            ProducerMessage.Message single = (ProducerMessage.Message) envelope;
            Promise resultPromise = Promise$.MODULE$.apply();
            this.awaitingConfirmation++;
            producer().send(single.record(), new SendCallback(this, single, resultPromise));
            postSend(single);
            push(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.out(), resultPromise.future());
        } else if (envelope instanceof ProducerMessage.MultiMessage) {
            ProducerMessage.MultiMessage multi = (ProducerMessage.MultiMessage) envelope;
            // One promise and one outstanding confirmation per record.
            Seq partFutures = (Seq) multi.records().map(record -> {
                Promise partPromise = Promise$.MODULE$.apply();
                this.awaitingConfirmation++;
                producer().send(record, new SendMultiCallback(this, record, partPromise));
                return partPromise.future();
            });
            postSend(multi);
            ExecutionContextExecutor ec = materializer().executionContext();
            push(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.out(), Future$.MODULE$.sequence(partFutures, BuildFrom$.MODULE$.buildFromIterableOps(), ec).map(parts -> ProducerMessage$MultiResult$.MODULE$.apply(parts, multi.passThrough()), ec));
        } else if (envelope instanceof ProducerMessage.PassThroughMessage) {
            postSend((ProducerMessage.PassThroughMessage) envelope);
            push(this.org$apache$pekko$kafka$internal$DefaultProducerStageLogic$$stage.out(), Future$.MODULE$.successful(ProducerMessage$PassThroughResult$.MODULE$.apply(envelope.passThrough())));
        } else {
            throw new MatchError(envelope);
        }
    }

    /** Logs the stop at debug level, closes the producer, then runs the inherited {@code postStop}. */
    public void postStop() {
        this.log().debug("ProducerStage postStop");
        this.closeProducer();
        // Inherited shutdown logic last (GraphStageLogic.postStop in the original).
        super.postStop();
    }

    /**
     * Public accessor that lets the nested handlers call {@code grab}.
     *
     * @param inlet the inlet to grab from
     * @return the grabbed element
     */
    public <T> T protected$grab(final Inlet<T> inlet) {
        final T element = (T) grab(inlet);
        return element;
    }

    /**
     * Public accessor that lets the nested handlers call {@code tryPull}.
     *
     * @param inlet the inlet to pull
     */
    public <T> void protected$tryPull(final Inlet<T> inlet) {
        this.tryPull(inlet);
    }

    /** Fallback used by the decider initialiser when no supervision strategy attribute is present: the stopping decider. */
    private static final Function1 decider$lzyINIT1$$anonfun$2() {
        final Function1 fallback = Supervision$.MODULE$.stoppingDecider();
        return fallback;
    }
}
