package ru.tinkoff.kora.database.r2dbc;

import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ValidationDepth;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import ru.tinkoff.kora.application.graph.Lifecycle;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetryFactory;

/* loaded from: input_file:ru/tinkoff/kora/database/r2dbc/R2dbcDatabase.class */
/**
 * {@link R2dbcConnectionFactory} backed by an r2dbc-pool {@link ConnectionPool}.
 *
 * <p>The connection opened by {@link #withConnection(Function)} / {@link #withConnectionFlux(Function)}
 * and the connection of the transaction started by {@link #inTx(Function)} are stored in the Kora
 * {@link Context} obtained from the Reactor subscriber context, so nested calls made under the same
 * context reuse them instead of acquiring another connection from the pool.
 */
public class R2dbcDatabase implements R2dbcConnectionFactory, Lifecycle {
    /** Context slot for the connection bound by {@code withConnection} / {@code withConnectionFlux}. */
    private final Context.Key<Connection> connectionKey = new Context.Key<Connection>() {
        public Connection copy(Connection connection) {
            // NOTE(review): returning null presumably means a copied Context does not inherit
            // the bound connection - confirm against the Context.Key contract.
            return null;
        }
    };
    /** Context slot for the connection on which {@code inTx} has an open transaction. */
    private final Context.Key<Connection> transactionKey = new Context.Key<Connection>() {
        public Connection copy(Connection connection) {
            // NOTE(review): same as connectionKey - presumably a copied Context starts outside the transaction.
            return null;
        }
    };
    private final ConnectionPool connectionFactory;
    private final DataBaseTelemetry telemetry;

    /**
     * Builds the connection pool and the telemetry for it.
     *
     * @param r2dbcDatabaseConfig      pool and connection settings
     * @param list                     customizers applied, in order, to the connection factory options
     *                                 before the underlying connection factory is created
     * @param dataBaseTelemetryFactory factory called with the pool name, the driver name taken from
     *                                 the url and the username
     * @throws IllegalArgumentException if the driver name cannot be extracted from the configured url
     */
    public R2dbcDatabase(R2dbcDatabaseConfig r2dbcDatabaseConfig, List<Function<ConnectionFactoryOptions.Builder, ConnectionFactoryOptions.Builder>> list, DataBaseTelemetryFactory dataBaseTelemetryFactory) {
        this.connectionFactory = r2dbcConnectionFactory(r2dbcDatabaseConfig, list);
        this.telemetry = dataBaseTelemetryFactory.get(r2dbcDatabaseConfig.poolName(), driverName(r2dbcDatabaseConfig.url()), r2dbcDatabaseConfig.username());
    }

    /**
     * Returns the connection bound to the current context, or a newly created pool connection when
     * none is bound. A newly created connection is neither stored in the context nor closed by this
     * method.
     */
    @Override // ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
    public Mono<Connection> currentConnection() {
        return Mono.deferContextual(contextView -> {
            Connection bound = (Connection) Context.Reactor.current(contextView).get(this.connectionKey);
            if (bound != null) {
                return Mono.just(bound);
            }
            return this.connectionFactory.create();
        });
    }

    /** Always creates a connection through the pool, ignoring any connection bound to the context. */
    @Override // ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
    public Mono<Connection> newConnection() {
        return this.connectionFactory.create();
    }

    /** Returns the telemetry instance created in the constructor. */
    @Override // ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
    public DataBaseTelemetry telemetry() {
        return this.telemetry;
    }

    /**
     * Runs {@code function} inside a transaction.
     *
     * <ul>
     *   <li>If the context already holds a transaction connection, {@code function} is applied to it
     *       directly and no new transaction is started.</li>
     *   <li>Otherwise, if a connection is bound to the context, a transaction is started on it.</li>
     *   <li>Otherwise a connection is obtained via {@link #withConnection(Function)} and a transaction
     *       is started on that connection.</li>
     * </ul>
     *
     * @param function work to execute with the transactional connection
     * @return the result of {@code function}; see {@code runInTransaction} for commit/rollback rules
     */
    @Override // ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
    public <T> Mono<T> inTx(Function<Connection, Mono<T>> function) {
        return Mono.deferContextual(contextView -> {
            Context current = Context.Reactor.current(contextView);
            Connection activeTransaction = (Connection) current.get(this.transactionKey);
            if (activeTransaction != null) {
                return function.apply(activeTransaction);
            }
            Connection boundConnection = (Connection) current.get(this.connectionKey);
            if (boundConnection != null) {
                return runInTransaction(current, boundConnection, function);
            }
            return withConnection(connection -> runInTransaction(current, connection, function));
        });
    }

    /**
     * Begins a transaction on {@code connection}, publishes the connection under {@code transactionKey}
     * and applies {@code callback}. An error from the callback triggers a rollback and is then
     * re-emitted; a value or an empty completion triggers a commit. The key is removed from the
     * context when the pipeline terminates.
     */
    private <T> Mono<T> runInTransaction(Context context, Connection connection, Function<Connection, Mono<T>> callback) {
        return Mono.usingWhen(
            Mono.from(connection.beginTransaction()).thenReturn(connection),
            tx -> {
                context.set(this.transactionKey, tx);
                return callback.apply(tx)
                    .onErrorResume(error -> Mono.from(tx.rollbackTransaction()).then(Mono.<T>error(error)))
                    .flatMap(result -> Mono.from(tx.commitTransaction()).then(Mono.just(result)))
                    // Deferred so commitTransaction() is only invoked when the callback really completed empty.
                    .switchIfEmpty(Mono.defer(() -> Mono.from(tx.commitTransaction()).then(Mono.<T>empty())));
            },
            tx -> Mono.fromRunnable(() -> context.remove(this.transactionKey)));
    }

    /**
     * Applies {@code function} to the connection bound to the current context, or - when none is
     * bound - to a newly created pool connection that is stored in the context for the duration of
     * the call and closed afterwards.
     *
     * @param function work to execute with the connection
     * @return the result of {@code function}
     */
    @Override // ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
    public <T> Mono<T> withConnection(Function<Connection, Mono<T>> function) {
        return Mono.deferContextual(contextView -> {
            Context current = Context.Reactor.current(contextView);
            Connection bound = (Connection) current.get(this.connectionKey);
            if (bound != null) {
                return function.apply(bound);
            }
            return Mono.usingWhen(
                this.connectionFactory.create(),
                connection -> {
                    current.set(this.connectionKey, connection);
                    return function.apply(connection);
                },
                connection -> releaseConnection(current, connection));
        });
    }

    /**
     * {@link Flux} counterpart of {@link #withConnection(Function)}: reuses the context-bound
     * connection when present, otherwise binds a newly created one for the lifetime of the returned
     * sequence and closes it afterwards.
     *
     * @param function work to execute with the connection
     * @return the sequence produced by {@code function}
     */
    @Override // ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
    public <T> Flux<T> withConnectionFlux(Function<Connection, Flux<T>> function) {
        return Flux.deferContextual(contextView -> {
            Context current = Context.Reactor.current(contextView);
            Connection bound = (Connection) current.get(this.connectionKey);
            if (bound != null) {
                return function.apply(bound);
            }
            return Flux.usingWhen(
                this.connectionFactory.create(),
                connection -> {
                    current.set(this.connectionKey, connection);
                    return function.apply(connection);
                },
                connection -> releaseConnection(current, connection));
        });
    }

    /**
     * Unbinds {@code connection} from the context immediately and returns a cleanup step that starts
     * closing it. The close is subscribed separately (fire-and-forget), so the caller's pipeline
     * does not wait for it and does not receive its errors.
     */
    private Mono<Void> releaseConnection(Context context, Connection connection) {
        context.remove(this.connectionKey);
        return Mono.fromRunnable(() -> {
            Mono.from(connection.close()).subscribe();
        });
    }

    /**
     * Extracts the driver segment of an R2DBC url ({@code scheme:driver[:protocol]://...}), i.e. the
     * text between the first and the second colon. Works for both the {@code r2dbc:} and the
     * {@code r2dbcs:} scheme.
     */
    private static String driverName(String url) {
        int schemeEnd = url.indexOf(':');
        int driverEnd = schemeEnd < 0 ? -1 : url.indexOf(':', schemeEnd + 1);
        if (driverEnd < 0) {
            // The url is deliberately left out of the message: it may embed credentials.
            throw new IllegalArgumentException("R2DBC url must have the form 'r2dbc:driver[:protocol]://...'");
        }
        return url.substring(schemeEnd + 1, driverEnd);
    }

    /**
     * Creates the connection pool: parses the configured url, applies user, password and connect
     * timeout, lets every customizer in {@code list} replace the options builder in turn, and wraps
     * the resulting connection factory in a pool configured from {@code r2dbcDatabaseConfig}.
     * Connections are validated remotely with {@code SELECT 1}.
     */
    private static ConnectionPool r2dbcConnectionFactory(R2dbcDatabaseConfig r2dbcDatabaseConfig, List<Function<ConnectionFactoryOptions.Builder, ConnectionFactoryOptions.Builder>> list) {
        ConnectionFactoryOptions.Builder options = ConnectionFactoryOptions.parse(r2dbcDatabaseConfig.url())
            .mutate()
            .option(ConnectionFactoryOptions.USER, r2dbcDatabaseConfig.username())
            .option(ConnectionFactoryOptions.PASSWORD, r2dbcDatabaseConfig.password())
            .option(ConnectionFactoryOptions.CONNECT_TIMEOUT, Duration.ofMillis(r2dbcDatabaseConfig.connectionTimeout()));
        for (Function<ConnectionFactoryOptions.Builder, ConnectionFactoryOptions.Builder> customizer : list) {
            options = customizer.apply(options);
        }
        ConnectionPoolConfiguration poolConfiguration = ConnectionPoolConfiguration.builder()
            .name(r2dbcDatabaseConfig.poolName())
            .maxSize(r2dbcDatabaseConfig.maxPoolSize())
            .maxLifeTime(Duration.ofMillis(r2dbcDatabaseConfig.maxLifetime()))
            // The connect timeout doubles as the maximum time to wait for a pooled connection.
            .maxAcquireTime(Duration.ofMillis(r2dbcDatabaseConfig.connectionTimeout()))
            .acquireRetry(r2dbcDatabaseConfig.acquireRetry())
            .maxIdleTime(Duration.ofMillis(r2dbcDatabaseConfig.idleTimeout()))
            .validationQuery("SELECT 1")
            .validationDepth(ValidationDepth.REMOTE)
            .connectionFactory(ConnectionFactories.get(options.build()))
            .build();
        return new ConnectionPool(poolConfiguration);
    }

    /** Warms up the connection pool; the value emitted by the warm-up is discarded. */
    public Mono<Void> init() {
        return this.connectionFactory.warmup().then();
    }

    /** Disposes the connection pool; nothing happens until the returned {@link Mono} is subscribed. */
    public Mono<Void> release() {
        return this.connectionFactory.disposeLater();
    }
}
