package com.github.gvolpe.fs2redis.interpreter.pubsub.internals;

import cats.effect.ConcurrentEffect;
import cats.effect.concurrent.Ref;
import cats.effect.syntax.package$effect$;
import cats.syntax.package$all$;
import com.github.gvolpe.fs2redis.domain;
import com.github.gvolpe.fs2redis.effect.Log;
import fs2.concurrent.Topic;
import fs2.concurrent.Topic$;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;

/* compiled from: Fs2PubSubInternals.scala */
/* loaded from: input_file:com/github/gvolpe/fs2redis/interpreter/pubsub/internals/Fs2PubSubInternals$.class */
/**
 * Singleton holder for the pub/sub wiring helpers of fs2redis (decompiled form of
 * {@code Fs2PubSubInternals.scala}).
 *
 * <p>The only instance is created by the static initializer and published through
 * {@link #MODULE$} by the private constructor. It offers two operations:
 * <ul>
 *   <li>{@link #defaultListener} builds a {@code RedisPubSubListener} that forwards messages
 *       of one channel into a {@code Topic};</li>
 *   <li>{@link #apply} returns a function that looks up, or creates and registers, the
 *       {@code Topic} for a channel.</li>
 * </ul>
 */
public final class Fs2PubSubInternals$ {
    /** The single instance; assigned from the private constructor. */
    public static Fs2PubSubInternals$ MODULE$;

    static {
        // Instantiating the class is what populates MODULE$ (see the constructor).
        new Fs2PubSubInternals$();
    }

    /**
     * Creates a listener that republishes messages received on {@code fs2RedisChannel}
     * to {@code topic}.
     *
     * @param fs2RedisChannel  channel whose {@code value()} is compared against the key of
     *                         every incoming message
     * @param topic            topic that receives {@code Option.apply(message)} for matching
     *                         messages
     * @param concurrentEffect effect instance used to convert the publish action to an
     *                         {@code IO} before running it
     * @param <F> effect type of the topic
     * @param <K> key / channel type
     * @param <V> message value type
     * @return a listener whose subscription callbacks are no-ops and whose message callbacks
     *         publish to {@code topic}
     */
    public <F, K, V> RedisPubSubListener<K, V> defaultListener(final domain.Fs2RedisChannel<K> fs2RedisChannel, final Topic<F, Option<V>> topic, final ConcurrentEffect<F> concurrentEffect) {
        return new RedisPubSubListener<K, V>(fs2RedisChannel, topic, concurrentEffect) { // from class: com.github.gvolpe.fs2redis.interpreter.pubsub.internals.Fs2PubSubInternals$$anon$1
            // Captured arguments of defaultListener, copied in by the initializer block below.
            private final domain.Fs2RedisChannel fs2RedisChannel$1;
            private final Topic topic$1;
            private final ConcurrentEffect evidence$1$1;

            /**
             * Publishes {@code v} (wrapped with {@code Option.apply}) to the topic when
             * {@code k} equals the configured channel value; other keys are ignored.
             */
            public void message(K k, V v) {
                if (BoxesRunTime.equals(k, this.fs2RedisChannel$1.value())) {
                    // The publish action is converted to IO and started asynchronously;
                    // the callback below receives the outcome and does nothing with it.
                    package$effect$.MODULE$.toEffectOps(this.topic$1.publish1(Option$.MODULE$.apply(v)), this.evidence$1$1).toIO().unsafeRunAsync(either -> {
                        $anonfun$message$1(either);
                        return BoxedUnit.UNIT;
                    });
                }
            }

            /**
             * Three-argument variant: the first argument is ignored and the call is delegated
             * to {@link #message(Object, Object)} with {@code k2} as the key.
             * NOTE(review): presumably {@code k} is the subscription pattern and {@code k2}
             * the channel, as in lettuce's listener interface - confirm.
             */
            public void message(K k, K k2, V v) {
                message(k2, v);
            }

            /** No-op. */
            public void psubscribed(K k, long j) {
            }

            /** No-op. */
            public void subscribed(K k, long j) {
            }

            /** No-op. */
            public void unsubscribed(K k, long j) {
            }

            /** No-op. */
            public void punsubscribed(K k, long j) {
            }

            /**
             * Callback body for the asynchronous publish: intentionally empty, so the
             * {@code Either} outcome is discarded.
             * NOTE(review): this presumably means a failed publish is dropped silently -
             * confirm that this best-effort behaviour is intended.
             */
            public static final /* synthetic */ void $anonfun$message$1(Either either) {
            }

            {
                // Store the values captured from the enclosing method call.
                this.fs2RedisChannel$1 = fs2RedisChannel;
                this.topic$1 = topic;
                this.evidence$1$1 = concurrentEffect;
            }
        };
    }

    /**
     * Builds a curried function {@code channel -> topicsByKey -> F[Topic]} that returns the
     * topic for a channel, creating and registering it when the supplied map has no entry.
     *
     * <p>When {@code channel.value()} is present in the supplied map, the existing topic is
     * returned via {@code pure}. Otherwise the following steps are sequenced with
     * {@code flatMap}/{@code map}, in this order:
     * <ol>
     *   <li>create a new {@code Topic} with {@code None} as its initial value;</li>
     *   <li>build a listener for it with {@link #defaultListener};</li>
     *   <li>log {@code "Creating listener for channel: "} followed by the channel;</li>
     *   <li>add the listener to {@code statefulRedisPubSubConnection} inside
     *       {@code concurrentEffect.delay};</li>
     *   <li>update {@code ref} so that the channel value maps to the new topic;</li>
     *   <li>yield the new topic.</li>
     * </ol>
     *
     * <p>NOTE(review): the lookup uses the map passed to the returned function, not the
     * current contents of {@code ref}. If callers can pass a snapshot that is older than
     * {@code ref}, a second topic and listener could be created for the same channel -
     * verify against the call sites.
     *
     * @param ref                           holder of the channel-value-to-topic map; updated
     *                                      when a topic is created
     * @param statefulRedisPubSubConnection connection that newly created listeners are added to
     * @param concurrentEffect              effect instance used for topic creation, delaying
     *                                      the listener registration and sequencing the steps
     * @param log                           logger used for the "Creating listener" message
     * @param <F> effect type
     * @param <K> key / channel type
     * @param <V> message value type
     * @return the curried lookup-or-create function described above
     */
    public <F, K, V> Function1<domain.Fs2RedisChannel<K>, Function1<Map<K, Topic<F, Option<V>>>, F>> apply(Ref<F, Map<K, Topic<F, Option<V>>>> ref, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, ConcurrentEffect<F> concurrentEffect, Log<F> log) {
        return fs2RedisChannel -> {
            return map -> {
                // fold: first function handles "no topic in the supplied map",
                // second function handles "topic already present".
                return map.get(fs2RedisChannel.value()).fold(() -> {
                    // New topic starts with None as its initial value.
                    return package$all$.MODULE$.toFlatMapOps(Topic$.MODULE$.apply(None$.MODULE$, concurrentEffect), concurrentEffect).flatMap(topic -> {
                        RedisPubSubListener defaultListener = MODULE$.defaultListener(fs2RedisChannel, topic, concurrentEffect);
                        // 31 is the length of the literal message prefix.
                        return package$all$.MODULE$.toFlatMapOps(log.info(new StringBuilder(31).append("Creating listener for channel: ").append(fs2RedisChannel).toString()), concurrentEffect).flatMap(boxedUnit -> {
                            // Listener registration is wrapped in delay, so it is part of
                            // the returned effect rather than executed while building it.
                            return package$all$.MODULE$.toFlatMapOps(concurrentEffect.delay(() -> {
                                statefulRedisPubSubConnection.addListener(defaultListener);
                            }), concurrentEffect).flatMap(boxedUnit -> {
                                // Record the new topic under the channel value, then yield it.
                                return package$all$.MODULE$.toFunctorOps(ref.update(map -> {
                                    return map.updated(fs2RedisChannel.value(), topic);
                                }), concurrentEffect).map(boxedUnit -> {
                                    return topic;
                                });
                            });
                        });
                    });
                }, topic -> {
                    // Existing topic: return it unchanged, without logging or registering.
                    return concurrentEffect.pure(topic);
                });
            };
        };
    }

    /** Private constructor; publishes the created instance as {@link #MODULE$}. */
    private Fs2PubSubInternals$() {
        MODULE$ = this;
    }
}
