/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.pubsub;

import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.PubSubCommandBuilder;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.PatternMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * Reactive command implementation for Pub/Sub connections.
 *
 * <p>Extends {@link RedisReactiveCommandsImpl} and adds the Pub/Sub specific operations declared by
 * {@link RedisPubSubReactiveCommands}. Commands are created through a {@link PubSubCommandBuilder}
 * that is constructed with the same codec as this instance.
 *
 * @param <K> key (channel/pattern) type
 * @param <V> value (message) type
 */
public class RedisPubSubReactiveCommandsImpl<K, V> extends RedisReactiveCommandsImpl<K, V>
        implements RedisPubSubReactiveCommands<K, V> {

    private final PubSubCommandBuilder<K, V> commandBuilder;

    /**
     * Creates a new instance bound to the given Pub/Sub connection.
     *
     * @param connection the connection handed to the superclass constructor
     * @param codec the codec handed to the superclass and used to create the {@link PubSubCommandBuilder}
     */
    public RedisPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
        this.commandBuilder = new PubSubCommandBuilder<>(codec);
    }

    /**
     * Same as {@link #observePatterns(FluxSink.OverflowStrategy)} using
     * {@link FluxSink.OverflowStrategy#BUFFER}.
     *
     * @return a {@link Flux} of {@link PatternMessage}s
     */
    @Override
    public Flux<PatternMessage<K, V>> observePatterns() {
        return observePatterns(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates a {@link Flux} that, for each subscriber, registers a listener on the stateful connection
     * and emits a {@link PatternMessage} for every {@code message(pattern, channel, message)} callback
     * the listener receives. The listener is removed again when the sink is disposed.
     *
     * @param overflowStrategy the overflow strategy passed to {@link Flux#create}
     * @return a {@link Flux} of {@link PatternMessage}s
     */
    @Override
    public Flux<PatternMessage<K, V>> observePatterns(FluxSink.OverflowStrategy overflowStrategy) {
        return Flux.create(sink -> {
            RedisPubSubAdapter<K, V> listener = new RedisPubSubAdapter<K, V>() {

                @Override
                public void message(K pattern, K channel, V message) {
                    sink.next(new PatternMessage<>(pattern, channel, message));
                }
            };
            registerUntilDisposed(sink, listener);
        }, overflowStrategy);
    }

    /**
     * Same as {@link #observeChannels(FluxSink.OverflowStrategy)} using
     * {@link FluxSink.OverflowStrategy#BUFFER}.
     *
     * @return a {@link Flux} of {@link ChannelMessage}s
     */
    @Override
    public Flux<ChannelMessage<K, V>> observeChannels() {
        return observeChannels(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates a {@link Flux} that, for each subscriber, registers a listener on the stateful connection
     * and emits a {@link ChannelMessage} for every {@code message(channel, message)} callback the
     * listener receives. The listener is removed again when the sink is disposed.
     *
     * @param overflowStrategy the overflow strategy passed to {@link Flux#create}
     * @return a {@link Flux} of {@link ChannelMessage}s
     */
    @Override
    public Flux<ChannelMessage<K, V>> observeChannels(FluxSink.OverflowStrategy overflowStrategy) {
        return Flux.create(sink -> {
            RedisPubSubAdapter<K, V> listener = new RedisPubSubAdapter<K, V>() {

                @Override
                public void message(K channel, V message) {
                    sink.next(new ChannelMessage<>(channel, message));
                }
            };
            registerUntilDisposed(sink, listener);
        }, overflowStrategy);
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#psubscribe} and discards its output.
     *
     * @param patterns the patterns passed to the command builder
     * @return a {@link Mono} that only signals completion or error
     */
    @Override
    public Mono<Void> psubscribe(K... patterns) {
        // Uses createFlux(..).then() like punsubscribe/subscribe/unsubscribe; only completion is observed.
        return createFlux(() -> commandBuilder.psubscribe(patterns)).then();
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#punsubscribe} and discards its output.
     *
     * @param patterns the patterns passed to the command builder
     * @return a {@link Mono} that only signals completion or error
     */
    @Override
    public Mono<Void> punsubscribe(K... patterns) {
        return createFlux(() -> commandBuilder.punsubscribe(patterns)).then();
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#subscribe} and discards its output.
     *
     * @param channels the channels passed to the command builder
     * @return a {@link Mono} that only signals completion or error
     */
    @Override
    public Mono<Void> subscribe(K... channels) {
        return createFlux(() -> commandBuilder.subscribe(channels)).then();
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#unsubscribe} and discards its output.
     *
     * @param channels the channels passed to the command builder
     * @return a {@link Mono} that only signals completion or error
     */
    @Override
    public Mono<Void> unsubscribe(K... channels) {
        return createFlux(() -> commandBuilder.unsubscribe(channels)).then();
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#publish}.
     *
     * @param channel the channel passed to the command builder
     * @param message the message passed to the command builder
     * @return a {@link Mono} emitting the command's {@link Long} result
     */
    @Override
    public Mono<Long> publish(K channel, V message) {
        return createMono(() -> commandBuilder.publish(channel, message));
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#pubsubChannels}.
     *
     * @param channel the argument passed to the command builder
     * @return a {@link Flux} created via {@code createDissolvingFlux} emitting elements of type {@code K}
     */
    @Override
    public Flux<K> pubsubChannels(K channel) {
        return createDissolvingFlux(() -> commandBuilder.pubsubChannels(channel));
    }

    /**
     * Dispatches the command built by {@link PubSubCommandBuilder#pubsubNumsub}.
     *
     * @param channels the channels passed to the command builder
     * @return a {@link Mono} emitting a map from {@code K} to {@link Long}
     */
    @Override
    public Mono<Map<K, Long>> pubsubNumsub(K... channels) {
        return createMono(() -> commandBuilder.pubsubNumsub(channels));
    }

    /**
     * Returns the connection of the superclass, narrowed to {@link StatefulRedisPubSubConnection}.
     *
     * @return the underlying Pub/Sub connection
     */
    @Override
    public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
        return (StatefulRedisPubSubConnection<K, V>) super.getStatefulConnection();
    }

    /**
     * Adds {@code listener} to the stateful connection and arranges for it to be removed from that same
     * connection when {@code sink} is disposed.
     */
    private void registerUntilDisposed(FluxSink<?> sink, RedisPubSubAdapter<K, V> listener) {
        // Resolve the connection once so add and remove are guaranteed to target the same instance.
        StatefulRedisPubSubConnection<K, V> connection = getStatefulConnection();
        connection.addListener(listener);
        sink.onDispose(() -> connection.removeListener(listener));
    }
}

