package ox.kafka;

import java.io.Serializable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.IO;
import ox.Ox;
import ox.channels.ActorRef;
import ox.channels.Channel;
import ox.channels.Source;
import ox.channels.StageCapacity$package$StageCapacity$;
import ox.fork$package$;
import scala.collection.immutable.Seq;
import scala.runtime.ModuleSerializationProxy;

/* compiled from: KafkaSource.scala */
/* loaded from: input_file:ox/kafka/KafkaSource$.class */
/**
 * Java view of the Scala singleton object {@code KafkaSource} (compiled from
 * {@code KafkaSource.scala}); the single instance is {@link #MODULE$}.
 *
 * <p>Each {@code subscribe} overload ends up in the {@link ActorRef}-based variant, which
 * subscribes a wrapped Kafka consumer to a list of topics, forks a polling loop and returns the
 * channel that the loop publishes {@link ReceivedMessage}s to.
 */
public final class KafkaSource$ implements Serializable {
    /** The singleton instance backing the Scala {@code object}. */
    public static final KafkaSource$ MODULE$ = new KafkaSource$();
    private static final Logger logger = LoggerFactory.getLogger(KafkaSource$.class);

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private KafkaSource$() {
    }

    /**
     * Java serialization hook: substitutes a {@link ModuleSerializationProxy} for this instance.
     * NOTE(review): presumably so that deserialization resolves back to {@link #MODULE$} rather
     * than creating a second instance - confirm against the Scala runtime documentation.
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(KafkaSource$.class);
    }

    /**
     * Builds a consumer from {@code consumerSettings} via {@code toConsumer()} and delegates to the
     * {@link KafkaConsumer}-based overload, passing {@code true} as its boolean flag.
     *
     * @param consumerSettings settings from which the Kafka consumer is created
     * @param str first topic to subscribe to
     * @param seq further topics to subscribe to
     * @param i value passed to the channel factory (presumably the buffer capacity - confirm)
     * @param ox2 scope in which the polling loop is forked
     * @param io capability token passed through to the consumer wrapper's calls
     * @return the source on which received messages are published
     */
    public <K, V> Source<ReceivedMessage<K, V>> subscribe(ConsumerSettings<K, V> consumerSettings, String str, Seq<String> seq, int i, Ox ox2, IO io) {
        return subscribe(consumerSettings.toConsumer(), true, str, seq, i, ox2, io);
    }

    /**
     * Wraps {@code kafkaConsumer} with {@code KafkaConsumerWrapper.apply} and delegates to the
     * {@link ActorRef}-based overload.
     *
     * @param kafkaConsumer consumer to poll
     * @param z flag forwarded to {@code KafkaConsumerWrapper.apply}; NOTE(review): presumably
     *     controls whether the consumer is closed together with the scope - confirm
     * @param str first topic to subscribe to
     * @param seq further topics to subscribe to
     * @param i value passed to the channel factory (presumably the buffer capacity - confirm)
     * @param ox2 scope used both for the wrapper and for the forked polling loop
     * @param io capability token passed through to the consumer wrapper's calls
     * @return the source on which received messages are published
     */
    public <K, V> Source<ReceivedMessage<K, V>> subscribe(KafkaConsumer<K, V> kafkaConsumer, boolean z, String str, Seq<String> seq, int i, Ox ox2, IO io) {
        return subscribe(KafkaConsumerWrapper$.MODULE$.apply(kafkaConsumer, z, ox2), str, seq, i, ox2, io);
    }

    /**
     * Subscribes the wrapped consumer to {@code str} followed by the elements of {@code seq},
     * creates a channel, forks the polling loop that feeds it, and returns the channel without
     * waiting for the loop.
     *
     * @param actorRef actor reference guarding the consumer wrapper
     * @param str first topic to subscribe to
     * @param seq further topics to subscribe to
     * @param i value passed to the channel factory (presumably the buffer capacity - confirm)
     * @param ox2 scope in which the polling loop is forked
     * @param io capability token passed through to the consumer wrapper's calls
     * @return the channel the polling loop sends received messages to
     */
    public <K, V> Source<ReceivedMessage<K, V>> subscribe(ActorRef<KafkaConsumerWrapper<K, V>> actorRef, String str, Seq<String> seq, int i, Ox ox2, IO io) {
        actorRef.tell(wrapper -> {
            // `::` prepends, so the resulting topic list is str followed by seq.
            wrapper.subscribe(seq.toList().$colon$colon(str), io);
        });
        final Channel newChannel = StageCapacity$package$StageCapacity$.MODULE$.newChannel(i);
        // The decompiler emitted unresolved placeholders (r1..r4) here; the helper's signature
        // fixes the intended arguments: this instance, the actor, the IO token and the channel.
        fork$package$.MODULE$.fork(() -> subscribe$$anonfun$2(actorRef, io, newChannel), ox2);
        return newChannel;
    }

    /**
     * Body of the forked polling loop: repeatedly asks the consumer wrapper to poll and sends
     * every returned record, wrapped as a {@link ReceivedMessage}, to {@code channel}.
     *
     * <p>The loop has no normal exit. Any {@link Throwable} raised while polling or sending is
     * logged and handed to {@code channel.errorOrClosed}, whose result is returned.
     *
     * @param actorRef actor reference guarding the consumer wrapper
     * @param io capability token passed to {@code poll}
     * @param channel channel the messages are sent to
     * @return the result of {@code channel.errorOrClosed(th)} for the failure that ended the loop
     */
    private final Object subscribe$$anonfun$2(ActorRef actorRef, IO io, Channel channel) {
        while (true) {
            try {
                ((ConsumerRecords) actorRef.ask(kafkaConsumerWrapper -> {
                    return kafkaConsumerWrapper.poll(io);
                })).forEach(consumerRecord -> {
                    channel.send(ReceivedMessage$.MODULE$.apply(actorRef, consumerRecord));
                });
            } catch (Throwable th) {
                logger.error("Exception when polling for records", th);
                return channel.errorOrClosed(th);
            }
        }
        // No statement may follow: the loop condition is constant-true and the loop has no
        // break, so anything placed here would be rejected as unreachable by javac.
    }
}
