package ox.kafka;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.IO;
import ox.Ox;
import ox.UnsupervisedFork;
import ox.channels.Channel;
import ox.channels.Channel$;
import ox.channels.ChannelClosed;
import ox.channels.ChannelClosed$Done$;
import ox.channels.ChannelClosed$Error$;
import ox.channels.Sink;
import ox.channels.Source;
import ox.channels.Source$;
import ox.channels.StageCapacity$package$StageCapacity$;
import ox.channels.select$package$;
import ox.fork$package$;
import ox.unsupervised$package$;
import scala.MatchError;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ScalaRunTime$;

/* compiled from: KafkaStage.scala */
/* loaded from: input_file:ox/kafka/KafkaStage$.class */
public final class KafkaStage$ implements Serializable {
    /** The single instance of this class; all methods are invoked through it. */
    public static final KafkaStage$ MODULE$ = new KafkaStage$();
    /** Logger used when the Kafka producer is being closed. */
    private static final Logger logger = LoggerFactory.getLogger(KafkaStage$.class);

    /** Private so that no instance other than {@link #MODULE$} is created from outside this class. */
    private KafkaStage$() {
    }

    /**
     * Java serialization hook: instead of this instance, a {@link ModuleSerializationProxy}
     * built for {@code KafkaStage$.class} is handed to the serialization machinery.
     *
     * @return the proxy object that replaces this instance when it is serialized
     */
    private Object writeReplace() {
        final Class<KafkaStage$> moduleClass = KafkaStage$.class;
        return new ModuleSerializationProxy(moduleClass);
    }

    /**
     * Publishes the records received from {@code source} using a producer obtained from
     * {@code producerSettings.toProducer()}. The producer is handed to the
     * {@link KafkaProducer}-based overload with the "close the producer" flag set to {@code true}.
     *
     * @param source           upstream source of records to publish
     * @param producerSettings settings from which the producer is created
     * @param i                capacity value forwarded to the stage's channels
     * @param ox2              scope in which the stage's fork is started
     * @param io               IO capability, passed through unchanged
     * @return a source of the metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublish(Source<ProducerRecord<K, V>> source, ProducerSettings<K, V> producerSettings, int i, Ox ox2, IO io) {
        final KafkaProducer<K, V> producer = producerSettings.toProducer();
        // The producer was created here, so the stage is asked to close it when it finishes.
        final boolean closeProducer = true;
        return mapPublish(source, producer, closeProducer, i, ox2, io);
    }

    /**
     * Publishes the records received from {@code source} using the given producer. Each record is
     * wrapped in a {@code SendPacket} holding that single record and an empty ({@code Nil}) second
     * list, and the resulting source is run through the publishing stage with committing disabled.
     *
     * @param source        upstream source of records to publish
     * @param kafkaProducer producer used to send the records
     * @param z             when {@code true}, the producer is closed once the stage finishes
     * @param i             capacity value forwarded to the stage's channels
     * @param ox2           scope in which the stage's fork is started
     * @param io            IO capability, passed through unchanged
     * @return a source of the metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublish(Source<ProducerRecord<K, V>> source, KafkaProducer<K, V> kafkaProducer, boolean z, int i, Ox ox2, IO io) {
        final Source<SendPacket<K, V>> packets = source.mapAsView(toSend -> {
            // A one-element Scala list with the record to publish ...
            final List singleRecord = (List) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ProducerRecord[]{toSend}));
            // ... paired with an empty list of received messages.
            final List<ReceivedMessage<?, ?>> noMessages = (List<ReceivedMessage<?, ?>>) scala.package$.MODULE$.Nil();
            return SendPacket$.MODULE$.apply(singleRecord, noMessages);
        });
        return mapPublishAndCommit(packets, kafkaProducer, z, false, i, ox2, io);
    }

    /**
     * Publishes the packets received from {@code source} with committing enabled, using a producer
     * obtained from {@code producerSettings.toProducer()}. The producer is handed to the
     * {@link KafkaProducer}-based overload with the "close the producer" flag set to {@code true}.
     *
     * @param source           upstream source of packets
     * @param producerSettings settings from which the producer is created
     * @param i                capacity value forwarded to the stage's channels
     * @param ox2              scope in which the stage's fork is started
     * @param io               IO capability, passed through unchanged
     * @return a source of the metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublishAndCommit(Source<SendPacket<K, V>> source, ProducerSettings<K, V> producerSettings, int i, Ox ox2, IO io) {
        final KafkaProducer<K, V> producer = producerSettings.toProducer();
        // The producer was created here, so the stage is asked to close it when it finishes.
        final boolean closeProducer = true;
        return mapPublishAndCommit(source, producer, closeProducer, i, ox2, io);
    }

    /**
     * Publishes the packets received from {@code source} using the given producer, with committing
     * enabled: delegates to the private implementation with its commit flag set to {@code true}.
     *
     * @param source        upstream source of packets
     * @param kafkaProducer producer used to send the records
     * @param z             when {@code true}, the producer is closed once the stage finishes
     * @param i             capacity value forwarded to the stage's channels
     * @param ox2           scope in which the stage's fork is started
     * @param io            IO capability, passed through unchanged
     * @return a source of the metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublishAndCommit(Source<SendPacket<K, V>> source, KafkaProducer<K, V> kafkaProducer, boolean z, int i, Ox ox2, IO io) {
        final boolean commit = true;
        return mapPublishAndCommit(source, kafkaProducer, z, commit, i, ox2, io);
    }

    /**
     * Shared implementation of the publishing stage. Creates the channels the stage works with,
     * starts a fork in {@code ox2} that runs the publishing loop, and returns the result channel.
     *
     * @param source        upstream source of packets
     * @param kafkaProducer producer used to send the records
     * @param z             when {@code true}, the producer is closed once the loop finishes
     * @param z2            when {@code true}, a committer is run and fully-sent packets are passed to it
     * @param i             capacity used for the result channel, the to-commit channel and the committer source
     * @param ox2           scope in which the fork is started
     * @param io            IO capability, passed through unchanged
     * @return the result channel, to which record metadata is sent by the forked loop
     */
    private <K, V> Source<RecordMetadata> mapPublishAndCommit(Source<SendPacket<K, V>> source, KafkaProducer<K, V> kafkaProducer, boolean z, boolean z2, int i, Ox ox2, IO io) {
        // Downstream channel handed back to the caller.
        Channel results = StageCapacity$package$StageCapacity$.MODULE$.newChannel(i);
        // Exceptions reported by the producer's send callbacks.
        Channel sendFailures = Channel$.MODULE$.unlimited();
        // (sequence number, metadata) pairs reported by the producer's send callbacks.
        Channel sentMetadata = Channel$.MODULE$.unlimited();
        // Packets whose records have all been sent; consumed by the committer when z2 is set.
        Channel toCommit = StageCapacity$package$StageCapacity$.MODULE$.newChannel(i);
        // Forwards metadata to the result channel using the assigned sequence numbers.
        SendInSequence sequencer = new SendInSequence(results);
        fork$package$.MODULE$.fork(() -> {
            mapPublishAndCommit$$anonfun$1(z2, toCommit, ox2, io, results, i, sendFailures, sentMetadata, source, sequencer, kafkaProducer, z);
            return BoxedUnit.UNIT;
        }, ox2);
        return results;
    }

    /**
     * Hands every record of {@code sendPacket.send()} to the producer. Each record is assigned the
     * next sequence number from {@code sendInSequence} before it is sent; the outcome is reported
     * from the producer's completion callback.
     *
     * @param kafkaProducer  producer used to send the records
     * @param sendPacket     packet whose records are sent
     * @param sendInSequence source of the sequence numbers assigned to the records
     * @param sink           receives {@code sendPacket} once its last record completed successfully (only if {@code z})
     * @param sink2          receives the exception of any failed send
     * @param sink3          receives a (sequence number, metadata) pair for each successful send
     * @param z              whether fully-sent packets are forwarded to {@code sink}
     * @param io             IO capability; not used in the body
     */
    private <K, V> void sendPacket(KafkaProducer<K, V> kafkaProducer, SendPacket<K, V> sendPacket, SendInSequence<RecordMetadata> sendInSequence, Sink<SendPacket<?, ?>> sink, Sink<Exception> sink2, Sink<Tuple2<Object, RecordMetadata>> sink3, boolean z, IO io) {
        // Counts down the records of this packet whose send has not yet completed successfully.
        final AtomicInteger remaining = new AtomicInteger(sendPacket.send().size());
        sendPacket.send().foreach(toSend -> {
            final long sequenceNo = sendInSequence.nextSequenceNo();
            return kafkaProducer.send(toSend, (metadata, failure) -> {
                if (failure == null) {
                    // The packet is forwarded before the metadata of its last record.
                    if (z && remaining.decrementAndGet() == 0) {
                        sink.send(sendPacket);
                    }
                    sink3.send(Tuple2$.MODULE$.apply(BoxesRunTime.boxToLong(sequenceNo), metadata));
                } else {
                    sink2.sendOrClosed(failure);
                }
            });
        });
    }

    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    /**
     * Body of the committer fork: runs {@code doCommit} on {@code channel}. Any throwable is
     * reported to {@code channel2} via {@code errorOrClosed} and then rethrown; a failure while
     * reporting is attached to the original throwable as a suppressed exception.
     *
     * @param channel  channel passed to {@code doCommit}
     * @param ox2      scope passed to {@code doCommit}
     * @param io       IO capability passed to {@code doCommit}
     * @param channel2 channel that is notified about a failure
     */
    private final void $anonfun$1(Channel channel, Ox ox2, IO io, Channel channel2) {
        try {
            kafkaOffsetCommit$package$.MODULE$.doCommit(channel, ox2, io);
        } catch (Throwable commitFailure) {
            try {
                channel2.errorOrClosed(commitFailure);
            } catch (Throwable reportFailure) {
                // Keep the original failure as the primary one.
                commitFailure.addSuppressed(reportFailure);
            }
            throw commitFailure;
        }
    }

    /**
     * Body of the fork started when the stage finishes with the close flag set: closes the producer.
     *
     * @param kafkaProducer the producer to close
     */
    private final void $anonfun$2(KafkaProducer kafkaProducer) {
        kafkaProducer.close();
    }

    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    /**
     * Joins the given fork even if the joining thread is interrupted: each
     * {@link InterruptedException} causes the join to be retried, and once the join has completed
     * the most recently caught {@link InterruptedException} is rethrown.
     *
     * @param unsupervisedFork the fork to wait for
     */
    private final void joinDespiteInterrupted$1(UnsupervisedFork unsupervisedFork) {
        InterruptedException lastInterrupt = null;
        boolean joined = false;
        while (!joined) {
            try {
                unsupervisedFork.join();
                joined = true;
            } catch (InterruptedException e) {
                // Remember the interruption and keep waiting for the fork.
                lastInterrupt = e;
            }
        }
        if (lastInterrupt != null) {
            throw lastInterrupt;
        }
    }

    /**
     * Body of the stage's fork: repeatedly selects from the exceptions channel, the metadata channel
     * and the upstream source until one of them ends the loop, then (in {@code finally}) optionally
     * closes the producer.
     *
     * @param z              when {@code true}, a committer fork running {@code doCommit} on {@code channel} is started
     *                       and {@code sendPacket} is asked to forward fully-sent packets
     * @param channel        to-commit channel: passed to the committer and to {@code sendPacket}; marked done when the
     *                       selected sources are done
     * @param ox2            scope used for the committer fork and for {@code Source.fromFork}
     * @param io             IO capability, passed through
     * @param channel2       result channel: receives errors, and is marked done at the end of a clean run
     * @param i              capacity passed to {@code Source.fromFork}
     * @param channel3       exceptions channel: any received value is propagated to {@code channel2} as an error
     * @param channel4       metadata channel carrying (sequence number, metadata) pairs
     * @param source         upstream source of packets to send
     * @param sendInSequence receives the (sequence number, metadata) pairs taken from {@code channel4}
     * @param kafkaProducer  producer used to send records; closed at the end if {@code z2}
     * @param z2             when {@code true}, the producer is closed once the loop has finished
     */
    private final void mapPublishAndCommit$$anonfun$1(boolean z, Channel channel, Ox ox2, IO io, Channel channel2, int i, Channel channel3, Channel channel4, Source source, SendInSequence sendInSequence, KafkaProducer kafkaProducer, boolean z2) {
        try {
            unsupervised$package$.MODULE$.unsupervised(oxUnsupervised -> {
                Tuple2 tuple2;
                // With committing enabled, fromFork wraps the committer fork; otherwise it is an empty source.
                // NOTE(review): the committer fork is started with ox2 (the outer scope), not with
                // oxUnsupervised (the nested scope opened above) - confirm that this is intended.
                Source fromFork = z ? Source$.MODULE$.fromFork(fork$package$.MODULE$.fork(() -> {
                    $anonfun$1(channel, ox2, io, channel2);
                    return BoxedUnit.UNIT;
                }, ox2), ox2, i) : Source$.MODULE$.empty();
                // Loop flag: each branch below decides whether another select round is run.
                boolean z3 = true;
                while (z3) {
                    Object selectOrClosed = select$package$.MODULE$.selectOrClosed(channel3.receiveClause(), channel4.receiveClause(), source.receiveClause());
                    if (selectOrClosed instanceof ChannelClosed.Error) {
                        // The select ended with an error: propagate its cause downstream and stop.
                        channel2.errorOrClosed(ChannelClosed$Error$.MODULE$.unapply((ChannelClosed.Error) selectOrClosed)._1());
                        z3 = false;
                    } else if (ChannelClosed$Done$.MODULE$.equals(selectOrClosed)) {
                        // The select reported "done": drain via sendInSequence, mark the to-commit channel
                        // as done, receive once from the committer source, then mark the result as done.
                        sendInSequence.drainFrom(channel4, channel3);
                        channel.done();
                        fromFork.receiveOrClosed();
                        channel2.done();
                        z3 = false;
                    } else if ((selectOrClosed instanceof Source.Received) && ((Source.Received) selectOrClosed).ox$channels$Source$Received$$$outer() == channel3) {
                        // A value received from the exceptions channel: propagate it as an error and stop.
                        channel2.errorOrClosed((Exception) channel3.Received().unapply((Source.Received) selectOrClosed)._1());
                        z3 = false;
                    } else if ((selectOrClosed instanceof Source.Received) && ((Source.Received) selectOrClosed).ox$channels$Source$Received$$$outer() == channel4 && (tuple2 = (Tuple2) channel4.Received().unapply((Source.Received) selectOrClosed)._1()) != null) {
                        // A (sequence number, metadata) pair from the metadata channel: pass it to sendInSequence.
                        sendInSequence.send(BoxesRunTime.unboxToLong(tuple2._1()), (RecordMetadata) tuple2._2());
                        z3 = true;
                    } else {
                        // The only remaining accepted case is a value received from the upstream source.
                        if (!(selectOrClosed instanceof Source.Received) || ((Source.Received) selectOrClosed).ox$channels$Source$Received$$$outer() != source) {
                            throw new MatchError(selectOrClosed);
                        }
                        try {
                            sendPacket(kafkaProducer, (SendPacket) source.Received().unapply((Source.Received) selectOrClosed)._1(), sendInSequence, channel, channel3, channel4, z, io);
                            z3 = true;
                        } catch (Exception e) {
                            // An exception thrown by sendPacket ends the stage with an error.
                            channel2.errorOrClosed(e);
                            z3 = false;
                        }
                    }
                }
            });
        } finally {
            if (z2) {
                logger.debug("Closing the Kafka producer");
                // The close runs in its own unsupervised fork, which is joined even if this thread is interrupted.
                unsupervised$package$.MODULE$.unsupervised(oxUnsupervised2 -> {
                    joinDespiteInterrupted$1(fork$package$.MODULE$.forkUnsupervised(() -> {
                        $anonfun$2(kafkaProducer);
                        return BoxedUnit.UNIT;
                    }, oxUnsupervised2));
                });
            }
        }
    }
}
