package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.bitmap.ReactiveBitMapCommands;
import io.quarkus.redis.datasource.geo.ReactiveGeoCommands;
import io.quarkus.redis.datasource.hash.ReactiveHashCommands;
import io.quarkus.redis.datasource.hyperloglog.ReactiveHyperLogLogCommands;
import io.quarkus.redis.datasource.keys.ReactiveKeyCommands;
import io.quarkus.redis.datasource.list.ReactiveListCommands;
import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands;
import io.quarkus.redis.datasource.set.ReactiveSetCommands;
import io.quarkus.redis.datasource.sortedset.ReactiveSortedSetCommands;
import io.quarkus.redis.datasource.string.ReactiveStringCommands;
import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult;
import io.quarkus.redis.datasource.transactions.ReactiveTransactionalRedisDataSource;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.runtime.client.config.RedisConfig;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnTerminate;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.RedisConnection;
import io.vertx.mutiny.redis.client.Request;
import io.vertx.mutiny.redis.client.Response;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

/* loaded from: input_file:io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.class */
public class ReactiveRedisDataSourceImpl implements ReactiveRedisDataSource, RedisCommandExecutor {
    final Redis redis;
    final RedisConnection connection;
    private final Vertx vertx;

    public ReactiveRedisDataSourceImpl(Vertx vertx, Redis redis, RedisAPI redisAPI) {
        ParameterValidation.nonNull(redis, RedisConfig.REDIS_CONFIG_ROOT_NAME);
        ParameterValidation.nonNull(redisAPI, "api");
        ParameterValidation.nonNull(vertx, "vertx");
        this.vertx = vertx;
        this.redis = redis;
        this.connection = null;
    }

    public ReactiveRedisDataSourceImpl(Vertx vertx, Redis redis, RedisConnection redisConnection) {
        ParameterValidation.nonNull(redis, RedisConfig.REDIS_CONFIG_ROOT_NAME);
        ParameterValidation.nonNull(redisConnection, "connection");
        ParameterValidation.nonNull(vertx, "vertx");
        this.vertx = vertx;
        this.redis = redis;
        this.connection = redisConnection;
    }

    @Override // io.quarkus.redis.runtime.datasource.RedisCommandExecutor
    public Uni<Response> execute(Request request) {
        return this.connection != null ? this.connection.send(request) : this.redis.send(request);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<TransactionResult> withTransaction(Function<ReactiveTransactionalRedisDataSource, Uni<Void>> function) {
        ParameterValidation.nonNull(function, "function");
        return this.redis.connect().onItem().transformToUni(redisConnection -> {
            ReactiveRedisDataSourceImpl reactiveRedisDataSourceImpl = new ReactiveRedisDataSourceImpl(this.vertx, this.redis, redisConnection);
            TransactionHolder transactionHolder = new TransactionHolder();
            UniOnTerminate onTermination = redisConnection.send(Request.cmd(Command.MULTI)).chain(response -> {
                return (Uni) function.apply(new ReactiveTransactionalRedisDataSourceImpl(reactiveRedisDataSourceImpl, transactionHolder));
            }).chain(r5 -> {
                return !transactionHolder.discarded() ? redisConnection.send(Request.cmd(Command.EXEC)) : Uni.createFrom().nullItem();
            }).onTermination();
            Objects.requireNonNull(redisConnection);
            return onTermination.call(redisConnection::close).map(response2 -> {
                return toTransactionResult(response2, transactionHolder);
            });
        });
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<TransactionResult> withTransaction(Function<ReactiveTransactionalRedisDataSource, Uni<Void>> function, String... strArr) {
        ParameterValidation.nonNull(function, "function");
        Validation.notNullOrEmpty(strArr, "keys");
        ParameterValidation.doesNotContainNull(strArr, "keys");
        return this.redis.connect().onItem().transformToUni(redisConnection -> {
            ReactiveRedisDataSourceImpl reactiveRedisDataSourceImpl = new ReactiveRedisDataSourceImpl(this.vertx, this.redis, redisConnection);
            List.of((Object[]) strArr);
            TransactionHolder transactionHolder = new TransactionHolder();
            return watch(redisConnection, strArr).chain(() -> {
                UniOnTerminate onTermination = redisConnection.send(Request.cmd(Command.MULTI)).chain(response -> {
                    return (Uni) function.apply(new ReactiveTransactionalRedisDataSourceImpl(reactiveRedisDataSourceImpl, transactionHolder));
                }).onItemOrFailure().transformToUni((r5, th) -> {
                    return (transactionHolder.discarded() || th != null) ? !transactionHolder.discarded() ? redisConnection.send(Request.cmd(Command.DISCARD)) : Uni.createFrom().nullItem() : redisConnection.send(Request.cmd(Command.EXEC));
                }).onTermination();
                Objects.requireNonNull(redisConnection);
                return onTermination.call(redisConnection::close).map(response2 -> {
                    return toTransactionResult(response2, transactionHolder);
                });
            });
        });
    }

    private Uni<Void> watch(RedisConnection redisConnection, String... strArr) {
        List of = List.of((Object[]) strArr);
        Request cmd = Request.cmd(Command.WATCH);
        Iterator it = of.iterator();
        while (it.hasNext()) {
            cmd.arg((String) it.next());
        }
        return redisConnection.send(cmd).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <I> Uni<OptimisticLockingTransactionResult<I>> withTransaction(Function<ReactiveRedisDataSource, Uni<I>> function, BiFunction<I, ReactiveTransactionalRedisDataSource, Uni<Void>> biFunction, String... strArr) {
        ParameterValidation.nonNull(biFunction, "tx");
        Validation.notNullOrEmpty(strArr, "watchedKeys");
        ParameterValidation.doesNotContainNull(strArr, "watchedKeys");
        ParameterValidation.nonNull(function, "preTxBlock");
        return this.redis.connect().onItem().transformToUni(redisConnection -> {
            ReactiveRedisDataSourceImpl reactiveRedisDataSourceImpl = new ReactiveRedisDataSourceImpl(this.vertx, this.redis, redisConnection);
            TransactionHolder transactionHolder = new TransactionHolder();
            return watch(redisConnection, strArr).chain(r10 -> {
                return (Uni) function.apply(new ReactiveRedisDataSourceImpl(this.vertx, this.redis, redisConnection));
            }).chain(obj -> {
                UniOnTerminate onTermination = redisConnection.send(Request.cmd(Command.MULTI)).chain(response -> {
                    return (Uni) biFunction.apply(obj, new ReactiveTransactionalRedisDataSourceImpl(reactiveRedisDataSourceImpl, transactionHolder));
                }).onItemOrFailure().transformToUni((r5, th) -> {
                    return (transactionHolder.discarded() || th != null) ? !transactionHolder.discarded() ? redisConnection.send(Request.cmd(Command.DISCARD)).replaceWithNull() : Uni.createFrom().nullItem() : redisConnection.send(Request.cmd(Command.EXEC));
                }).onTermination();
                Objects.requireNonNull(redisConnection);
                return onTermination.call(redisConnection::close).map(response2 -> {
                    return toTransactionResult(response2, obj, transactionHolder);
                });
            });
        });
    }

    public static TransactionResult toTransactionResult(Response response, TransactionHolder transactionHolder) {
        return response == null ? TransactionResultImpl.DISCARDED : new TransactionResultImpl(transactionHolder.discarded(), transactionHolder.map(response));
    }

    public static <I> OptimisticLockingTransactionResult<I> toTransactionResult(Response response, I i, TransactionHolder transactionHolder) {
        return response == null ? OptimisticLockingTransactionResultImpl.discarded(i) : new OptimisticLockingTransactionResultImpl(transactionHolder.discarded(), i, transactionHolder.map(response));
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<Response> execute(String str, String... strArr) {
        ParameterValidation.nonNull(str, "command");
        return execute(Command.create(str), strArr);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<Response> execute(Command command, String... strArr) {
        ParameterValidation.nonNull(command, "command");
        Request cmd = Request.cmd(command);
        for (String str : strArr) {
            cmd.arg(str);
        }
        return execute(cmd);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<Response> execute(io.vertx.redis.client.Command command, String... strArr) {
        ParameterValidation.nonNull(command, "command");
        Request newInstance = Request.newInstance(io.vertx.redis.client.Request.cmd(command));
        for (String str : strArr) {
            newInstance.arg(str);
        }
        return execute(newInstance);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<Void> withConnection(Function<ReactiveRedisDataSource, Uni<Void>> function) {
        return this.connection != null ? function.apply(this) : this.redis.connect().onItem().transformToUni(redisConnection -> {
            UniOnTerminate onTermination = ((Uni) function.apply(new ReactiveRedisDataSourceImpl(this.vertx, this.redis, redisConnection))).onTermination();
            Objects.requireNonNull(redisConnection);
            return onTermination.call(redisConnection::close);
        });
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<Void> select(long j) {
        ParameterValidation.positiveOrZero(j, "index");
        return execute(Request.cmd(Command.SELECT).arg(j)).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Uni<Void> flushall() {
        return execute(Request.cmd(Command.FLUSHALL)).replaceWithVoid();
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, F, V> ReactiveHashCommands<K, F, V> hash(Class<K> cls, Class<F> cls2, Class<V> cls3) {
        return new ReactiveHashCommandsImpl(this, cls, cls2, cls3);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, V> ReactiveGeoCommands<K, V> geo(Class<K> cls, Class<V> cls2) {
        return new ReactiveGeoCommandsImpl(this, cls, cls2);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K> ReactiveKeyCommands<K> key(Class<K> cls) {
        return new ReactiveKeyCommandsImpl(this, cls);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, V> ReactiveSortedSetCommands<K, V> sortedSet(Class<K> cls, Class<V> cls2) {
        return new ReactiveSortedSetCommandsImpl(this, cls, cls2);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, V> ReactiveStringCommands<K, V> string(Class<K> cls, Class<V> cls2) {
        return new ReactiveStringCommandsImpl(this, cls, cls2);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, V> ReactiveSetCommands<K, V> set(Class<K> cls, Class<V> cls2) {
        return new ReactiveSetCommandsImpl(this, cls, cls2);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, V> ReactiveListCommands<K, V> list(Class<K> cls, Class<V> cls2) {
        return new ReactiveListCommandsImpl(this, cls, cls2);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K, V> ReactiveHyperLogLogCommands<K, V> hyperloglog(Class<K> cls, Class<V> cls2) {
        return new ReactiveHyperLogLogCommandsImpl(this, cls, cls2);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <K> ReactiveBitMapCommands<K> bitmap(Class<K> cls) {
        return new ReactiveBitMapCommandsImpl(this, cls);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public <V> ReactivePubSubCommands<V> pubsub(Class<V> cls) {
        return new ReactivePubSubCommandsImpl(this, cls);
    }

    @Override // io.quarkus.redis.datasource.ReactiveRedisDataSource
    public Redis getRedis() {
        return this.redis;
    }

    public Vertx getVertx() {
        return this.vertx;
    }
}
