package ru.tinkoff.kora.database.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.database.common.QueryContext;
import ru.tinkoff.kora.database.common.telemetry.DataBaseTelemetry;

/* loaded from: input_file:ru/tinkoff/kora/database/r2dbc/R2dbcConnectionFactory.class */
/**
 * Entry point for obtaining R2DBC {@link Connection}s and running reactive work against them.
 *
 * <p>Only {@link #query(QueryContext, Consumer, Function)} has an implementation in this
 * interface; the remaining methods are supplied by implementors, so their exact semantics are
 * not visible here.
 */
public interface R2dbcConnectionFactory {
    /**
     * Returns a {@link Mono} of a {@link Connection}.
     *
     * <p>NOTE(review): judging by the name, this presumably yields the connection already bound
     * to the current reactive context - confirm against the implementation.
     */
    Mono<Connection> currentConnection();

    /**
     * Returns a {@link Mono} of a {@link Connection}.
     *
     * <p>NOTE(review): judging by the name, this presumably opens a fresh connection each time -
     * confirm against the implementation.
     */
    Mono<Connection> newConnection();

    /**
     * Returns the telemetry used by {@link #query(QueryContext, Consumer, Function)} to create a
     * telemetry context for each executed query.
     */
    DataBaseTelemetry telemetry();

    /**
     * Applies {@code function} to a {@link Connection} and returns the resulting {@link Mono}.
     *
     * <p>NOTE(review): presumably the callback runs inside a transaction - confirm against the
     * implementation.
     *
     * @param function callback that receives the connection
     * @param <T> type of the produced value
     */
    <T> Mono<T> inTx(Function<Connection, Mono<T>> function);

    /**
     * Applies {@code function} to a {@link Connection} and returns the resulting {@link Mono}.
     *
     * @param function callback that receives the connection
     * @param <T> type of the produced value
     */
    <T> Mono<T> withConnection(Function<Connection, Mono<T>> function);

    /**
     * Applies {@code function} to a {@link Connection} and returns the resulting {@link Flux}.
     *
     * @param function callback that receives the connection
     * @param <T> type of the produced elements
     */
    <T> Flux<T> withConnectionFlux(Function<Connection, Flux<T>> function);

    /**
     * Executes the SQL of {@code queryContext} through {@link #withConnection(Function)} and
     * reports the outcome to a telemetry context obtained from {@link #telemetry()}.
     *
     * <p>All work is deferred until subscription: the telemetry context is created per
     * subscriber from the subscriber's Reactor context.
     *
     * @param queryContext supplies the SQL text and is passed to the telemetry
     * @param consumer configures the freshly created {@link Statement} before it is executed
     * @param function converts the stream of {@link Result}s into the final value
     * @param <T> type of the produced value
     * @return a {@link Mono} with the value produced by {@code function}
     */
    default <T> Mono<T> query(QueryContext queryContext, Consumer<Statement> consumer, Function<Flux<Result>, Mono<T>> function) {
        return Mono.deferContextual(contextView -> {
            DataBaseTelemetry.DataBaseTelemetryContext telemetryContext =
                telemetry().createContext(Context.Reactor.current(contextView), queryContext);
            // Explicit type witness keeps the chain fully typed; no raw Mono cast is needed.
            return this.<T>withConnection(connection -> {
                Statement statement = connection.createStatement(queryContext.sql());
                consumer.accept(statement);
                return function.apply(Flux.from(statement.execute()));
            }).doOnEach(signal -> {
                // NOTE(review): doOnEach does not observe cancellation, so the telemetry context
                // is not closed here when the subscriber cancels - confirm this is intended.
                if (signal.isOnComplete()) {
                    telemetryContext.close((Throwable) null);
                } else if (signal.isOnError()) {
                    telemetryContext.close(signal.getThrowable());
                }
            });
        });
    }
}
