package ox.kafka;

import java.io.Serializable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.Ox;
import ox.channels.Channel;
import ox.channels.Channel$;
import ox.channels.Sink;
import ox.channels.Source;
import ox.channels.StageCapacity$package$StageCapacity$;
import ox.control$package$;
import ox.fork$package$;
import scala.Option;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.ModuleSerializationProxy;
import scala.util.control.NonFatal$;

/* compiled from: KafkaSource.scala */
/* loaded from: input_file:ox/kafka/KafkaSource$.class */
public final class KafkaSource$ implements Serializable {
    public static final KafkaSource$ MODULE$ = new KafkaSource$();
    private static final Logger logger = LoggerFactory.getLogger(KafkaSource$.class);

    private KafkaSource$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(KafkaSource$.class);
    }

    public <K, V> Source<ReceivedMessage<K, V>> subscribe(ConsumerSettings<K, V> consumerSettings, String str, Seq<String> seq, int i, Ox ox2) {
        return subscribe(consumerSettings.toConsumer(), true, str, seq, i, ox2);
    }

    public <K, V> Source<ReceivedMessage<K, V>> subscribe(KafkaConsumer<K, V> kafkaConsumer, boolean z, String str, Seq<String> seq, int i, Ox ox2) {
        return subscribe(KafkaConsumerActor$.MODULE$.apply(kafkaConsumer, z, ox2), str, seq, i, ox2);
    }

    public <K, V> Source<ReceivedMessage<K, V>> subscribe(Sink<KafkaConsumerRequest<K, V>> sink, String str, Seq<String> seq, int i, Ox ox2) {
        sink.send(KafkaConsumerRequest$Subscribe$.MODULE$.apply(seq.toList().$colon$colon(str)));
        Channel newChannel = StageCapacity$package$StageCapacity$.MODULE$.newChannel(i);
        fork$package$.MODULE$.fork(() -> {
            return r1.subscribe$$anonfun$1(r2, r3);
        }, ox2);
        return newChannel;
    }

    private final void subscribe$$anonfun$1$$anonfun$1(Sink sink, Channel channel, Channel channel2) {
        sink.send(KafkaConsumerRequest$Poll$.MODULE$.apply(channel));
        ((ConsumerRecords) channel.receive()).forEach(consumerRecord -> {
            channel2.send(ReceivedMessage$.MODULE$.apply(sink, consumerRecord));
        });
    }

    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    private final Object subscribe$$anonfun$1(Sink sink, Channel channel) {
        try {
            Channel rendezvous = Channel$.MODULE$.rendezvous();
            throw control$package$.MODULE$.forever(() -> {
                subscribe$$anonfun$1$$anonfun$1(sink, rendezvous, channel);
                return BoxedUnit.UNIT;
            });
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    Throwable th2 = (Throwable) unapply.get();
                    logger.error("Exception when polling for records", th2);
                    return channel.errorSafe(th2);
                }
            }
            throw th;
        }
    }
}
