package ox.kafka;

import java.io.Serializable;
import org.apache.kafka.common.TopicPartition;
import ox.Ox;
import ox.channels.ActorRef;
import ox.channels.ChannelClosed;
import ox.channels.ChannelClosed$Done$;
import ox.channels.ChannelClosed$Error$;
import ox.channels.Source;
import ox.channels.Source$;
import ox.channels.select$package$;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.None$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;

/* compiled from: kafkaOffsetCommit.scala */
/* loaded from: input_file:ox/kafka/kafkaOffsetCommit$package$.class */
/**
 * Singleton holder for the Kafka offset-commit loop; the single instance is
 * exposed through {@link #MODULE$}.
 */
public final class kafkaOffsetCommit$package$ implements Serializable {
    public static final kafkaOffsetCommit$package$ MODULE$ = new kafkaOffsetCommit$package$();

    /** Not instantiable from outside; use {@link #MODULE$}. */
    private kafkaOffsetCommit$package$() {
    }

    /**
     * Serialization hook: substitutes a {@code ModuleSerializationProxy} for this instance.
     * Presumably this makes deserialization resolve back to {@link #MODULE$} - TODO confirm.
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(kafkaOffsetCommit$package$.class);
    }

    /**
     * Consumes {@code source} until it is closed, remembering the highest offset seen for
     * every topic-partition and handing the accumulated offsets to the consumer actor on
     * each tick of a one-second ticker.
     *
     * <p>The consumer actor is taken from the first received message that carries one.
     * A tick is a no-op while no consumer is known or no offsets are pending.
     *
     * <p>The loop ends normally when the selection yields {@code ChannelClosed.Done}; when
     * it yields a {@code ChannelClosed.Error}, the wrapped cause is rethrown.
     *
     * <p>NOTE(review): offsets gathered after the last tick are not committed when the
     * source completes - confirm this is intended.
     *
     * @param source channel of packets whose {@code commit()} messages carry the offsets
     * @param ox2    scope passed to {@code Source.tick} when creating the ticker
     * @throws MatchError if the selection yields a value of an unexpected type
     */
    public void doCommit(Source<SendPacket<?, ?>> source, Ox ox2) {
        FiniteDuration commitInterval = new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).second();
        Source$ sourceCompanion = Source$.MODULE$;
        // Default-argument accessor; its result is discarded, the call is kept as-is.
        Source$.MODULE$.tick$default$2();
        Source ticks = sourceCompanion.tick(commitInterval, BoxedUnit.UNIT, ox2);

        // Highest offset seen so far per topic-partition, since the last commit.
        Map pendingOffsets = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[0]));
        // Consumer actor; stays null until the first message supplies one.
        ObjectRef consumerRef = ObjectRef.create((Object) null);

        while (true) {
            Object event = select$package$.MODULE$.selectOrClosed(ticks, source);

            if (event instanceof ChannelClosed.Error) {
                throw ChannelClosed$Error$.MODULE$.unapply((ChannelClosed.Error) event)._1();
            }
            if (ChannelClosed$Done$.MODULE$.equals(event)) {
                return;
            }

            if (BoxedUnit.UNIT.equals(event)) {
                // Tick: hand over whatever has accumulated, then start a fresh batch.
                boolean consumerKnown = ((ActorRef) consumerRef.elem) != null;
                if (consumerKnown && pendingOffsets.nonEmpty()) {
                    // The immutable snapshot is taken inside the lambda, i.e. when the actor runs it.
                    ((ActorRef) consumerRef.elem).ask(wrapper ->
                            wrapper.commit(pendingOffsets.toMap($less$colon$less$.MODULE$.refl())));
                    pendingOffsets.clear();
                }
                continue;
            }

            if (!(event instanceof SendPacket)) {
                throw new MatchError(event);
            }

            SendPacket packet = (SendPacket) event;
            packet.commit().foreach(message -> {
                if (((ActorRef) consumerRef.elem) == null) {
                    consumerRef.elem = message.consumer();
                }
                TopicPartition key = new TopicPartition(message.topic(), message.partition());
                return pendingOffsets.updateWith(key, previous -> {
                    if (None$.MODULE$.equals(previous)) {
                        // First offset seen for this partition.
                        return Some$.MODULE$.apply(BoxesRunTime.boxToLong(message.offset()));
                    }
                    if (previous instanceof Some) {
                        // Keep the larger of the stored and the incoming offset.
                        long known = BoxesRunTime.unboxToLong(((Some) previous).value());
                        long incoming = message.offset();
                        return Some$.MODULE$.apply(BoxesRunTime.boxToLong(scala.math.package$.MODULE$.max(known, incoming)));
                    }
                    throw new MatchError(previous);
                });
            });
        }
    }
}
