package ox.kafka;

import java.io.Serializable;
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ox.Ox;
import ox.channels.Channel;
import ox.channels.Channel$;
import ox.channels.ChannelClosed;
import ox.channels.ChannelClosed$Done$;
import ox.channels.Sink;
import ox.control$package$;
import ox.fork$package$;
import ox.kafka.KafkaConsumerRequest;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.Option;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.util.control.NonFatal$;

/* compiled from: KafkaConsumerActor.scala */
/* loaded from: input_file:ox/kafka/KafkaConsumerActor$.class */
public final class KafkaConsumerActor$ implements Serializable {
    public static final KafkaConsumerActor$ MODULE$ = new KafkaConsumerActor$();
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerActor$.class);

    private KafkaConsumerActor$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(KafkaConsumerActor$.class);
    }

    public <K, V> Sink<KafkaConsumerRequest<K, V>> apply(KafkaConsumer<K, V> kafkaConsumer, boolean z, Ox ox2) {
        Channel rendezvous = Channel$.MODULE$.rendezvous();
        fork$package$.MODULE$.fork(() -> {
            apply$$anonfun$1(rendezvous, kafkaConsumer, z);
            return BoxedUnit.UNIT;
        }, ox2);
        return rendezvous;
    }

    private final /* synthetic */ OffsetAndMetadata apply$$anonfun$1$$anonfun$1$$anonfun$1(long j) {
        return new OffsetAndMetadata(j + 1);
    }

    private final boolean apply$$anonfun$1$$anonfun$1(Channel channel, KafkaConsumer kafkaConsumer) {
        Object receiveSafe = channel.receiveSafe();
        if (ChannelClosed$Done$.MODULE$.equals(receiveSafe)) {
            logger.debug("Stopping Kafka consumer actor: upstream done");
            return false;
        }
        if (receiveSafe instanceof ChannelClosed.Error) {
            logger.debug(new StringBuilder(65).append("Stopping Kafka consumer actor: upstream closed due to an error (").append((ChannelClosed.Error) receiveSafe).append(")").toString());
            return false;
        }
        if (receiveSafe instanceof KafkaConsumerRequest.Subscribe) {
            Seq<String> _1 = KafkaConsumerRequest$Subscribe$.MODULE$.unapply((KafkaConsumerRequest.Subscribe) receiveSafe)._1();
            try {
                kafkaConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(_1).asJava());
                return true;
            } catch (Throwable th) {
                if (th != null) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (!unapply.isEmpty()) {
                        Throwable th2 = (Throwable) unapply.get();
                        logger.error(new StringBuilder(30).append("Exception when subscribing to ").append(_1).toString(), th2);
                        channel.errorSafe(th2);
                        return false;
                    }
                }
                throw th;
            }
        }
        if (receiveSafe instanceof KafkaConsumerRequest.Poll) {
            Sink _12 = KafkaConsumerRequest$Poll$.MODULE$.unapply((KafkaConsumerRequest.Poll) receiveSafe)._1();
            try {
                _12.send(kafkaConsumer.poll(Duration.ofMillis(100L)));
                return true;
            } catch (Throwable th3) {
                if (th3 != null) {
                    Option unapply2 = NonFatal$.MODULE$.unapply(th3);
                    if (!unapply2.isEmpty()) {
                        Throwable th4 = (Throwable) unapply2.get();
                        logger.error("Exception when polling for records in Kafka", th4);
                        _12.errorSafe(th4);
                        channel.errorSafe(th4);
                        return false;
                    }
                }
                throw th3;
            }
        }
        if (!(receiveSafe instanceof KafkaConsumerRequest.Commit)) {
            throw new MatchError(receiveSafe);
        }
        KafkaConsumerRequest.Commit unapply3 = KafkaConsumerRequest$Commit$.MODULE$.unapply((KafkaConsumerRequest.Commit) receiveSafe);
        Map<TopicPartition, Object> _13 = unapply3._1();
        Sink<BoxedUnit> _2 = unapply3._2();
        try {
            kafkaConsumer.commitSync(CollectionConverters$.MODULE$.MapHasAsJava(_13.view().mapValues(obj -> {
                return apply$$anonfun$1$$anonfun$1$$anonfun$1(BoxesRunTime.unboxToLong(obj));
            }).toMap($less$colon$less$.MODULE$.refl())).asJava());
            _2.sendSafe(BoxedUnit.UNIT);
            return true;
        } catch (Throwable th5) {
            if (th5 != null) {
                Option unapply4 = NonFatal$.MODULE$.unapply(th5);
                if (!unapply4.isEmpty()) {
                    Throwable th6 = (Throwable) unapply4.get();
                    logger.error("Exception when committing offsets", th6);
                    _2.errorSafe(th6);
                    channel.errorSafe(th6);
                    return false;
                }
            }
            throw th5;
        }
    }

    private final void apply$$anonfun$1$$anonfun$2(KafkaConsumer kafkaConsumer) {
        kafkaConsumer.close();
    }

    private final void apply$$anonfun$1(Channel channel, KafkaConsumer kafkaConsumer, boolean z) {
        try {
            control$package$.MODULE$.repeatWhile(() -> {
                return r1.apply$$anonfun$1$$anonfun$1(r2, r3);
            });
        } finally {
            if (z) {
                logger.debug("Closing the Kafka consumer");
                control$package$.MODULE$.uninterruptible(() -> {
                    apply$$anonfun$1$$anonfun$2(kafkaConsumer);
                    return BoxedUnit.UNIT;
                });
            }
        }
    }
}
