/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.nio.ByteBuffer;
import java.util.function.Function;
import lombok.NonNull;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactivePubSubCommands;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link ReactivePubSubCommands} implementation that delegates to the pub/sub and
 * command APIs exposed by a {@link LettuceReactiveRedisConnection}.
 */
class LettuceReactivePubSubCommands
implements ReactivePubSubCommands {
    /** Connection supplying the command API, the pub/sub connection and the exception translator. */
    @NonNull
    private final LettuceReactiveRedisConnection connection;

    /**
     * Creates a subscription backed by the pub/sub connection of the underlying connection.
     *
     * @return a {@link Mono} emitting a new {@link LettuceReactiveSubscription} that wraps the
     *         reactive API of the pub/sub connection and is handed the connection's exception
     *         translator.
     */
    @Override
    public Mono<ReactiveSubscription> createSubscription() {
        return this.connection.getPubSubConnection()
                .map(pubSubConnection -> new LettuceReactiveSubscription(
                        (RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>)pubSubConnection.reactive(),
                        this.connection.translateException()));
    }

    /**
     * Publishes every message of the given stream to the channel named by that message.
     *
     * <p>NOTE(review): unlike the subscribe methods, this pipeline does not pass errors through
     * {@code connection.translateException()} - confirm whether that is intentional.
     *
     * @param messageStream stream of channel/message pairs to publish; must not be {@code null}.
     * @return a {@link Flux} emitting the result of each individual publish command (presumably
     *         the receiver count reported by Redis - TODO confirm). Results are merged with
     *         {@code flatMap}, so they are not guaranteed to follow the order of the input stream.
     * @throws IllegalArgumentException if {@code messageStream} is {@code null}.
     */
    @Override
    public Flux<Long> publish(Publisher<ReactiveSubscription.ChannelMessage<ByteBuffer, ByteBuffer>> messageStream) {
        Assert.notNull(messageStream, "ChannelMessage stream must not be null!");
        return this.connection.getCommands()
                .flatMapMany(commands -> Flux.from(messageStream)
                        .flatMap(message -> commands.publish(message.getChannel(), message.getMessage())));
    }

    /**
     * Subscribes the pub/sub connection to the given channels.
     *
     * @param channels channels to subscribe to; the array must not be {@code null}.
     * @return a {@link Mono} that completes with the underlying subscribe command.
     * @throws IllegalArgumentException if {@code channels} is {@code null}.
     */
    @Override
    public Mono<Void> subscribe(ByteBuffer ... channels) {
        Assert.notNull(channels, "Channels must not be null!");
        return this.doWithPubSub(commands -> commands.subscribe(channels));
    }

    /**
     * Subscribes the pub/sub connection to the given channel patterns.
     *
     * @param patterns patterns to subscribe to; the array must not be {@code null}.
     * @return a {@link Mono} that completes with the underlying psubscribe command.
     * @throws IllegalArgumentException if {@code patterns} is {@code null}.
     */
    @Override
    public Mono<Void> pSubscribe(ByteBuffer ... patterns) {
        Assert.notNull(patterns, "Patterns must not be null!");
        return this.doWithPubSub(commands -> commands.psubscribe(patterns));
    }

    /**
     * Applies {@code function} to the reactive API of the pub/sub connection and maps any error
     * of the resulting pipeline through the connection's exception translator.
     *
     * @param function callback producing the result from the reactive pub/sub commands.
     * @param <T> element type of the returned {@link Mono}.
     * @return the {@link Mono} produced by {@code function}, with errors translated.
     */
    private <T> Mono<T> doWithPubSub(Function<RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>, Mono<T>> function) {
        // No raw (Mono) cast: function already yields Mono<T>, so the chain stays fully type-checked.
        return this.connection.getPubSubConnection()
                .flatMap(pubSubConnection -> function.apply(
                        (RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer>)pubSubConnection.reactive()))
                .onErrorMap(this.connection.translateException());
    }

    /**
     * Creates the command facade for the given connection.
     *
     * @param connection connection to delegate to; must not be {@code null}.
     * @throws NullPointerException if {@code connection} is {@code null}.
     */
    public LettuceReactivePubSubCommands(@NonNull LettuceReactiveRedisConnection connection) {
        if (connection == null) {
            throw new NullPointerException("connection is marked non-null but is null");
        }
        this.connection = connection;
    }
}

