package ru.tinkoff.kora.database.vertx;

import io.netty.channel.EventLoopGroup;
import io.vertx.core.Future;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.application.graph.Wrapped;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory;
import ru.tinkoff.kora.vertx.common.VertxUtil;

/* loaded from: input_file:ru/tinkoff/kora/database/vertx/VertxDatabase.class */
public class VertxDatabase implements Lifecycle, Wrapped<Pool>, VertxConnectionFactory {
    private final Context.Key<SqlConnection> connectionKey = new Context.Key<SqlConnection>() { // from class: ru.tinkoff.kora.database.vertx.VertxDatabase.1
        /* JADX INFO: Access modifiers changed from: protected */
        public SqlConnection copy(SqlConnection sqlConnection) {
            return null;
        }
    };
    private final Context.Key<Transaction> transactionKey = new Context.Key<Transaction>() { // from class: ru.tinkoff.kora.database.vertx.VertxDatabase.2
        /* JADX INFO: Access modifiers changed from: protected */
        public Transaction copy(Transaction transaction) {
            return null;
        }
    };
    private final Pool pool;
    private final DataBaseTelemetry telemetry;

    public VertxDatabase(VertxDatabaseConfig vertxDatabaseConfig, EventLoopGroup eventLoopGroup, DataBaseTelemetryFactory dataBaseTelemetryFactory) {
        this.telemetry = dataBaseTelemetryFactory.get(vertxDatabaseConfig.poolName(), "postgres", vertxDatabaseConfig.username());
        this.pool = PgPool.pool(VertxUtil.customEventLoopVertx(eventLoopGroup), vertxDatabaseConfig.toPgConnectOptions(), vertxDatabaseConfig.toPgPoolOptions());
    }

    @Override // ru.tinkoff.kora.database.vertx.VertxConnectionFactory
    public SqlConnection currentConnection() {
        return (SqlConnection) Context.current().get(this.connectionKey);
    }

    @Override // ru.tinkoff.kora.database.vertx.VertxConnectionFactory
    public CompletionStage<SqlConnection> newConnection() {
        return this.pool.getConnection().toCompletionStage();
    }

    @Override // ru.tinkoff.kora.database.vertx.VertxConnectionFactory
    public Pool pool() {
        return this.pool;
    }

    @Override // ru.tinkoff.kora.database.vertx.VertxConnectionFactory
    public DataBaseTelemetry telemetry() {
        return this.telemetry;
    }

    @Override // ru.tinkoff.kora.database.vertx.VertxConnectionFactory
    public <T> Mono<T> withConnection(Function<SqlConnection, Mono<T>> function) {
        return Mono.deferContextual(contextView -> {
            Context current = Context.Reactor.current(contextView);
            SqlConnection sqlConnection = (SqlConnection) current.get(this.connectionKey);
            return sqlConnection != null ? (Mono) function.apply(sqlConnection) : Mono.create(monoSink -> {
                this.pool.withConnection(sqlConnection2 -> {
                    current.set(this.connectionKey, sqlConnection2);
                    AtomicBoolean atomicBoolean = new AtomicBoolean();
                    return Future.future(promise -> {
                        current.inject();
                        Mono mono = (Mono) function.apply(sqlConnection2);
                        Consumer consumer = obj -> {
                            if (atomicBoolean.compareAndSet(false, true)) {
                                promise.complete(obj);
                            }
                        };
                        Objects.requireNonNull(promise);
                        mono.subscribe(consumer, promise::fail, () -> {
                            if (atomicBoolean.compareAndSet(false, true)) {
                                promise.complete();
                            }
                        }, Context.Reactor.inject(contextView, current));
                    });
                }, asyncResult -> {
                    if (asyncResult.failed()) {
                        monoSink.error(asyncResult.cause());
                    } else {
                        monoSink.success(asyncResult.result());
                    }
                    current.remove(this.connectionKey);
                });
            }).contextWrite(context -> {
                return Context.Reactor.inject(context, current);
            });
        });
    }

    @Override // ru.tinkoff.kora.database.vertx.VertxConnectionFactory
    public <T> Mono<T> inTx(Function<SqlConnection, Mono<T>> function) {
        return withConnection(sqlConnection -> {
            return Mono.deferContextual(contextView -> {
                Context current = Context.Reactor.current(contextView);
                return ((Transaction) current.get(this.transactionKey)) != null ? (Mono) function.apply(sqlConnection) : Mono.create(monoSink -> {
                    sqlConnection.begin(asyncResult -> {
                        current.inject();
                        if (asyncResult.failed()) {
                            monoSink.error(asyncResult.cause());
                            return;
                        }
                        Transaction transaction = (Transaction) asyncResult.result();
                        current.set(this.transactionKey, transaction);
                        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                        ((Mono) function.apply(sqlConnection)).subscribe(obj -> {
                        }, th -> {
                            if (atomicBoolean.compareAndSet(false, true)) {
                                transaction.rollback(asyncResult -> {
                                    Context current2 = Context.current();
                                    try {
                                        current.inject();
                                        if (asyncResult.failed()) {
                                            th.addSuppressed(asyncResult.cause());
                                        }
                                        monoSink.error(th);
                                        current.remove(this.transactionKey);
                                        current2.inject();
                                    } catch (Throwable th) {
                                        current.remove(this.transactionKey);
                                        current2.inject();
                                        throw th;
                                    }
                                });
                            }
                        }, () -> {
                            if (atomicBoolean.compareAndSet(false, true)) {
                                transaction.commit(asyncResult -> {
                                    Context current2 = Context.current();
                                    try {
                                        current.inject();
                                        if (asyncResult.succeeded()) {
                                            monoSink.success();
                                        } else {
                                            monoSink.error(asyncResult.cause());
                                        }
                                    } finally {
                                        current.remove(this.transactionKey);
                                        current2.inject();
                                    }
                                });
                            } else {
                                current.remove(this.transactionKey);
                            }
                        }, Context.Reactor.inject(contextView, current));
                    });
                });
            });
        });
    }

    public Mono<Void> init() {
        return Mono.create(monoSink -> {
            this.pool.query("SELECT 1").execute(asyncResult -> {
                if (asyncResult.succeeded()) {
                    monoSink.success();
                } else {
                    monoSink.error(asyncResult.cause());
                }
            });
        });
    }

    public Mono<Void> release() {
        return Mono.create(monoSink -> {
            monoSink.onRequest(j -> {
                this.pool.close(asyncResult -> {
                    if (asyncResult.succeeded()) {
                        monoSink.success();
                    } else {
                        monoSink.error(asyncResult.cause());
                    }
                });
            });
        });
    }

    /* renamed from: value, reason: merged with bridge method [inline-methods] */
    public Pool m0value() {
        return this.pool;
    }
}
