package ox.kafka;

import java.io.Serializable;
import org.apache.kafka.common.TopicPartition;
import ox.Ox;
import ox.channels.ChannelClosed;
import ox.channels.Sink;
import ox.channels.Source;
import ox.channels.Source$;
import ox.channels.StageCapacity$package$StageCapacity$;
import ox.channels.select$package$;
import ox.control$package$;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.Nothing$;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;

/* compiled from: kafkaOffsetCommit.scala */
/* loaded from: input_file:ox/kafka/kafkaOffsetCommit$package$.class */
public final class kafkaOffsetCommit$package$ implements Serializable {
    public static final kafkaOffsetCommit$package$ MODULE$ = new kafkaOffsetCommit$package$();

    private kafkaOffsetCommit$package$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(kafkaOffsetCommit$package$.class);
    }

    public Nothing$ doCommit(Source<SendPacket<?, ?>> source, Ox ox2) {
        FiniteDuration second = new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).second();
        Source$ source$ = Source$.MODULE$;
        Source$.MODULE$.tick$default$2();
        Source tick = source$.tick(second, BoxedUnit.UNIT, ox2, StageCapacity$package$StageCapacity$.MODULE$.default());
        Map map = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[0]));
        ObjectRef create = ObjectRef.create((Object) null);
        return control$package$.MODULE$.forever(() -> {
            doCommit$$anonfun$1(source, tick, map, create);
            return BoxedUnit.UNIT;
        });
    }

    private final void doCommit$$anonfun$1(Source source, Source source2, Map map, ObjectRef objectRef) {
        Object select = select$package$.MODULE$.select(source2, source);
        if (select instanceof ChannelClosed) {
            throw ((ChannelClosed) select).toThrowable();
        }
        if (!(select instanceof BoxedUnit) && !(select instanceof SendPacket)) {
            throw new MatchError(select);
        }
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        if (boxedUnit != null ? !boxedUnit.equals(select) : select != null) {
            if (!(select instanceof SendPacket)) {
                throw new MatchError(select);
            }
            ((SendPacket) select).commit().foreach(receivedMessage -> {
                if (((Sink) objectRef.elem) == null) {
                    objectRef.elem = receivedMessage.consumer();
                }
                return map.updateWith(new TopicPartition(receivedMessage.topic(), receivedMessage.partition()), option -> {
                    if (option instanceof Some) {
                        return Some$.MODULE$.apply(BoxesRunTime.boxToLong(scala.math.package$.MODULE$.max(BoxesRunTime.unboxToLong(((Some) option).value()), receivedMessage.offset())));
                    }
                    if (None$.MODULE$.equals(option)) {
                        return Some$.MODULE$.apply(BoxesRunTime.boxToLong(receivedMessage.offset()));
                    }
                    throw new MatchError(option);
                });
            });
        } else {
            if (((Sink) objectRef.elem) == null || !map.nonEmpty()) {
                return;
            }
            ((Sink) objectRef.elem).send(KafkaConsumerRequest$Commit$.MODULE$.apply(map.toMap($less$colon$less$.MODULE$.refl())));
            map.clear();
        }
    }
}
