package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.RedisConnection;
import io.vertx.mutiny.redis.client.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;

/* loaded from: input_file:io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.class */
public class ReactivePubSubCommandsImpl<V> extends AbstractRedisCommands implements ReactivePubSubCommands<V> {
    private final Class<V> classOfMessage;
    private final Redis client;
    private final ReactiveRedisDataSourceImpl datasource;

    /* loaded from: input_file:io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl$AbstractRedisSubscriber.class */
    private abstract class AbstractRedisSubscriber implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        final RedisConnection connection;
        final RedisAPI api;
        final String id = UUID.randomUUID().toString();
        final Consumer<V> onMessage;

        private AbstractRedisSubscriber(RedisConnection redisConnection, RedisAPI redisAPI, Consumer<V> consumer) {
            this.connection = redisConnection;
            this.api = redisAPI;
            this.onMessage = consumer;
        }

        abstract Uni<Void> subscribeToRedis();

        public Uni<String> subscribe() {
            Uni emitter = Uni.createFrom().emitter(uniEmitter -> {
                this.connection.handler(response -> {
                    if (response == null || response.size() <= 0) {
                        return;
                    }
                    if (Vertx.currentContext() != null) {
                        handleRedisEvent(uniEmitter, response);
                    } else {
                        ReactivePubSubCommandsImpl.this.datasource.getVertx().runOnContext(() -> {
                            handleRedisEvent(uniEmitter, response);
                        });
                    }
                });
            });
            return subscribeToRedis().chain(() -> {
                return emitter;
            }).replaceWith(this.id);
        }

        private void handleRedisEvent(UniEmitter<? super Void> uniEmitter, Response response) {
            Context orCreateDuplicatedContext = VertxContext.getOrCreateDuplicatedContext(Vertx.currentContext());
            String response2 = response.get(0).toString();
            if ("subscribe".equalsIgnoreCase(response2) || "psubscribe".equalsIgnoreCase(response2)) {
                uniEmitter.complete((Object) null);
            } else if ("message".equalsIgnoreCase(response2)) {
                orCreateDuplicatedContext.runOnContext(r8 -> {
                    this.onMessage.accept(ReactivePubSubCommandsImpl.this.marshaller.decode(ReactivePubSubCommandsImpl.this.classOfMessage, response.get(2)));
                });
            } else if ("pmessage".equalsIgnoreCase(response2)) {
                orCreateDuplicatedContext.runOnContext(r82 -> {
                    this.onMessage.accept(ReactivePubSubCommandsImpl.this.marshaller.decode(ReactivePubSubCommandsImpl.this.classOfMessage, response.get(3)));
                });
            }
        }

        public Uni<Void> closeAndUnregister(Collection<?> collection) {
            return collection.isEmpty() ? this.connection.close() : Uni.createFrom().voidItem();
        }
    }

    /* loaded from: input_file:io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl$ReactiveAbstractRedisSubscriberImpl.class */
    private class ReactiveAbstractRedisSubscriberImpl extends ReactivePubSubCommandsImpl<V>.AbstractRedisSubscriber implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        private final List<String> channels;

        public ReactiveAbstractRedisSubscriberImpl(RedisConnection redisConnection, RedisAPI redisAPI, Consumer<V> consumer, List<String> list) {
            super(redisConnection, redisAPI, consumer);
            this.channels = new ArrayList(list);
        }

        @Override // io.quarkus.redis.runtime.datasource.ReactivePubSubCommandsImpl.AbstractRedisSubscriber
        Uni<Void> subscribeToRedis() {
            return this.api.subscribe(this.channels).replaceWithVoid();
        }

        @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands.ReactiveRedisSubscriber
        public Uni<Void> unsubscribe(String... strArr) {
            Validation.notNullOrEmpty(strArr, "channels");
            ParameterValidation.doesNotContainNull(strArr, "channels");
            List of = List.of((Object[]) strArr);
            return this.api.unsubscribe(of).chain(() -> {
                this.channels.removeAll(of);
                return closeAndUnregister(this.channels);
            });
        }

        @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands.ReactiveRedisSubscriber
        public Uni<Void> unsubscribe() {
            return this.api.unsubscribe(this.channels).chain(() -> {
                this.channels.clear();
                return closeAndUnregister(this.channels);
            });
        }
    }

    /* loaded from: input_file:io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl$ReactiveRedisPatternSubscriberImpl.class */
    private class ReactiveRedisPatternSubscriberImpl extends ReactivePubSubCommandsImpl<V>.AbstractRedisSubscriber implements ReactivePubSubCommands.ReactiveRedisSubscriber {
        private final List<String> patterns;

        public ReactiveRedisPatternSubscriberImpl(RedisConnection redisConnection, RedisAPI redisAPI, Consumer<V> consumer, List<String> list) {
            super(redisConnection, redisAPI, consumer);
            this.patterns = new ArrayList(list);
        }

        @Override // io.quarkus.redis.runtime.datasource.ReactivePubSubCommandsImpl.AbstractRedisSubscriber
        Uni<Void> subscribeToRedis() {
            return this.api.psubscribe(this.patterns).replaceWithVoid();
        }

        @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands.ReactiveRedisSubscriber
        public Uni<Void> unsubscribe(String... strArr) {
            Validation.notNullOrEmpty(strArr, "patterns");
            ParameterValidation.doesNotContainNull(strArr, "patterns");
            List of = List.of((Object[]) strArr);
            return this.api.punsubscribe(of).chain(() -> {
                this.patterns.removeAll(of);
                return closeAndUnregister(this.patterns);
            });
        }

        @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands.ReactiveRedisSubscriber
        public Uni<Void> unsubscribe() {
            return this.api.punsubscribe(this.patterns).chain(() -> {
                this.patterns.clear();
                return closeAndUnregister(this.patterns);
            });
        }
    }

    public ReactivePubSubCommandsImpl(ReactiveRedisDataSourceImpl reactiveRedisDataSourceImpl, Class<V> cls) {
        super(reactiveRedisDataSourceImpl, new Marshaller(cls));
        this.client = reactiveRedisDataSourceImpl.redis;
        this.datasource = reactiveRedisDataSourceImpl;
        this.classOfMessage = cls;
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisCommands
    public ReactiveRedisDataSource getDataSource() {
        return this.datasource;
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Uni<Void> publish(String str, V v) {
        ParameterValidation.nonNull(str, "channel");
        ParameterValidation.nonNull(v, "message");
        return execute(RedisCommand.of(Command.PUBLISH).put(str).put(this.marshaller.encode(v))).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(String str, Consumer<V> consumer) {
        return subscribe(List.of(str), consumer);
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPattern(String str, Consumer<V> consumer) {
        return subscribeToPatterns(List.of(str), consumer);
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribeToPatterns(List<String> list, Consumer<V> consumer) {
        Validation.notNullOrEmpty(list, "patterns");
        ParameterValidation.nonNull(consumer, "onMessage");
        for (String str : list) {
            if (str == null) {
                throw new IllegalArgumentException("Patterns must not be null");
            }
            if (str.isBlank()) {
                throw new IllegalArgumentException("Patterns cannot be blank");
            }
        }
        return this.client.connect().chain(redisConnection -> {
            ReactiveRedisPatternSubscriberImpl reactiveRedisPatternSubscriberImpl = new ReactiveRedisPatternSubscriberImpl(redisConnection, RedisAPI.api(redisConnection), consumer, list);
            return reactiveRedisPatternSubscriberImpl.subscribe().replaceWith(reactiveRedisPatternSubscriberImpl);
        });
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Uni<ReactivePubSubCommands.ReactiveRedisSubscriber> subscribe(List<String> list, Consumer<V> consumer) {
        Validation.notNullOrEmpty(list, "channels");
        ParameterValidation.nonNull(consumer, "onMessage");
        for (String str : list) {
            if (str == null) {
                throw new IllegalArgumentException("Channels must not be null");
            }
            if (str.isBlank()) {
                throw new IllegalArgumentException("Channels cannot be blank");
            }
        }
        return this.client.connect().chain(redisConnection -> {
            ReactiveAbstractRedisSubscriberImpl reactiveAbstractRedisSubscriberImpl = new ReactiveAbstractRedisSubscriberImpl(redisConnection, RedisAPI.api(redisConnection), consumer, list);
            return reactiveAbstractRedisSubscriberImpl.subscribe().replaceWith(reactiveAbstractRedisSubscriberImpl);
        });
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Multi<V> subscribe(String... strArr) {
        Validation.notNullOrEmpty(strArr, "channels");
        ParameterValidation.doesNotContainNull(strArr, "channels");
        return Multi.createFrom().emitter(multiEmitter -> {
            List<String> of = List.of((Object[]) strArr);
            Objects.requireNonNull(multiEmitter);
            UniSubscribe subscribe = subscribe(of, multiEmitter::emit).subscribe();
            Consumer consumer = reactiveRedisSubscriber -> {
                multiEmitter.onTermination(() -> {
                    reactiveRedisSubscriber.unsubscribe(strArr).subscribe().asCompletionStage();
                });
            };
            Objects.requireNonNull(multiEmitter);
            subscribe.with(consumer, multiEmitter::fail);
        });
    }

    @Override // io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands
    public Multi<V> subscribeToPatterns(String... strArr) {
        Validation.notNullOrEmpty(strArr, "patterns");
        ParameterValidation.doesNotContainNull(strArr, "patterns");
        return Multi.createFrom().emitter(multiEmitter -> {
            List<String> of = List.of((Object[]) strArr);
            Objects.requireNonNull(multiEmitter);
            UniSubscribe subscribe = subscribeToPatterns(of, multiEmitter::emit).subscribe();
            Consumer consumer = reactiveRedisSubscriber -> {
                multiEmitter.onTermination(() -> {
                    reactiveRedisSubscriber.unsubscribe(strArr).subscribe().asCompletionStage();
                });
            };
            Objects.requireNonNull(multiEmitter);
            subscribe.with(consumer, multiEmitter::fail);
        });
    }
}
