package ox.kafka;

import java.io.Serializable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.Fork;
import ox.channels.Channel$;
import ox.channels.ChannelClosed;
import ox.channels.ChannelClosed$Done$;
import ox.channels.Source;
import ox.channels.StageCapacity$package$StageCapacity$;
import ox.channels.select$package$;
import ox.fork$package$;
import ox.supervised$package$;
import ox.unsupervised$package$;
import scala.Function1;
import scala.MatchError;
import scala.runtime.BoxedUnit;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: KafkaDrain.scala */
/* loaded from: input_file:ox/kafka/KafkaDrain$.class */
/**
 * Singleton holder ({@link #MODULE$}) for functions that drain a {@link Source} of records into a
 * Kafka producer.
 *
 * <p>NOTE(review): this file is decompiler output of a Scala object; names such as {@code z},
 * {@code $anonfun$1} and {@code joinDespiteInterrupted$1} are compiler/decompiler generated.
 */
public final class KafkaDrain$ implements Serializable {
    public static final KafkaDrain$ MODULE$ = new KafkaDrain$();
    private static final Logger logger = LoggerFactory.getLogger(KafkaDrain$.class);

    private KafkaDrain$() {
    }

    /**
     * Serialization hook: substitutes a {@link ModuleSerializationProxy} for this class when the
     * singleton is serialized.
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(KafkaDrain$.class);
    }

    /**
     * Returns a function that publishes every record received from a source, using a producer
     * created from the given settings via {@code producerSettings.toProducer()}.
     *
     * <p>Delegates to {@link #publish(KafkaProducer, boolean)} with the flag set to {@code true},
     * so the producer created here is closed once the drain finishes.
     *
     * @param producerSettings settings from which a new producer is created on each invocation of
     *                         the returned function
     * @return a function that drains the given source into the newly created producer
     */
    public <K, V> Function1<Source<ProducerRecord<K, V>>, BoxedUnit> publish(ProducerSettings<K, V> producerSettings) {
        return source -> {
            publish(producerSettings.toProducer(), true).apply(source);
        };
    }

    /**
     * Returns a function that publishes every record received from a source using the given
     * producer, until the source is done or an error occurs.
     *
     * <p>The returned function throws when the select reports a channel error, or when a send
     * callback has reported an exception (the callback's exception is rethrown).
     *
     * @param kafkaProducer the producer used to send records
     * @param z             when {@code true}, the producer is closed once the drain ends, whether
     *                      it ends normally or with an exception
     * @return a function that drains the given source into {@code kafkaProducer}
     */
    public <K, V> Function1<Source<ProducerRecord<K, V>>, BoxedUnit> publish(KafkaProducer<K, V> kafkaProducer, boolean z) {
        return source -> {
            // Failures reported by send callbacks are forwarded through this channel so that the
            // draining loop below can observe and rethrow them.
            Source producerErrors = Channel$.MODULE$.unlimited();
            // The try/finally wraps the WHOLE loop: the producer must be closed once, after the
            // drain has ended, and not after every single record.
            try {
                boolean keepDraining = true;
                while (keepDraining) {
                    // The error channel's clause is listed first — presumably so that a pending
                    // producer error wins over the next record; confirm against select semantics.
                    Object selected = select$package$.MODULE$.selectOrClosed(producerErrors.receiveClause(), source.receiveClause());
                    if (selected instanceof ChannelClosed.Error) {
                        throw ((ChannelClosed.Error) selected).toThrowable();
                    }
                    if (ChannelClosed$Done$.MODULE$.equals(selected)) {
                        // Nothing more to receive: leave the loop.
                        keepDraining = false;
                    } else {
                        // A value received from the error channel is an exception reported by a
                        // send callback: rethrow it.
                        if ((selected instanceof Source.Received) && ((Source.Received) selected).ox$channels$Source$Received$$$outer() == producerErrors) {
                            throw ((Throwable) producerErrors.Received().unapply((Source.Received) selected)._1());
                        }
                        // Anything that is not a value received from the source is unexpected.
                        if (!(selected instanceof Source.Received) || ((Source.Received) selected).ox$channels$Source$Received$$$outer() != source) {
                            throw new MatchError(selected);
                        }
                        kafkaProducer.send((ProducerRecord) source.Received().unapply((Source.Received) selected)._1(), (recordMetadata, exc) -> {
                            if (exc != null) {
                                logger.error("Exception when sending record", exc);
                                producerErrors.sendOrClosed(exc);
                            }
                        });
                    }
                }
            } finally {
                if (z) {
                    // Close the producer in a separate fork and wait for it even if this thread
                    // gets interrupted while waiting.
                    unsupervised$package$.MODULE$.unsupervised(oxUnsupervised -> {
                        joinDespiteInterrupted$1(fork$package$.MODULE$.forkUnsupervised(() -> {
                            $anonfun$1(kafkaProducer);
                            return BoxedUnit.UNIT;
                        }, oxUnsupervised));
                    });
                }
            }
        };
    }

    /**
     * Returns a function that drains a source of {@link SendPacket}s through
     * {@code KafkaStage.mapPublishAndCommit}, using a producer created from the given settings via
     * {@code producerSettings.toProducer()}.
     *
     * <p>Delegates to {@link #publishAndCommit(KafkaProducer, boolean)} with the flag set to
     * {@code true}.
     *
     * @param producerSettings settings from which a new producer is created on each invocation of
     *                         the returned function
     * @return a function that drains the given source
     */
    public <K, V> Function1<Source<SendPacket<K, V>>, BoxedUnit> publishAndCommit(ProducerSettings<K, V> producerSettings) {
        return source -> {
            publishAndCommit(producerSettings.toProducer(), true).apply(source);
        };
    }

    /**
     * Returns a function that, inside a supervised scope, runs
     * {@code KafkaStage.mapPublishAndCommit} over the source with the default stage capacity and
     * drains the result.
     *
     * @param kafkaProducer the producer passed on to {@code mapPublishAndCommit}
     * @param z             flag passed on unchanged to {@code mapPublishAndCommit} — presumably
     *                      "close the producer when complete", as in
     *                      {@link #publish(KafkaProducer, boolean)}; TODO confirm
     * @return a function that drains the given source
     */
    public <K, V> Function1<Source<SendPacket<K, V>>, BoxedUnit> publishAndCommit(KafkaProducer<K, V> kafkaProducer, boolean z) {
        return source -> {
            supervised$package$.MODULE$.supervised(ox2 -> {
                KafkaStage$.MODULE$.mapPublishAndCommit(source, kafkaProducer, z, StageCapacity$package$StageCapacity$.MODULE$.default(), ox2).drain();
            });
        };
    }

    /** Closes the given producer; body of the fork started when a drain ends. */
    private final void $anonfun$1(KafkaProducer kafkaProducer) {
        kafkaProducer.close();
    }

    /**
     * Joins the given fork. If an {@link InterruptedException} is caught while joining, the join is
     * retried (recursively) and the original exception is rethrown afterwards.
     */
    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    private final void joinDespiteInterrupted$1(Fork fork) {
        try {
            fork.join();
        } catch (InterruptedException e) {
            joinDespiteInterrupted$1(fork);
            throw e;
        }
    }
}
