package aecor.kafkadistributedprocessing.internal;

import aecor.data.Committable;
import aecor.kafkadistributedprocessing.internal.Kafka;
import aecor.kafkadistributedprocessing.internal.RebalanceEvents;
import cats.effect.ConcurrentEffect;
import cats.effect.ContextShift;
import cats.effect.Timer;
import cats.implicits$;
import cats.syntax.FlatMapOps$;
import cats.syntax.NestedFoldableOps$;
import fs2.Stream;
import fs2.Stream$;
import fs2.internal.FreeC;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: Kafka.scala */
/* loaded from: input_file:aecor/kafkadistributedprocessing/internal/Kafka$.class */
public final class Kafka$ {
    /** Singleton instance of this class; assigned by the private constructor. */
    public static Kafka$ MODULE$;
    // Not read or written anywhere in the visible code; presumably a
    // compiler-generated initialization-tracking flag — TODO confirm.
    private volatile boolean bitmap$init$0;

    static {
        // Instantiating the class once is enough: the constructor stores `this` in MODULE$.
        new Kafka$();
    }

    /**
     * Subscribes {@code kafkaConsumer} to the single topic {@code str} through
     * {@code RebalanceEvents} and returns the resulting stream, decorated by
     * {@code $anonfun$watchRebalanceEvents$2}.
     *
     * @param kafkaConsumer    consumer that is subscribed and handed on to the decorator
     * @param str              topic name; wrapped into a one-element set for the subscription
     * @param finiteDuration   forwarded to the decorator as its sleep duration
     * @param finiteDuration2  forwarded to the decorator as its poll argument
     * @param concurrentEffect effect type-class instance used for subscribing and mapping
     * @param timer            timer forwarded to the decorator
     * @return the stream obtained by forcing the effect that yields the decorated event stream
     */
    public <F> FreeC<?, BoxedUnit> watchRebalanceEvents(KafkaConsumer<F, BoxedUnit, BoxedUnit> kafkaConsumer, String str, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, ConcurrentEffect<F> concurrentEffect, Timer<F> timer) {
        // Effect that registers the rebalance listener on the consumer and yields the event stream.
        Object subscription = RebalanceEvents$.MODULE$.apply().subscribe(listener -> {
            Set topics = (Set) Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{str}));
            return kafkaConsumer.subscribe(topics, listener);
        }, concurrentEffect);

        // Unwrap the yielded stream, decorate it, and re-wrap it as a Stream.
        Object decorated = implicits$.MODULE$.toFunctorOps(subscription, concurrentEffect).map(events -> {
            FreeC free = ((Stream) events).fs2$Stream$$free();
            return new Stream($anonfun$watchRebalanceEvents$2(kafkaConsumer, concurrentEffect, finiteDuration2, timer, finiteDuration, free));
        });

        return Stream$.MODULE$.force(decorated);
    }

    /**
     * Creates a Kafka consumer as a stream resource from {@code properties} (using
     * {@code Kafka.UnitDeserializer} for both deserializer arguments) and flat-maps it
     * into the stream built by {@code $anonfun$assignPartitions$1}.
     *
     * @param properties       consumer configuration passed to {@code KafkaConsumer$.create()}
     * @param str              topic name forwarded to the inner stream
     * @param finiteDuration   first duration, forwarded unchanged
     * @param finiteDuration2  second duration, forwarded unchanged
     * @param concurrentEffect effect type-class instance
     * @param timer            timer forwarded to the inner stream
     * @param contextShift     context shift used when creating the consumer
     * @return the stream produced for the created consumer
     */
    public <F> FreeC<?, BoxedUnit> assignPartitions(Properties properties, String str, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, ConcurrentEffect<F> concurrentEffect, Timer<F> timer, ContextShift<F> contextShift) {
        Kafka.UnitDeserializer keyDeserializer = new Kafka.UnitDeserializer();
        Kafka.UnitDeserializer valueDeserializer = new Kafka.UnitDeserializer();
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.resource(KafkaConsumer$.MODULE$.create().apply(properties, keyDeserializer, valueDeserializer, concurrentEffect, contextShift)),
                consumer -> new Stream($anonfun$assignPartitions$1(str, concurrentEffect, finiteDuration, finiteDuration2, timer, consumer)));
    }

    /**
     * Decorates the rebalance-event stream {@code freeC}: attaches
     * {@code kafkaConsumer.unsubscribe()} as a finalizer and runs, concurrently, a
     * repeated "poll then sleep" effect.
     *
     * @param kafkaConsumer    consumer that is polled and, on finalization, unsubscribed
     * @param concurrentEffect effect type-class instance
     * @param finiteDuration   argument passed to {@code kafkaConsumer.poll}
     * @param timer            timer used for the sleep between polls
     * @param finiteDuration2  duration passed to {@code timer.sleep}
     * @param freeC            underlying representation of the stream being decorated
     * @return the decorated stream
     */
    public static final /* synthetic */ FreeC $anonfun$watchRebalanceEvents$2(KafkaConsumer kafkaConsumer, ConcurrentEffect concurrentEffect, FiniteDuration finiteDuration, Timer timer, FiniteDuration finiteDuration2, FreeC freeC) {
        // The event stream, with unsubscribe registered as its finalizer.
        FreeC finalized = Stream$.MODULE$.onFinalize$extension(freeC, kafkaConsumer.unsubscribe(), concurrentEffect);

        // One polling step: poll, then sleep.
        Object pollThenSleep = FlatMapOps$.MODULE$.$greater$greater$extension(
                implicits$.MODULE$.catsSyntaxFlatMapOps(kafkaConsumer.poll(finiteDuration), concurrentEffect),
                () -> timer.sleep(finiteDuration2),
                concurrentEffect);
        FreeC poller = Stream$.MODULE$.repeatEval(pollThenSleep);

        return Stream$.MODULE$.concurrently$extension(finalized, poller, concurrentEffect);
    }

    /**
     * Looks up {@code i} (boxed) in {@code map} and exposes the optional result as an
     * {@code Iterable}.
     *
     * @param map map to query
     * @param i   key to look up
     * @return the lookup result converted through {@code Option$.option2Iterable}
     */
    public static final /* synthetic */ Iterable $anonfun$assignPartitions$11(Map map, int i) {
        Integer key = BoxesRunTime.boxToInteger(i);
        return Option$.MODULE$.option2Iterable(map.get(key));
    }

    /**
     * Builds the scanned stream over rebalance events for a known partition count.
     *
     * <p>The stream from {@code watchRebalanceEvents} is folded with {@code evalScan}, starting
     * from a pair of (empty list, empty map). For every incoming {@code Committable}:
     * <ul>
     *   <li>{@code PartitionsAssigned}: a {@code Channel} is created per partition; the new state
     *       is (list of {@code Kafka.AssignedPartition}, previous map plus
     *       partition number -> {@code channel.call()}).</li>
     *   <li>{@code PartitionsRevoked}: the map entries found for the revoked partition numbers are
     *       combined with {@code sequence_}; the new state is (empty list, previous map minus the
     *       revoked partition numbers).</li>
     * </ul>
     * The resulting effect is combined with the committable's {@code commit} via {@code <*}.
     *
     * <p>The scan lambda's parameters are named {@code state} and {@code event} so that they do
     * not collide with the locals {@code tuple2} and {@code committable} declared in its body;
     * Java forbids re-declaring a lambda parameter as a local in the lambda body.
     *
     * @param kafkaConsumer    consumer passed to {@code watchRebalanceEvents}
     * @param str              topic name passed to {@code watchRebalanceEvents}
     * @param finiteDuration   first duration passed to {@code watchRebalanceEvents}
     * @param finiteDuration2  second duration passed to {@code watchRebalanceEvents}
     * @param concurrentEffect effect type-class instance
     * @param timer            timer passed to {@code watchRebalanceEvents}
     * @param i                value stored as the second constructor argument of every
     *                         {@code Kafka.AssignedPartition}; the caller in this file passes the
     *                         size of {@code partitionsFor(str)}
     * @return the scanned stream of (assigned partitions, map) pairs
     * @throws MatchError if the state or event does not have the expected shape, or the rebalance
     *                    event is neither {@code PartitionsAssigned} nor {@code PartitionsRevoked}
     */
    public static final /* synthetic */ FreeC $anonfun$assignPartitions$3(KafkaConsumer kafkaConsumer, String str, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, ConcurrentEffect concurrentEffect, Timer timer, int i) {
        return Stream$.MODULE$.evalScan$extension(MODULE$.watchRebalanceEvents(kafkaConsumer, str, finiteDuration, finiteDuration2, concurrentEffect, timer), new Tuple2(List$.MODULE$.empty(), Predef$.MODULE$.Map().empty()), (state, event) -> {
            Object as;
            // Pair up (previous state, incoming event) so it can be destructured below.
            Tuple2 tuple2 = new Tuple2(state, event);
            if (tuple2 != null) {
                Tuple2 tuple22 = (Tuple2) tuple2._1();
                Committable committable = (Committable) tuple2._2();
                if (tuple22 != null) {
                    // Only the map half of the previous state is carried forward; the list is rebuilt.
                    Map map = (Map) tuple22._2();
                    if (committable != null) {
                        Object commit = committable.commit();
                        RebalanceEvents.RebalanceEvent rebalanceEvent = (RebalanceEvents.RebalanceEvent) committable.value();
                        if (rebalanceEvent instanceof RebalanceEvents.RebalanceEvent.PartitionsAssigned) {
                            // One channel per assigned partition, paired with that channel's call().
                            as = implicits$.MODULE$.toFunctorOps(implicits$.MODULE$.toTraverseOps(((RebalanceEvents.RebalanceEvent.PartitionsAssigned) rebalanceEvent).partitions().toList(), implicits$.MODULE$.catsStdInstancesForList()).traverse(topicPartition -> {
                                return implicits$.MODULE$.toFunctorOps(Channel$.MODULE$.create(concurrentEffect), concurrentEffect).map(channel -> {
                                    if (channel == null) {
                                        throw new MatchError(channel);
                                    }
                                    Object watch = channel.watch();
                                    Object close = channel.close();
                                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new Kafka.AssignedPartition(topicPartition.partition(), i, watch, close)), channel.call());
                                });
                            }, concurrentEffect), concurrentEffect).map(list -> {
                                // New state: the assigned partitions, and the map extended with
                                // partition number -> call for each of them.
                                return new Tuple2((List) list.map(tuple23 -> {
                                    return (Kafka.AssignedPartition) tuple23._1();
                                }, List$.MODULE$.canBuildFrom()), map.$plus$plus((GenTraversableOnce) list.map(tuple24 -> {
                                    if (tuple24 != null) {
                                        Kafka.AssignedPartition assignedPartition = (Kafka.AssignedPartition) tuple24._1();
                                        Object _2 = tuple24._2();
                                        if (assignedPartition != null) {
                                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(assignedPartition.partition())), _2);
                                        }
                                    }
                                    throw new MatchError(tuple24);
                                }, List$.MODULE$.canBuildFrom())));
                            });
                        } else {
                            if (!(rebalanceEvent instanceof RebalanceEvents.RebalanceEvent.PartitionsRevoked)) {
                                throw new MatchError(rebalanceEvent);
                            }
                            Set<TopicPartition> partitions = ((RebalanceEvents.RebalanceEvent.PartitionsRevoked) rebalanceEvent).partitions();
                            // Combine the stored entries of the revoked partitions (those present in
                            // the map), then yield an empty list and the map without those partitions.
                            as = implicits$.MODULE$.toFunctorOps(NestedFoldableOps$.MODULE$.sequence_$extension(implicits$.MODULE$.catsSyntaxNestedFoldable(((List) partitions.toList().map(topicPartition2 -> {
                                return BoxesRunTime.boxToInteger(topicPartition2.partition());
                            }, List$.MODULE$.canBuildFrom())).flatMap(obj -> {
                                return $anonfun$assignPartitions$11(map, BoxesRunTime.unboxToInt(obj));
                            }, List$.MODULE$.canBuildFrom()), implicits$.MODULE$.catsStdInstancesForList()), implicits$.MODULE$.catsStdInstancesForList(), concurrentEffect), concurrentEffect).as(new Tuple2(List$.MODULE$.empty(), map.$minus$minus((GenTraversableOnce) partitions.map(topicPartition3 -> {
                                return BoxesRunTime.boxToInteger(topicPartition3.partition());
                            }, Set$.MODULE$.canBuildFrom()))));
                        }
                        // Combine the state-producing effect with the committable's commit via `<*`.
                        return implicits$.MODULE$.catsSyntaxApply(as, concurrentEffect).$less$times(commit);
                    }
                }
            }
            throw new MatchError(tuple2);
        });
    }

    /**
     * Emits the elements of the list held in the first component of {@code tuple2}.
     *
     * @param tuple2 pair whose first component is a {@code List}
     * @return a stream emitting that list's elements
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ FreeC $anonfun$assignPartitions$13(Tuple2 tuple2) {
        if (tuple2 != null) {
            List assigned = (List) tuple2._1();
            return Stream$.MODULE$.emits(assigned);
        }
        throw new MatchError(tuple2);
    }

    /**
     * Builds the stream for one created consumer in three stages: evaluate the size of
     * {@code kafkaConsumer.partitionsFor(str)}, flat-map that count into the scanned stream of
     * {@code $anonfun$assignPartitions$3}, then flat-map each scan result through
     * {@code $anonfun$assignPartitions$13}.
     *
     * @param str              topic name
     * @param concurrentEffect effect type-class instance
     * @param finiteDuration   first duration, forwarded unchanged
     * @param finiteDuration2  second duration, forwarded unchanged
     * @param timer            timer, forwarded unchanged
     * @param kafkaConsumer    consumer queried for partitions and forwarded to the scan stage
     * @return the resulting stream
     */
    public static final /* synthetic */ FreeC $anonfun$assignPartitions$1(String str, ConcurrentEffect concurrentEffect, FiniteDuration finiteDuration, FiniteDuration finiteDuration2, Timer timer, KafkaConsumer kafkaConsumer) {
        // Stage 1: effect yielding the boxed number of partitions of the topic.
        Object partitionCount = implicits$.MODULE$.toFunctorOps(kafkaConsumer.partitionsFor(str), concurrentEffect)
                .map(topicPartitions -> BoxesRunTime.boxToInteger(topicPartitions.size()));
        FreeC countStream = Stream$.MODULE$.eval(partitionCount);

        // Stage 2: scan over rebalance events, parameterised by the partition count.
        FreeC scanned = Stream$.MODULE$.flatMap$extension(countStream,
                count -> new Stream($anonfun$assignPartitions$3(kafkaConsumer, str, finiteDuration, finiteDuration2, concurrentEffect, timer, BoxesRunTime.unboxToInt(count))));

        // Stage 3: flatten every scan state into its individual elements.
        return Stream$.MODULE$.flatMap$extension(scanned,
                scanState -> new Stream($anonfun$assignPartitions$13(scanState)));
    }

    /** Private constructor: publishes the newly created instance as the {@code MODULE$} singleton. */
    private Kafka$() {
        MODULE$ = this;
    }
}
