/*
 * Decompiled with CFR 0.152.
 */
package ru.tinkoff.kora.database.r2dbc;

import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.ValidationDepth;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.common.readiness.ReadinessProbe;
import ru.tinkoff.kora.common.readiness.ReadinessProbeFailure;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory;
import ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory;
import ru.tinkoff.kora.database.r2dbc.R2dbcDatabaseConfig;

public class R2dbcDatabase
implements R2dbcConnectionFactory,
Lifecycle,
ReadinessProbe {
    private static final Logger logger = LoggerFactory.getLogger(R2dbcDatabase.class);
    private static final Option<Map<String, String>> OPTIONS = Option.valueOf((String)"options");
    private final Context.Key<Connection> connectionKey = new Context.Key<Connection>(){

        protected Connection copy(Connection object) {
            return null;
        }
    };
    private final Context.Key<Connection> transactionKey = new Context.Key<Connection>(){

        protected Connection copy(Connection object) {
            return null;
        }
    };
    private final ConnectionPool connectionFactory;
    private final DataBaseTelemetry telemetry;
    private final R2dbcDatabaseConfig config;

    public R2dbcDatabase(R2dbcDatabaseConfig config, List<Function<ConnectionFactoryOptions.Builder, ConnectionFactoryOptions.Builder>> customizers, DataBaseTelemetryFactory telemetryFactory) {
        this.config = config;
        this.connectionFactory = R2dbcDatabase.r2dbcConnectionFactory(config, customizers);
        this.telemetry = Objects.requireNonNullElse(telemetryFactory.get(config.telemetry(), config.poolName(), "r2dbc", config.r2dbcUrl().substring(5, config.r2dbcUrl().indexOf(":", 6)), config.username()), DataBaseTelemetryFactory.EMPTY);
    }

    @Override
    public Mono<Connection> currentConnection() {
        return Mono.deferContextual(reactorContext -> {
            Context ctx = Context.Reactor.current((ContextView)reactorContext);
            Connection connection = (Connection)ctx.get(this.connectionKey);
            if (connection != null) {
                return Mono.just((Object)connection);
            }
            return this.connectionFactory.create();
        });
    }

    @Override
    public Mono<Connection> newConnection() {
        return this.connectionFactory.create();
    }

    @Override
    public DataBaseTelemetry telemetry() {
        return this.telemetry;
    }

    @Override
    public <T> Mono<T> inTx(Function<Connection, Mono<T>> callback) {
        return Mono.deferContextual(reactorContext -> {
            Context ctx = Context.Reactor.current((ContextView)reactorContext);
            Connection tx = (Connection)ctx.get(this.transactionKey);
            if (tx != null) {
                return (Mono)callback.apply(tx);
            }
            Connection connection2 = (Connection)ctx.get(this.connectionKey);
            if (connection2 != null) {
                return Mono.usingWhen((Publisher)Mono.from((Publisher)connection2.beginTransaction()).thenReturn((Object)connection2), c -> {
                    ctx.set(this.transactionKey, c);
                    return ((Mono)callback.apply((Connection)c)).onErrorResume(e -> Mono.from((Publisher)c.rollbackTransaction()).then(Mono.error((Throwable)e))).flatMap(r -> Mono.from((Publisher)c.commitTransaction()).then(Mono.just((Object)r))).switchIfEmpty(Mono.from((Publisher)c.commitTransaction()).then(Mono.empty()));
                }, c -> Mono.fromRunnable(() -> ctx.remove(this.transactionKey)));
            }
            return this.withConnection(connection -> Mono.usingWhen((Publisher)Mono.from((Publisher)connection.beginTransaction()).thenReturn(connection), c -> {
                ctx.set(this.transactionKey, c);
                return ((Mono)callback.apply((Connection)c)).onErrorResume(e -> Mono.from((Publisher)c.rollbackTransaction()).then(Mono.error((Throwable)e))).flatMap(r -> Mono.from((Publisher)c.commitTransaction()).then(Mono.just((Object)r))).switchIfEmpty(Mono.from((Publisher)c.commitTransaction()).then(Mono.empty()));
            }, c -> Mono.fromRunnable(() -> ctx.remove(this.transactionKey))));
        });
    }

    @Override
    public <T> Mono<T> withConnection(Function<Connection, Mono<T>> callback) {
        return Mono.deferContextual(reactorContext -> {
            Context ctx = Context.Reactor.current((ContextView)reactorContext);
            Connection connection = (Connection)ctx.get(this.connectionKey);
            if (connection != null) {
                return (Mono)callback.apply(connection);
            }
            return Mono.usingWhen((Publisher)this.connectionFactory.create(), c -> {
                ctx.set(this.connectionKey, c);
                return (Mono)callback.apply((Connection)c);
            }, c -> {
                ctx.remove(this.connectionKey);
                return Mono.fromRunnable(() -> Mono.from((Publisher)c.close()).subscribe());
            });
        });
    }

    @Override
    public <T> Flux<T> withConnectionFlux(Function<Connection, Flux<T>> callback) {
        return Flux.deferContextual(reactorContext -> {
            Context ctx = Context.Reactor.current((ContextView)reactorContext);
            Connection connection = (Connection)ctx.get(this.connectionKey);
            if (connection != null) {
                return (Publisher)callback.apply(connection);
            }
            return Flux.usingWhen((Publisher)this.connectionFactory.create(), c -> {
                ctx.set(this.connectionKey, c);
                return (Publisher)callback.apply((Connection)c);
            }, c -> {
                ctx.remove(this.connectionKey);
                return Mono.fromRunnable(() -> Mono.from((Publisher)c.close()).doOnError(e -> logger.warn(e.getMessage())).subscribe());
            });
        });
    }

    private static ConnectionPool r2dbcConnectionFactory(R2dbcDatabaseConfig config, List<Function<ConnectionFactoryOptions.Builder, ConnectionFactoryOptions.Builder>> customizers) {
        ConnectionFactoryOptions.Builder connectionFactoryOptions = ConnectionFactoryOptions.parse((CharSequence)config.r2dbcUrl()).mutate().option(ConnectionFactoryOptions.USER, (Object)config.username()).option(ConnectionFactoryOptions.PASSWORD, (Object)config.password()).option(ConnectionFactoryOptions.CONNECT_TIMEOUT, (Object)config.connectionTimeout());
        if (config.statementTimeout() != null) {
            connectionFactoryOptions.option(ConnectionFactoryOptions.STATEMENT_TIMEOUT, (Object)config.statementTimeout());
        }
        connectionFactoryOptions.option(OPTIONS, config.options());
        for (Function<ConnectionFactoryOptions.Builder, ConnectionFactoryOptions.Builder> customizer : customizers) {
            connectionFactoryOptions = customizer.apply(connectionFactoryOptions);
        }
        ConnectionFactory connectionFactory = ConnectionFactories.get((ConnectionFactoryOptions)connectionFactoryOptions.build());
        return new ConnectionPool(ConnectionPoolConfiguration.builder().name(config.poolName()).maxLifeTime(config.maxLifetime()).maxIdleTime(config.idleTimeout()).maxAcquireTime(config.connectionTimeout()).maxCreateConnectionTime(config.connectionCreateTimeout()).maxSize(config.maxPoolSize()).acquireRetry(config.acquireRetry()).validationQuery("SELECT 1").validationDepth(ValidationDepth.REMOTE).connectionFactory(connectionFactory).build());
    }

    public void init() {
        try {
            this.connectionFactory.warmup().block();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public void release() {
        this.connectionFactory.dispose();
    }

    @Nullable
    public ReadinessProbeFailure probe() {
        Boolean isValid;
        if (this.config.readinessProbe() && Boolean.FALSE.equals(isValid = (Boolean)Mono.usingWhen((Publisher)this.connectionFactory.create(), c -> Mono.from((Publisher)c.validate(ValidationDepth.REMOTE)), Connection::close).block())) {
            return new ReadinessProbeFailure("Connection is not valid");
        }
        return null;
    }
}

