package ox.kafka;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.Ox;
import ox.channels.Channel;
import ox.channels.Channel$;
import ox.channels.ChannelClosed;
import ox.channels.ChannelClosed$Done$;
import ox.channels.ChannelClosed$Error$;
import ox.channels.Sink;
import ox.channels.Source;
import ox.channels.StageCapacity$package$StageCapacity$;
import ox.channels.select$package$;
import ox.control$package$;
import ox.fork$package$;
import ox.kafka.KafkaStage;
import ox.scoped$package$;
import scala.MatchError;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.Nothing$;
import scala.runtime.ScalaRunTime$;

/* compiled from: KafkaStage.scala */
/* loaded from: input_file:ox/kafka/KafkaStage$.class */
/**
 * Singleton companion (Scala {@code object}) holding the Kafka publishing stages.
 *
 * <p>Every public method turns a {@link Source} of things to publish into a {@link Source} of
 * {@link RecordMetadata}. The work is done by a daemon fork started in the supplied {@link Ox}
 * scope; the returned source is the channel that fork writes to.
 */
public final class KafkaStage$ implements Serializable {
    public static final KafkaStage$ MODULE$ = new KafkaStage$();
    private static final Logger logger = LoggerFactory.getLogger(KafkaStage$.class);

    private KafkaStage$() {
    }

    /** Serialization hook: replaces this singleton with a proxy that refers to the module class. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(KafkaStage$.class);
    }

    /**
     * Publishes every record received from {@code source} using a producer created from
     * {@code producerSettings}. The producer is closed when the stage finishes.
     *
     * @param source           upstream of records to publish
     * @param producerSettings settings from which the producer is created via {@code toProducer()}
     * @param i                value passed to {@code StageCapacity.newChannel} for the result channel
     * @param ox2              scope in which the stage's daemon fork is started
     * @return source of metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublish(Source<ProducerRecord<K, V>> source, ProducerSettings<K, V> producerSettings, int i, Ox ox2) {
        return mapPublish(source, producerSettings.toProducer(), true, i, ox2);
    }

    /**
     * Publishes every record received from {@code source} using the given producer. No offsets are
     * committed: each record is wrapped in a {@code SendPacket} with a one-element send list and an
     * empty commit list.
     *
     * @param source        upstream of records to publish
     * @param kafkaProducer producer used for publishing
     * @param z             whether the producer should be closed when the stage finishes
     * @param i             value passed to {@code StageCapacity.newChannel} for the result channel
     * @param ox2           scope in which the stage's daemon fork is started
     * @return source of metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublish(Source<ProducerRecord<K, V>> source, KafkaProducer<K, V> kafkaProducer, boolean z, int i, Ox ox2) {
        return mapPublishAndCommit(source.mapAsView(producerRecord -> {
            return SendPacket$.MODULE$.apply((List) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new ProducerRecord[]{producerRecord})), (List<ReceivedMessage<?, ?>>) scala.package$.MODULE$.Nil());
        }), kafkaProducer, z, false, i, ox2);
    }

    /**
     * Publishes the records of every packet received from {@code source} and hands fully published
     * packets over to the offset committer. The producer is created from {@code producerSettings}
     * and closed when the stage finishes.
     *
     * @param source           upstream of packets
     * @param producerSettings settings from which the producer is created via {@code toProducer()}
     * @param i                value passed to {@code StageCapacity.newChannel} for the result channel
     * @param ox2              scope in which the stage's daemon fork is started
     * @return source of metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublishAndCommit(Source<SendPacket<K, V>> source, ProducerSettings<K, V> producerSettings, int i, Ox ox2) {
        return mapPublishAndCommit(source, producerSettings.toProducer(), true, i, ox2);
    }

    /**
     * Publishes the records of every packet received from {@code source} using the given producer
     * and hands fully published packets over to the offset committer.
     *
     * @param source        upstream of packets
     * @param kafkaProducer producer used for publishing
     * @param z             whether the producer should be closed when the stage finishes
     * @param i             value passed to {@code StageCapacity.newChannel} for the result channel
     * @param ox2           scope in which the stage's daemon fork is started
     * @return source of metadata of the published records
     */
    public <K, V> Source<RecordMetadata> mapPublishAndCommit(Source<SendPacket<K, V>> source, KafkaProducer<K, V> kafkaProducer, boolean z, int i, Ox ox2) {
        return mapPublishAndCommit(source, kafkaProducer, z, true, i, ox2);
    }

    /**
     * Shared implementation of all public variants: creates the channels, starts the daemon fork
     * that runs the stage, and returns the result channel.
     *
     * @param closeWhenComplete whether the producer is closed when the stage finishes
     * @param commitOffsets     whether a committer is started and fed with fully published packets
     * @param capacity          value passed to {@code StageCapacity.newChannel} for the result channel
     */
    private <K, V> Source<RecordMetadata> mapPublishAndCommit(Source<SendPacket<K, V>> source, KafkaProducer<K, V> kafkaProducer, boolean closeWhenComplete, boolean commitOffsets, int capacity, Ox ox2) {
        // The channel returned to the caller; also the target of any error raised by the stage.
        Channel results = StageCapacity$package$StageCapacity$.MODULE$.newChannel(capacity);
        // Exceptions reported by producer callbacks.
        Channel exceptions = Channel$.MODULE$.unlimited();
        // (sequence number, metadata) pairs reported by producer callbacks.
        Channel metadata = Channel$.MODULE$.apply(128);
        // Packets whose records have all been published; consumed by the committer only.
        Channel toCommit = Channel$.MODULE$.apply(128);
        KafkaStage.SendInSequence sendInSequence = new KafkaStage.SendInSequence(results);
        fork$package$.MODULE$.forkDaemon(() -> {
            runStage(source, kafkaProducer, closeWhenComplete, commitOffsets, results, exceptions, metadata, toCommit, sendInSequence);
            return BoxedUnit.UNIT;
        }, ox2);
        return results;
    }

    /**
     * Hands every record of {@code sendPacket} to the producer. The producer callback reports a
     * failure on {@code exceptions}, or the record's metadata (tagged with the sequence number
     * taken before sending) on {@code metadata}.
     *
     * <p>When {@code commitOffsets} is set, the packet is sent to {@code toCommit} once the last of
     * its records has been acknowledged. When it is not set, nothing is sent there: no committer is
     * running in that mode, so nothing would ever receive from that 128-element channel and the
     * producer callback would get stuck on it once the buffer is full.
     */
    private <K, V> void sendPacket(KafkaProducer<K, V> kafkaProducer, SendPacket<K, V> sendPacket, KafkaStage.SendInSequence<RecordMetadata> sendInSequence, boolean commitOffsets, Sink<SendPacket<?, ?>> toCommit, Sink<Exception> exceptions, Sink<Tuple2<Object, RecordMetadata>> metadata) {
        // Number of records of this packet that are still unacknowledged.
        AtomicInteger leftToSend = new AtomicInteger(sendPacket.send().size());
        sendPacket.send().foreach(producerRecord -> {
            long sequenceNo = sendInSequence.nextSequenceNo();
            return kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
                if (exc != null) {
                    exceptions.send(exc);
                    return;
                }
                metadata.send(Tuple2$.MODULE$.apply(BoxesRunTime.boxToLong(sequenceNo), recordMetadata));
                // NOTE(review): the commit request is enqueued after the metadata; whether a commit
                // can be missed when the stage ends right after the last metadata is handled
                // depends on doCommit and scope shutdown - confirm.
                if (commitOffsets && leftToSend.decrementAndGet() == 0) {
                    toCommit.send(sendPacket);
                }
            });
        });
    }

    /**
     * Runs the committer on {@code toCommit}; any throwable it raises is passed to
     * {@code results.error} by {@code tapException}.
     */
    private void runCommitter(Channel results, Channel toCommit, Ox ox2) {
        package$.MODULE$.tapException(() -> {
            return kafkaOffsetCommit$package$.MODULE$.doCommit(toCommit, ox2);
        }, th -> {
            results.error(th);
        });
    }

    /**
     * Handles a single event selected from the exceptions channel, the metadata channel or the
     * upstream source.
     *
     * @return {@code true} if the stage should keep running, {@code false} if it should stop
     */
    private boolean processNext(Source source, KafkaProducer kafkaProducer, boolean commitOffsets, Channel results, Channel exceptions, Channel metadata, Channel toCommit, KafkaStage.SendInSequence sendInSequence) {
        Tuple2 tuple2;
        Object select = select$package$.MODULE$.select(exceptions.receiveClause(), metadata.receiveClause(), source.receiveOrDoneClause());
        if (select instanceof ChannelClosed.Error) {
            // A selected channel is in error: propagate the reason downstream and stop.
            results.error(ChannelClosed$Error$.MODULE$.unapply((ChannelClosed.Error) select)._1());
            return false;
        }
        if (ChannelClosed$Done$.MODULE$.equals(select)) {
            // Upstream is done: let SendInSequence drain the helper channels and finish.
            sendInSequence.drainFromThenDone(exceptions, metadata);
            return false;
        }
        if ((select instanceof Source.Received) && ((Source.Received) select).ox$channels$Source$Received$$$outer() == exceptions) {
            // A producer callback reported a failure.
            results.error((Exception) exceptions.Received().unapply((Source.Received) select)._1());
            return false;
        }
        if ((select instanceof Source.Received) && ((Source.Received) select).ox$channels$Source$Received$$$outer() == metadata && (tuple2 = (Tuple2) metadata.Received().unapply((Source.Received) select)._1()) != null) {
            // A producer callback reported metadata: pass it on together with its sequence number.
            sendInSequence.send(BoxesRunTime.unboxToLong(tuple2._1()), (RecordMetadata) tuple2._2());
            return true;
        }
        if (!(select instanceof Source.Received) || ((Source.Received) select).ox$channels$Source$Received$$$outer() != source) {
            throw new MatchError(select);
        }
        // A new packet arrived from upstream.
        try {
            sendPacket(kafkaProducer, (SendPacket) source.Received().unapply((Source.Received) select)._1(), sendInSequence, commitOffsets, toCommit, exceptions, metadata);
            return true;
        } catch (Exception e) {
            results.error(e);
            return false;
        }
    }

    /** Closes the producer. */
    private void closeProducer(KafkaProducer kafkaProducer) {
        kafkaProducer.close();
    }

    /**
     * Body of the stage's daemon fork. Inside a nested scope it optionally forks the committer and
     * then repeats {@link #processNext} until that returns {@code false}. Afterwards, if
     * {@code closeWhenComplete} is set, the producer is closed inside {@code uninterruptible}.
     */
    private void runStage(Source source, KafkaProducer kafkaProducer, boolean closeWhenComplete, boolean commitOffsets, Channel results, Channel exceptions, Channel metadata, Channel toCommit, KafkaStage.SendInSequence sendInSequence) {
        try {
            scoped$package$.MODULE$.scoped(ox2 -> {
                if (commitOffsets) {
                    fork$package$.MODULE$.fork(() -> {
                        runCommitter(results, toCommit, ox2);
                        return BoxedUnit.UNIT;
                    }, ox2);
                }
                control$package$.MODULE$.repeatWhile(() -> {
                    return processNext(source, kafkaProducer, commitOffsets, results, exceptions, metadata, toCommit, sendInSequence);
                });
            });
        } finally {
            if (closeWhenComplete) {
                logger.debug("Closing the Kafka producer");
                control$package$.MODULE$.uninterruptible(() -> {
                    closeProducer(kafkaProducer);
                    return BoxedUnit.UNIT;
                });
            }
        }
    }
}
