package org.apache.pekko.kafka.internal;

import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.dispatch.ExecutionContexts$;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.util.JavaDurationConverters$;
import org.apache.pekko.util.JavaDurationConverters$ScalaDurationOps$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.concurrent.Future;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: DeferredProducer.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/DeferredProducer.class */
public interface DeferredProducer<K, V> {

    /* compiled from: DeferredProducer.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/DeferredProducer$ProducerAssignmentLifecycle.class */
    public interface ProducerAssignmentLifecycle {
    }

    Producer<K, V> producer();

    void producer_$eq(Producer<K, V> producer);

    ProducerAssignmentLifecycle producerAssignmentLifecycle();

    void producerAssignmentLifecycle_$eq(ProducerAssignmentLifecycle producerAssignmentLifecycle);

    ProducerSettings<K, V> producerSettings();

    void producerAssigned();

    AsyncCallback<Throwable> closeAndFailStageCb();

    private default void assignProducer(Producer<K, V> producer) {
        producer_$eq(producer);
        changeProducerAssignmentLifecycle(DeferredProducer$Assigned$.MODULE$);
        producerAssigned();
    }

    default void resolveProducer(ProducerSettings<K, V> producerSettings) {
        Future<Producer<K, V>> createKafkaProducerAsync = producerSettings.createKafkaProducerAsync(((GraphStageLogic) this).getExecutionContext());
        Some value = createKafkaProducerAsync.value();
        if (value instanceof Some) {
            Success success = (Try) value.value();
            if (success instanceof Success) {
                assignProducer((Producer) success.value());
                return;
            } else if (success instanceof Failure) {
                ((GraphStageLogic) this).failStage(((Failure) success).exception());
                return;
            }
        }
        if (!None$.MODULE$.equals(value)) {
            throw new MatchError(value);
        }
        AsyncCallback asyncCallback = ((GraphStageLogic) this).getAsyncCallback(producer -> {
            assignProducer(producer);
        });
        createKafkaProducerAsync.transform(producer2 -> {
            asyncCallback.invoke(producer2);
        }, th -> {
            ((GraphStageLogic) this).log().error(th, "producer creation failed");
            closeAndFailStageCb().invoke(th);
            return th;
        }, ExecutionContexts$.MODULE$.parasitic());
        changeProducerAssignmentLifecycle(DeferredProducer$AsyncCreateRequestSent$.MODULE$);
    }

    private default void changeProducerAssignmentLifecycle(ProducerAssignmentLifecycle producerAssignmentLifecycle) {
        ProducerAssignmentLifecycle producerAssignmentLifecycle2 = producerAssignmentLifecycle();
        producerAssignmentLifecycle_$eq(producerAssignmentLifecycle);
        ((GraphStageLogic) this).log().debug("Asynchronous producer assignment lifecycle changed '{} -> {}'", producerAssignmentLifecycle2, producerAssignmentLifecycle);
    }

    default void closeProducerImmediately() {
        if (producer() == null || !producerSettings().closeProducerOnStop()) {
            return;
        }
        producer().close(Duration.ZERO);
    }

    default void closeProducer() {
        if (producerSettings().closeProducerOnStop()) {
            ProducerAssignmentLifecycle producerAssignmentLifecycle = producerAssignmentLifecycle();
            DeferredProducer$Assigned$ deferredProducer$Assigned$ = DeferredProducer$Assigned$.MODULE$;
            if (producerAssignmentLifecycle == null) {
                if (deferredProducer$Assigned$ != null) {
                    return;
                }
            } else if (!producerAssignmentLifecycle.equals(deferredProducer$Assigned$)) {
                return;
            }
            try {
                producer().flush();
                producer().close(JavaDurationConverters$ScalaDurationOps$.MODULE$.asJava$extension(JavaDurationConverters$.MODULE$.ScalaDurationOps(producerSettings().closeTimeout())));
                ((GraphStageLogic) this).log().debug("Producer closed");
            } catch (Throwable th) {
                if (th != null) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (!unapply.isEmpty()) {
                        ((GraphStageLogic) this).log().error((Throwable) unapply.get(), "Problem occurred during producer close");
                        return;
                    }
                }
                throw th;
            }
        }
    }
}
