package akka.persistence.r2dbc.internal;

import akka.Done;
import akka.Done$;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.scaladsl.package$;
import akka.actor.typed.scaladsl.package$LoggerOps$;
import akka.annotation.InternalStableApi;
import akka.dispatch.ExecutionContexts$;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.Statement;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import scala.Function1;
import scala.Option;
import scala.collection.IterableOnceOps;
import scala.collection.JavaConverters$;
import scala.collection.immutable.IndexedSeq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.control.NonFatal$;

/* compiled from: R2dbcExecutor.scala */
@InternalStableApi
/* loaded from: input_file:akka/persistence/r2dbc/internal/R2dbcExecutor.class */
public class R2dbcExecutor {
    // Factory whose create() is called to open a connection for each executor call.
    private final ConnectionFactory connectionFactory;
    // Logger receiving the slow-call (info) and failure (debug) messages of this class.
    private final Logger log;
    // Execution context passed to the Future combinators in this class.
    private final ExecutionContext ec;
    // Actor system forwarded to the companion's selectInTx.
    private final ActorSystem<?> system;
    // Threshold in microseconds: a call lasting at least this long is logged at info level.
    private final long logDbCallsExceedingMicros;
    // True when the threshold is non-negative; when false, no clock readings are taken.
    private final boolean logDbCallsExceedingEnabled;

    /* compiled from: R2dbcExecutor.scala */
    /* loaded from: input_file:akka/persistence/r2dbc/internal/R2dbcExecutor$PublisherOps.class */
    /**
     * Thin wrapper around a {@link Publisher}. Every operation delegates to the
     * matching {@code $extension} method on the {@code R2dbcExecutor$PublisherOps$}
     * singleton, passing the wrapped publisher.
     *
     * @param <T> element type of the wrapped publisher
     */
    public static final class PublisherOps<T> {
        // Typed with T (not raw) so publisher() needs no unchecked conversion.
        private final Publisher<T> publisher;

        /**
         * @param publisher the publisher to wrap
         */
        public PublisherOps(Publisher<T> publisher) {
            this.publisher = publisher;
        }

        /** Delegates to {@code hashCode$extension} for the wrapped publisher. */
        @Override
        public int hashCode() {
            return R2dbcExecutor$PublisherOps$.MODULE$.hashCode$extension(publisher());
        }

        /** Delegates to {@code equals$extension} for the wrapped publisher and {@code obj}. */
        @Override
        public boolean equals(Object obj) {
            return R2dbcExecutor$PublisherOps$.MODULE$.equals$extension(publisher(), obj);
        }

        /** @return the wrapped publisher */
        public Publisher<T> publisher() {
            return this.publisher;
        }

        /** @return the result of {@code asFuture$extension} for the wrapped publisher */
        public Future<T> asFuture() {
            return R2dbcExecutor$PublisherOps$.MODULE$.asFuture$extension(publisher());
        }

        /** @return the result of {@code asFutureDone$extension} for the wrapped publisher */
        public Future<Done> asFutureDone() {
            return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(publisher());
        }
    }

    /**
     * Static forwarder: delegates to {@code PublisherOps} on the {@code R2dbcExecutor$} singleton.
     *
     * @param publisher the publisher handed to the companion
     * @return whatever the companion returns for {@code publisher}
     */
    public static <T> Publisher PublisherOps(Publisher<T> publisher) {
        return R2dbcExecutor$.MODULE$.PublisherOps(publisher);
    }

    /**
     * Static forwarder: delegates to {@code selectInTx} on the {@code R2dbcExecutor$} singleton.
     *
     * @param statement the statement to run
     * @param function1 row mapper passed through to the companion
     * @param executionContext execution context passed through to the companion
     * @param actorSystem actor system passed through to the companion
     * @return the future produced by the companion
     */
    public static <A> Future<IndexedSeq<A>> selectInTx(Statement statement, Function1<Row, A> function1, ExecutionContext executionContext, ActorSystem<?> actorSystem) {
        return R2dbcExecutor$.MODULE$.selectInTx(statement, function1, executionContext, actorSystem);
    }

    /**
     * Static forwarder: delegates to {@code selectOneInTx} on the {@code R2dbcExecutor$} singleton.
     *
     * @param statement the statement to run
     * @param function1 row mapper passed through to the companion
     * @param executionContext execution context passed through to the companion
     * @param actorSystem actor system passed through to the companion
     * @return the future produced by the companion
     */
    public static <A> Future<Option<A>> selectOneInTx(Statement statement, Function1<Row, A> function1, ExecutionContext executionContext, ActorSystem<?> actorSystem) {
        return R2dbcExecutor$.MODULE$.selectOneInTx(statement, function1, executionContext, actorSystem);
    }

    /**
     * Static forwarder: delegates to {@code updateBatchInTx} on the {@code R2dbcExecutor$} singleton.
     *
     * @param statement the statement to run
     * @param executionContext execution context passed through to the companion
     * @return the future produced by the companion (presumably a boxed updated-row count; confirm against the companion)
     */
    public static Future<Object> updateBatchInTx(Statement statement, ExecutionContext executionContext) {
        return R2dbcExecutor$.MODULE$.updateBatchInTx(statement, executionContext);
    }

    /**
     * Static forwarder: delegates to {@code updateInTx} on the {@code R2dbcExecutor$} singleton.
     *
     * @param indexedSeq the statements to run
     * @param executionContext execution context passed through to the companion
     * @return the future produced by the companion
     */
    public static Future<IndexedSeq<Object>> updateInTx(IndexedSeq<Statement> indexedSeq, ExecutionContext executionContext) {
        return R2dbcExecutor$.MODULE$.updateInTx(indexedSeq, executionContext);
    }

    /**
     * Static forwarder: delegates to {@code updateOneInTx} on the {@code R2dbcExecutor$} singleton.
     *
     * @param statement the statement to run
     * @param executionContext execution context passed through to the companion
     * @return the future produced by the companion (presumably a boxed updated-row count; confirm against the companion)
     */
    public static Future<Object> updateOneInTx(Statement statement, ExecutionContext executionContext) {
        return R2dbcExecutor$.MODULE$.updateOneInTx(statement, executionContext);
    }

    public R2dbcExecutor(ConnectionFactory connectionFactory, Logger logger, FiniteDuration finiteDuration, ExecutionContext executionContext, ActorSystem<?> actorSystem) {
        this.connectionFactory = connectionFactory;
        this.log = logger;
        this.ec = executionContext;
        this.system = actorSystem;
        this.logDbCallsExceedingMicros = finiteDuration.toMicros();
        this.logDbCallsExceedingEnabled = this.logDbCallsExceedingMicros >= 0;
    }

    /** @return the connection factory this executor was constructed with */
    public ConnectionFactory connectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Reads the clock only when slow-call logging is enabled.
     *
     * @return {@link System#nanoTime()} when enabled, otherwise {@code 0}
     */
    private long nanoTime() {
        return this.logDbCallsExceedingEnabled ? System.nanoTime() : 0L;
    }

    /**
     * Computes the time elapsed since a reading previously taken with {@link #nanoTime()}.
     *
     * @param j the earlier nanosecond reading
     * @return elapsed microseconds, or {@link Long#MIN_VALUE} when slow-call logging is disabled
     */
    private long durationInMicros(long j) {
        if (!this.logDbCallsExceedingEnabled) {
            return Long.MIN_VALUE;
        }
        final long elapsedNanos = nanoTime() - j;
        return elapsedNanos / 1000;
    }

    /**
     * Acquires a connection from the factory and logs at info level when the
     * acquisition took at least the configured threshold.
     *
     * @param str log prefix placed at the start of the log message
     * @return a future of the acquired connection
     */
    private Future<Connection> getConnection(String str) {
        final long startedAt = nanoTime();
        return R2dbcExecutor$PublisherOps$.MODULE$
                .asFuture$extension(R2dbcExecutor$.MODULE$.PublisherOps(connectionFactory().create()))
                .map(acquired -> {
                    final long elapsedMicros = durationInMicros(startedAt);
                    final boolean slow = elapsedMicros >= this.logDbCallsExceedingMicros;
                    if (slow) {
                        this.log.info("{} - getConnection took [{}] µs", str, BoxesRunTime.boxToLong(elapsedMicros));
                    }
                    return acquired;
                }, ExecutionContexts$.MODULE$.parasitic());
    }

    /**
     * Executes one statement through {@link #withAutoCommitConnection} and waits for
     * its updated-rows publisher to complete.
     *
     * @param str log prefix
     * @param function1 creates the statement from the connection
     * @return a future completed with {@code Done} once the statement has run
     */
    public Future<Done> executeDdl(String str, Function1<Connection, Statement> function1) {
        return withAutoCommitConnection(str, conn ->
                R2dbcExecutor$PublisherOps$.MODULE$
                        .asFuture$extension(R2dbcExecutor$.MODULE$.PublisherOps(((Statement) function1.apply(conn)).execute()))
                        .flatMap(
                                ddlResult -> R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(
                                        R2dbcExecutor$.MODULE$.PublisherOps(ddlResult.getRowsUpdated())),
                                this.ec));
    }

    /**
     * Executes several statements one after another through {@link #withConnection}.
     * The statements are folded into a chain of futures, so each one starts only
     * after the previous one has completed.
     *
     * @param str log prefix
     * @param function1 creates the statements from the connection
     * @return a future completed with {@code Done} after the last statement has run
     */
    public Future<Done> executeDdls(String str, Function1<Connection, IndexedSeq<Statement>> function1) {
        return withConnection(str, conn ->
                (Future) ((IndexedSeq) function1.apply(conn)).foldLeft(
                        Future$.MODULE$.successful(Done$.MODULE$),
                        (previous, ddl) -> previous.flatMap(
                                ignored -> R2dbcExecutor$PublisherOps$.MODULE$
                                        .asFuture$extension(R2dbcExecutor$.MODULE$.PublisherOps(ddl.execute()))
                                        .flatMap(
                                                ddlResult -> R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(
                                                        R2dbcExecutor$.MODULE$.PublisherOps(ddlResult.getRowsUpdated())),
                                                this.ec),
                                this.ec)));
    }

    /**
     * Runs one update statement through {@link #withAutoCommitConnection}, delegating
     * the execution to the companion's {@code updateOneInTx}.
     *
     * @param str log prefix
     * @param function1 creates the statement from the connection
     * @return the future produced by {@code updateOneInTx}
     */
    public Future<Object> updateOne(String str, Function1<Connection, Statement> function1) {
        return withAutoCommitConnection(
                str,
                conn -> R2dbcExecutor$.MODULE$.updateOneInTx((Statement) function1.apply(conn), this.ec));
    }

    /**
     * Runs a batch statement through {@link #withConnection}, delegating the
     * execution to the companion's {@code updateBatchInTx}.
     *
     * @param str log prefix
     * @param function1 creates the statement from the connection
     * @return the future produced by {@code updateBatchInTx}
     */
    public Future<Object> updateInBatch(String str, Function1<Connection, Statement> function1) {
        return withConnection(
                str,
                conn -> R2dbcExecutor$.MODULE$.updateBatchInTx((Statement) function1.apply(conn), this.ec));
    }

    /**
     * Runs several update statements through {@link #withConnection}, delegating
     * the execution to the companion's {@code updateInTx}.
     *
     * @param str log prefix
     * @param function1 creates the statements from the connection
     * @return the future produced by {@code updateInTx}
     */
    public Future<IndexedSeq<Object>> update(String str, Function1<Connection, IndexedSeq<Statement>> function1) {
        return withConnection(
                str,
                conn -> R2dbcExecutor$.MODULE$.updateInTx((IndexedSeq) function1.apply(conn), this.ec));
    }

    /**
     * Executes one statement through {@link #withAutoCommitConnection} and maps a
     * single row of its result ({@code Mono.from} over the mapped rows).
     *
     * @param str log prefix
     * @param function1 creates the statement from the connection
     * @param function12 maps the returned row to the result value
     * @return a future of the mapped row
     */
    public <A> Future<A> updateOneReturning(String str, Function1<Connection, Statement> function1, Function1<Row, A> function12) {
        return withAutoCommitConnection(str, conn ->
                R2dbcExecutor$PublisherOps$.MODULE$
                        .asFuture$extension(R2dbcExecutor$.MODULE$.PublisherOps(((Statement) function1.apply(conn)).execute()))
                        .flatMap(
                                updateResult -> R2dbcExecutor$PublisherOps$.MODULE$.asFuture$extension(
                                        R2dbcExecutor$.MODULE$.PublisherOps(
                                                Mono.from(updateResult.map((row, meta) -> function12.apply(row))))),
                                this.ec));
    }

    /**
     * Executes a batch statement through {@link #withConnection}, maps every row of
     * every result in order ({@code concatMap}), and collects them into a Scala vector.
     *
     * @param str log prefix
     * @param function1 creates the statement from the connection
     * @param function12 maps each returned row to an element
     * @return a future of all mapped rows
     */
    public <A> Future<IndexedSeq<A>> updateInBatchReturning(String str, Function1<Connection, Statement> function1, Function1<Row, A> function12) {
        return withConnection(str, conn ->
                R2dbcExecutor$PublisherOps$.MODULE$
                        .asFuture$extension(R2dbcExecutor$.MODULE$.PublisherOps(
                                Flux.from(((Statement) function1.apply(conn)).execute())
                                        .concatMap(batchResult -> batchResult.map((row, meta) -> function12.apply(row)))
                                        .collectList()))
                        .map(
                                collected -> ((IterableOnceOps) JavaConverters$.MODULE$
                                        .asScalaIteratorConverter(collected.iterator())
                                        .asScala()).toVector(),
                                this.ec));
    }

    /**
     * Runs {@link #select} and keeps only the first mapped row, if any.
     *
     * @param str log prefix
     * @param function1 creates the statement from the connection
     * @param function12 maps a row to the result value
     * @return a future of the first row, or an empty option when no row was returned
     */
    public <A> Future<Option<A>> selectOne(String str, Function1<Connection, Statement> function1, Function1<Row, A> function12) {
        final Future<IndexedSeq<A>> allRows = select(str, function1, function12);
        return allRows.map(rows -> rows.headOption(), this.ec);
    }

    /**
     * Runs a query on a newly acquired connection, maps each row, and closes the
     * connection afterwards.
     *
     * <p>A non-fatal exception thrown while creating or starting the statement is
     * converted into a failed future, so the failure callback still closes the
     * connection. Throwables that {@code NonFatal} does not match are rethrown.
     *
     * @param str log prefix placed at the start of every log message
     * @param function1 creates the statement from the acquired connection
     * @param function12 maps a result row to an element of the returned sequence
     * @return the mapped rows, completed after the connection close has completed
     */
    public <A> Future<IndexedSeq<A>> select(String str, Function1<Connection, Statement> function1, Function1<Row, A> function12) {
        return getConnection(str).flatMap(connection -> {
            final long startedAt = nanoTime();
            Future<IndexedSeq<A>> attempt;
            try {
                attempt = R2dbcExecutor$.MODULE$.selectInTx((Statement) function1.apply(connection), function12, this.ec, this.system);
            } catch (Throwable th) {
                Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                if (nonFatal.isEmpty()) {
                    // Fatal: propagate unchanged.
                    throw th;
                }
                // Thrown from the statement function: surface it as a failed future
                // instead of rethrowing, otherwise the connection would never be closed.
                attempt = Future$.MODULE$.failed(nonFatal.get());
            }
            final Future<IndexedSeq<A>> rows = attempt;
            // Failure path: log and close; the close is started but not awaited.
            rows.failed().foreach(cause -> {
                package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.log), "{} - Select failed: {}", str, cause);
                return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(R2dbcExecutor$.MODULE$.PublisherOps(connection.close()));
            }, this.ec);
            // Success path: close first, then report the duration and hand back the rows.
            return rows.flatMap(indexedSeq -> {
                return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(R2dbcExecutor$.MODULE$.PublisherOps(connection.close())).map(closed -> {
                    long durationInMicros = durationInMicros(startedAt);
                    if (durationInMicros >= this.logDbCallsExceedingMicros) {
                        package$LoggerOps$.MODULE$.infoN$extension(package$.MODULE$.LoggerOps(this.log), "{} - Selected [{}] rows in [{}] µs", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str, BoxesRunTime.boxToInteger(indexedSeq.size()), BoxesRunTime.boxToLong(durationInMicros)}));
                    }
                    return indexedSeq;
                }, this.ec);
            }, this.ec);
        }, this.ec);
    }

    /**
     * Runs {@code function1} inside a transaction on a newly acquired connection.
     *
     * <p>On success the transaction is finished with {@link #commitAndClose} and the
     * returned future completes afterwards. On failure {@link #rollbackAndClose} is
     * started, but the returned future fails without waiting for it.
     *
     * <p>A non-fatal exception thrown synchronously by {@code function1} is converted
     * into a failed future, so the rollback path still runs. Throwables that
     * {@code NonFatal} does not match are rethrown.
     *
     * @param str log prefix placed at the start of every log message
     * @param function1 the database work to run with the connection
     * @return the result of {@code function1}, completed after commit-and-close
     */
    public <A> Future<A> withConnection(String str, Function1<Connection, Future<A>> function1) {
        return getConnection(str).flatMap(connection -> {
            final long startedAt = nanoTime();
            // NOTE(review): a failure of beginTransaction() itself is not followed by a close here.
            return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(R2dbcExecutor$.MODULE$.PublisherOps(connection.beginTransaction())).flatMap(begun -> {
                Future<A> attempt;
                try {
                    attempt = function1.apply(connection);
                } catch (Throwable th) {
                    Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                    if (nonFatal.isEmpty()) {
                        // Fatal: propagate unchanged.
                        throw th;
                    }
                    // Surface as a failed future instead of rethrowing, otherwise the
                    // transaction would never be rolled back and closed.
                    attempt = Future$.MODULE$.failed(nonFatal.get());
                }
                final Future<A> result = attempt;
                result.failed().foreach(cause -> {
                    if (this.log.isDebugEnabled()) {
                        package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.log), "{} - DB call failed: {}", str, cause.toString());
                    }
                    return rollbackAndClose(connection);
                }, this.ec);
                return result.flatMap(obj -> {
                    // Parameter named `committed` because `done` would clash with an enclosing lambda parameter.
                    return commitAndClose(connection).map(committed -> {
                        long durationInMicros = durationInMicros(startedAt);
                        if (durationInMicros >= this.logDbCallsExceedingMicros) {
                            this.log.info("{} - DB call completed in [{}] µs", str, BoxesRunTime.boxToLong(durationInMicros));
                        }
                        return obj;
                    }, this.ec);
                }, this.ec);
            }, this.ec);
        }, this.ec);
    }

    /**
     * Runs {@code function1} on a newly acquired connection switched to auto-commit,
     * and closes the connection afterwards.
     *
     * <p>On success the returned future completes after the close has completed. On
     * failure the close is started, but the returned future fails without waiting
     * for it.
     *
     * <p>A non-fatal exception thrown synchronously by {@code function1} is converted
     * into a failed future, so the connection is still closed. Throwables that
     * {@code NonFatal} does not match are rethrown.
     *
     * @param str log prefix placed at the start of every log message
     * @param function1 the database work to run with the connection
     * @return the result of {@code function1}, completed after the connection is closed
     */
    public <A> Future<A> withAutoCommitConnection(String str, Function1<Connection, Future<A>> function1) {
        return getConnection(str).flatMap(connection -> {
            final long startedAt = nanoTime();
            // NOTE(review): a failure of setAutoCommit(true) itself is not followed by a close here.
            return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(R2dbcExecutor$.MODULE$.PublisherOps(connection.setAutoCommit(true))).flatMap(autoCommitSet -> {
                Future<A> attempt;
                try {
                    attempt = function1.apply(connection);
                } catch (Throwable th) {
                    Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                    if (nonFatal.isEmpty()) {
                        // Fatal: propagate unchanged.
                        throw th;
                    }
                    // Surface as a failed future instead of rethrowing, otherwise the
                    // connection would never be closed.
                    attempt = Future$.MODULE$.failed(nonFatal.get());
                }
                final Future<A> result = attempt;
                result.failed().foreach(cause -> {
                    package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.log), "{} - DB call failed: {}", str, cause);
                    return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(R2dbcExecutor$.MODULE$.PublisherOps(connection.close()));
                }, this.ec);
                return result.flatMap(obj -> {
                    // Parameter named `closed` because `done` would clash with an enclosing lambda parameter.
                    return R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(R2dbcExecutor$.MODULE$.PublisherOps(connection.close())).map(closed -> {
                        long durationInMicros = durationInMicros(startedAt);
                        if (durationInMicros >= this.logDbCallsExceedingMicros) {
                            package$LoggerOps$.MODULE$.infoN$extension(package$.MODULE$.LoggerOps(this.log), "{} - DB call completed [{}] in [{}] µs", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{str, obj, BoxesRunTime.boxToLong(durationInMicros)}));
                        }
                        return obj;
                    }, this.ec);
                }, this.ec);
            }, this.ec);
        }, this.ec);
    }

    /**
     * Commits the current transaction and then applies the {@code R2dbcExecutor$$anon$1}
     * callback for this connection (presumably the close step; the callback is defined
     * outside this file).
     *
     * @param connection the connection whose transaction is committed
     * @return a future of the commit outcome, completed after the callback has run
     */
    private Future<Done> commitAndClose(Connection connection) {
        final Future<Done> committed = R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(
                R2dbcExecutor$.MODULE$.PublisherOps(connection.commitTransaction()));
        return committed.andThen(new R2dbcExecutor$$anon$1(connection), this.ec);
    }

    /**
     * Rolls back the current transaction and then applies the {@code R2dbcExecutor$$anon$2}
     * callback for this connection (presumably the close step; the callback is defined
     * outside this file).
     *
     * @param connection the connection whose transaction is rolled back
     * @return a future of the rollback outcome, completed after the callback has run
     */
    private Future<Done> rollbackAndClose(Connection connection) {
        final Future<Done> rolledBack = R2dbcExecutor$PublisherOps$.MODULE$.asFutureDone$extension(
                R2dbcExecutor$.MODULE$.PublisherOps(connection.rollbackTransaction()));
        return rolledBack.andThen(new R2dbcExecutor$$anon$2(connection), this.ec);
    }
}
