package org.apache.pekko.kafka.scaladsl;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ClassicActorSystemProvider;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.ProducerMessage$MultiResult$;
import org.apache.pekko.kafka.ProducerMessage$MultiResultPart$;
import org.apache.pekko.kafka.ProducerMessage$PassThroughResult$;
import org.apache.pekko.kafka.ProducerMessage$Result$;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.util.JavaDurationConverters$;
import org.apache.pekko.util.JavaDurationConverters$ScalaDurationOps$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.collection.BuildFrom$;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;

/* compiled from: SendProducer.scala */
/* loaded from: input_file:org/apache/pekko/kafka/scaladsl/SendProducer.class */
public final class SendProducer<K, V> {
    private final ProducerSettings settings;
    private final ExecutionContext ec;
    private final Future<org.apache.kafka.clients.producer.Producer<K, V>> producerFuture;

    public static <K, V> SendProducer<K, V> apply(ProducerSettings<K, V> producerSettings, ActorSystem actorSystem) {
        return SendProducer$.MODULE$.apply(producerSettings, actorSystem);
    }

    public static <K, V> SendProducer<K, V> apply(ProducerSettings<K, V> producerSettings, ClassicActorSystemProvider classicActorSystemProvider) {
        return SendProducer$.MODULE$.apply(producerSettings, classicActorSystemProvider);
    }

    public SendProducer(ProducerSettings<K, V> producerSettings, ActorSystem actorSystem) {
        this.settings = producerSettings;
        this.ec = actorSystem.dispatchers().lookup(producerSettings.dispatcher());
        this.producerFuture = producerSettings.createKafkaProducerAsync(this.ec);
    }

    public ProducerSettings<K, V> settings() {
        return this.settings;
    }

    public <PT> Future<ProducerMessage.Results<K, V, PT>> sendEnvelope(ProducerMessage.Envelope<K, V, PT> envelope) {
        return this.producerFuture.flatMap(producer -> {
            if (envelope instanceof ProducerMessage.Message) {
                ProducerMessage.Message message = (ProducerMessage.Message) envelope;
                return sendSingle(producer, message.record(), recordMetadata -> {
                    return ProducerMessage$Result$.MODULE$.apply(recordMetadata, message);
                });
            }
            if (envelope instanceof ProducerMessage.MultiMessage) {
                ProducerMessage.MultiMessage multiMessage = (ProducerMessage.MultiMessage) envelope;
                return Future$.MODULE$.sequence((Seq) multiMessage.records().map(producerRecord -> {
                    return sendSingle(producer, producerRecord, recordMetadata2 -> {
                        return ProducerMessage$MultiResultPart$.MODULE$.apply(recordMetadata2, producerRecord);
                    });
                }), BuildFrom$.MODULE$.buildFromIterableOps(), this.ec).map(seq -> {
                    return ProducerMessage$MultiResult$.MODULE$.apply(seq, multiMessage.passThrough());
                }, this.ec);
            }
            if (envelope instanceof ProducerMessage.PassThroughMessage) {
                return Future$.MODULE$.successful(ProducerMessage$PassThroughResult$.MODULE$.apply(((ProducerMessage.PassThroughMessage) envelope).passThrough()));
            }
            throw new MatchError(envelope);
        }, this.ec);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return this.producerFuture.flatMap(producer -> {
            return sendSingle(producer, producerRecord, recordMetadata -> {
                return (RecordMetadata) Predef$.MODULE$.identity(recordMetadata);
            });
        }, this.ec);
    }

    private <R> Future<R> sendSingle(org.apache.kafka.clients.producer.Producer<K, V> producer, ProducerRecord<K, V> producerRecord, final Function1<RecordMetadata, R> function1) {
        final Promise apply = Promise$.MODULE$.apply();
        producer.send(producerRecord, new Callback(apply, function1) { // from class: org.apache.pekko.kafka.scaladsl.SendProducer$$anon$1
            private final Promise result$1;
            private final Function1 success$1;

            {
                this.result$1 = apply;
                this.success$1 = function1;
            }

            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    this.result$1.success(this.success$1.apply(recordMetadata));
                } else {
                    this.result$1.failure(exc);
                }
            }
        });
        return apply.future();
    }

    public Future<Done> close() {
        return settings().closeProducerOnStop() ? this.producerFuture.map(producer -> {
            producer.flush();
            producer.close(JavaDurationConverters$ScalaDurationOps$.MODULE$.asJava$extension(JavaDurationConverters$.MODULE$.ScalaDurationOps(settings().closeTimeout())));
            return Done$.MODULE$;
        }, this.ec) : Future$.MODULE$.successful(Done$.MODULE$);
    }

    public String toString() {
        return new StringBuilder(14).append("SendProducer(").append(settings()).append(")").toString();
    }
}
