package akka.projection.jdbc.internal;

import akka.Done;
import akka.Done$;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.scaladsl.package$;
import akka.actor.typed.scaladsl.package$LoggerOps$;
import akka.annotation.InternalApi;
import akka.projection.ProjectionId;
import akka.projection.ProjectionId$;
import akka.projection.internal.ManagementState;
import akka.projection.internal.ManagementState$;
import akka.projection.internal.OffsetSerialization;
import akka.projection.internal.OffsetSerialization$MultipleOffsets$;
import akka.projection.internal.OffsetSerialization$SingleOffset$;
import akka.projection.jdbc.JdbcSession;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.Clock;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Some$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: JdbcOffsetStore.scala */
@InternalApi
/* loaded from: input_file:akka/projection/jdbc/internal/JdbcOffsetStore.class */
public class JdbcOffsetStore<S extends JdbcSession> {
    /** JDBC settings; this class reads the dialect, the verbose-logging flag and the execution context from it. */
    private final JdbcSettings settings;
    /** Factory passed to {@code JdbcSessionUtil.withConnection} by every operation in this class. */
    private final Function0<S> jdbcSessionFactory;
    /** Clock used to compute the epoch-millisecond timestamp passed along when saving offsets and paused state. */
    private final Clock clock;
    /** SLF4J logger obtained for the runtime class of this instance. */
    private final Logger logger;
    /** True only when debug logging is enabled on {@link #logger} and the settings enable verbose logging. */
    private final boolean verboseLogging;
    /** Converts offsets to and from their storage representation. */
    private final OffsetSerialization offsetSerialization;
    /** Execution context taken from the settings; passed to {@code withConnection} for every operation. */
    private final ExecutionContext executionContext;

    public JdbcOffsetStore(ActorSystem<?> actorSystem, JdbcSettings jdbcSettings, Function0<S> function0, Clock clock) {
        this.settings = jdbcSettings;
        this.jdbcSessionFactory = function0;
        this.clock = clock;
        this.logger = LoggerFactory.getLogger(getClass());
        this.verboseLogging = this.logger.isDebugEnabled() && jdbcSettings.verboseLoggingEnabled();
        this.offsetSerialization = new OffsetSerialization(actorSystem);
        this.executionContext = jdbcSettings.executionContext();
    }

    /**
     * Creates an offset store that timestamps writes with the system UTC clock.
     *
     * @param actorSystem  actor system forwarded to the four-argument constructor
     * @param jdbcSettings settings forwarded to the four-argument constructor
     * @param function0    JDBC session factory forwarded to the four-argument constructor
     */
    public JdbcOffsetStore(ActorSystem<?> actorSystem, JdbcSettings jdbcSettings, Function0<S> function0) {
        this(actorSystem, jdbcSettings, function0, Clock.systemUTC());
    }

    /**
     * Returns the execution context this store obtained from its settings at construction time.
     *
     * @return the execution context passed to every {@code withConnection} call in this class
     */
    public ExecutionContext executionContext() {
        return this.executionContext;
    }

    /**
     * Executes the dialect's drop statements for the offset table and the management table.
     *
     * <p>Both statements run on one {@link Statement} obtained from the connection supplied by
     * {@code withConnection}.
     *
     * @return a future holding {@code Done} once both drop statements have been executed
     */
    public Future<Done> dropIfExists() {
        return JdbcSessionUtil$.MODULE$.withConnection(this.jdbcSessionFactory, connection -> {
            // The message used to say "creating", copied from createIfNotExists.
            this.logger.debug("dropping offset-store table, using connection id [{}]", BoxesRunTime.boxToInteger(System.identityHashCode(connection)));
            return (Done$) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The statement is created from the connection of the enclosing lambda.
                return dropIfExists$$anonfun$1$$anonfun$1(connection);
            }, statement -> {
                statement.execute(this.settings.dialect().dropTableStatement());
                statement.execute(this.settings.dialect().dropManagementTableStatement());
                return Done$.MODULE$;
            });
        }, executionContext());
    }

    /**
     * Executes every create statement the dialect provides for the offset table, followed by
     * every create statement for the management table.
     *
     * <p>All statements run on one {@link Statement} obtained from the connection supplied by
     * {@code withConnection}.
     *
     * @return a future holding {@code Done} once all create statements have been executed
     */
    public Future<Done> createIfNotExists() {
        return JdbcSessionUtil$.MODULE$.withConnection(this.jdbcSessionFactory, connection -> {
            this.logger.debug("creating offset-store table, using connection id [{}]", BoxesRunTime.boxToInteger(System.identityHashCode(connection)));
            return (Done$) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The statement is created from the connection of the enclosing lambda.
                return createIfNotExists$$anonfun$1$$anonfun$1(connection);
            }, statement -> {
                this.settings.dialect().createTableStatements().foreach(str -> {
                    return statement.execute(str);
                });
                this.settings.dialect().createManagementTableStatements().foreach(str2 -> {
                    return statement.execute(str2);
                });
                return Done$.MODULE$;
            });
        }, executionContext());
    }

    /**
     * Runs the dialect's clear-offset statement for the given projection, binding the projection
     * name as parameter 1 and the projection key as parameter 2.
     *
     * @param projectionId identifies the projection whose offset is cleared
     * @return a future holding {@code Done} once the statement has been executed
     */
    public Future<Done> clearOffset(ProjectionId projectionId) {
        return JdbcSessionUtil$.MODULE$.withConnection(this.jdbcSessionFactory, connection -> {
            // Two placeholders for two arguments; the format used to repeat the connection-id clause.
            this.logger.debug("clearing offset for [{}], using connection id [{}]", projectionId, BoxesRunTime.boxToInteger(System.identityHashCode(connection)));
            return (Done$) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The statement is prepared on the connection of the enclosing lambda.
                return this.clearOffset$$anonfun$1$$anonfun$1(connection);
            }, preparedStatement -> {
                preparedStatement.setString(1, projectionId.name());
                preparedStatement.setString(2, projectionId.key());
                this.logger.debug("clearing offset for [{}] - executed statement returned [{}]", projectionId, BoxesRunTime.boxToInteger(preparedStatement.executeUpdate()));
                return Done$.MODULE$;
            });
        }, executionContext());
    }

    /**
     * Reads the stored offset for a projection.
     *
     * <p>The query is bound with the projection name only, and every returned row is turned into
     * a {@code SingleOffset} keyed by that name plus the row's {@code PROJECTION_KEY}. The result
     * is then chosen as follows:
     * <ul>
     *   <li>no rows: {@code None};</li>
     *   <li>all rows mergeable: the rows are deserialized together as {@code MultipleOffsets};</li>
     *   <li>otherwise: the row whose id equals {@code projectionId} is deserialized, or
     *       {@code None} if there is no such row.</li>
     * </ul>
     *
     * @param projectionId identifies the projection whose offset is read
     * @param <Offset>     offset type expected by the caller
     * @return a future holding the deserialized offset, or {@code None} when nothing matches
     */
    public <Offset> Future<Option<Offset>> readOffset(ProjectionId projectionId) {
        return JdbcSessionUtil$.MODULE$.withConnection(this.jdbcSessionFactory, connection -> {
            if (this.verboseLogging) {
                this.logger.debug("reading offset for [{}], using connection id [{}]", projectionId, BoxesRunTime.boxToInteger(System.identityHashCode(connection)));
            }
            return (Option) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The query is prepared on the connection of the enclosing lambda.
                return this.readOffset$$anonfun$1$$anonfun$1(connection);
            }, preparedStatement -> {
                preparedStatement.setString(1, projectionId.name());
                return (Option) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                    return readOffset$$anonfun$1$$anonfun$2$$anonfun$1(preparedStatement);
                }, resultSet -> {
                    // Typed element list so the lambdas below can call mergeable() and id().
                    ListBuffer<OffsetSerialization.SingleOffset> rows = ListBuffer$.MODULE$.empty();
                    while (resultSet.next()) {
                        rows.append(OffsetSerialization$SingleOffset$.MODULE$.apply(ProjectionId$.MODULE$.apply(projectionId.name(), resultSet.getString("PROJECTION_KEY")), resultSet.getString("MANIFEST"), resultSet.getString("CURRENT_OFFSET"), resultSet.getBoolean("MERGEABLE")));
                    }
                    // Declared as Option because the value may be None, Some or a mapped Option.
                    Option found;
                    if (rows.isEmpty()) {
                        found = None$.MODULE$;
                    } else if (rows.forall(singleOffset -> {
                        return singleOffset.mergeable();
                    })) {
                        found = Some$.MODULE$.apply(this.offsetSerialization.fromStorageRepresentation(OffsetSerialization$MultipleOffsets$.MODULE$.apply(rows.toList())));
                    } else {
                        found = rows.find(singleOffset2 -> {
                            ProjectionId id = singleOffset2.id();
                            return id != null ? id.equals(projectionId) : projectionId == null;
                        }).map(storageRepresentation -> {
                            return this.offsetSerialization.fromStorageRepresentation(storageRepresentation);
                        });
                    }
                    if (this.verboseLogging) {
                        package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.logger), "found offset [{}] for [{}]", found, projectionId);
                    }
                    return found;
                });
            });
        }, executionContext());
    }

    /**
     * Saves the offset for a projection by delegating to {@link #saveOffsetBlocking} with the
     * connection supplied by {@code withConnection}.
     *
     * @param projectionId identifies the projection the offset belongs to
     * @param offset       offset to store
     * @param <Offset>     offset type
     * @return a future holding {@code Done} once the offset has been written
     */
    public <Offset> Future<Done> saveOffset(ProjectionId projectionId, Offset offset) {
        final ExecutionContext ec = executionContext();
        return JdbcSessionUtil$.MODULE$.withConnection(
                this.jdbcSessionFactory,
                connection -> saveOffsetBlocking(connection, projectionId, offset),
                ec);
    }

    /**
     * Writes the offset on the calling thread using the given connection.
     *
     * <p>The offset is converted to its storage representation. A {@code SingleOffset} is written
     * with one update-or-insert; a {@code MultipleOffsets} is written with one update-or-insert
     * per contained single offset. All rows share the same timestamp taken from the clock.
     *
     * @param connection   JDBC connection the statements are prepared on
     * @param projectionId identifies the projection the offset belongs to
     * @param offset       offset to store
     * @param <Offset>     offset type
     * @return {@code Done} after all rows have been written
     * @throws MatchError if the storage representation is neither a single nor a multiple offset
     */
    public <Offset> Done saveOffsetBlocking(Connection connection, ProjectionId projectionId, Offset offset) {
        if (this.verboseLogging) {
            this.logger.debug("saving offset [{}], using connection id [{}]", offset, BoxesRunTime.boxToInteger(System.identityHashCode(connection)));
        }
        long epochMilli = Instant.now(this.clock).toEpochMilli();
        // Held as Object: the value may be either representation, so it cannot be declared as
        // SingleOffset and still be tested against MultipleOffsets below.
        Object storageRepresentation = this.offsetSerialization.toStorageRepresentation(projectionId, offset, this.offsetSerialization.toStorageRepresentation$default$3());
        if (storageRepresentation instanceof OffsetSerialization.SingleOffset) {
            insertOrUpdate$1(connection, offset, epochMilli, (OffsetSerialization.SingleOffset) storageRepresentation);
        } else if (storageRepresentation instanceof OffsetSerialization.MultipleOffsets) {
            OffsetSerialization$MultipleOffsets$.MODULE$.unapply((OffsetSerialization.MultipleOffsets) storageRepresentation)._1().foreach(singleOffset -> {
                insertOrUpdate$1(connection, offset, epochMilli, singleOffset);
            });
        } else {
            throw new MatchError(storageRepresentation);
        }
        return Done$.MODULE$;
    }

    /**
     * Reads the management state of a projection.
     *
     * <p>The dialect's management-state query is bound with the projection name (parameter 1) and
     * key (parameter 2). If a row is returned, its {@code PAUSED} column becomes the state.
     *
     * @param projectionId identifies the projection whose management state is read
     * @return a future holding {@code Some(ManagementState)} when a row exists, otherwise {@code None}
     */
    public Future<Option<ManagementState>> readManagementState(ProjectionId projectionId) {
        return JdbcSessionUtil$.MODULE$.withConnection(this.jdbcSessionFactory, connection -> {
            if (this.verboseLogging) {
                this.logger.debug("reading ManagementState for [{}], using connection id [{}]", projectionId, BoxesRunTime.boxToInteger(System.identityHashCode(connection)));
            }
            return (Option) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The query is prepared on the connection of the enclosing lambda.
                return this.readManagementState$$anonfun$1$$anonfun$1(connection);
            }, preparedStatement -> {
                preparedStatement.setString(1, projectionId.name());
                preparedStatement.setString(2, projectionId.key());
                return (Option) JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                    return readManagementState$$anonfun$1$$anonfun$2$$anonfun$1(preparedStatement);
                }, resultSet -> {
                    // Declared as Option because the value is either Some or None.
                    Option state;
                    if (resultSet.next()) {
                        state = Some$.MODULE$.apply(ManagementState$.MODULE$.apply(resultSet.getBoolean("PAUSED")));
                    } else {
                        state = None$.MODULE$;
                    }
                    if (this.verboseLogging) {
                        package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.logger), "found ManagementState [{}] for [{}]", state, projectionId);
                    }
                    return state;
                });
            });
        }, executionContext());
    }

    /**
     * Stores the paused flag for a projection, stamped with the current time from the clock.
     *
     * @param projectionId identifies the projection whose paused flag is written
     * @param z            the paused value to store
     * @return a future holding {@code Done} once the flag has been written
     */
    public Future<Done> savePaused(ProjectionId projectionId, boolean z) {
        return JdbcSessionUtil$.MODULE$.withConnection(this.jdbcSessionFactory, connection -> {
            if (this.verboseLogging) {
                Object[] logArgs = {
                    BoxesRunTime.boxToBoolean(z),
                    projectionId,
                    BoxesRunTime.boxToInteger(System.identityHashCode(connection))
                };
                package$LoggerOps$.MODULE$.debugN$extension(package$.MODULE$.LoggerOps(this.logger), "saving paused [{}] for [{}], using connection id [{}]", ScalaRunTime$.MODULE$.genericWrapArray(logArgs));
            }
            long lastUpdatedMillis = Instant.now(this.clock).toEpochMilli();
            insertOrUpdate$2(projectionId, z, connection, lastUpdatedMillis);
            return Done$.MODULE$;
        }, executionContext());
    }

    /** Creates a plain {@link Statement} on the given connection; referenced by the resource supplier in {@code dropIfExists}. */
    private static final Statement dropIfExists$$anonfun$1$$anonfun$1(Connection connection) {
        return connection.createStatement();
    }

    /** Creates a plain {@link Statement} on the given connection; referenced by the resource supplier in {@code createIfNotExists}. */
    private static final Statement createIfNotExists$$anonfun$1$$anonfun$1(Connection connection) {
        return connection.createStatement();
    }

    /** Prepares the dialect's clear-offset statement on the given connection; referenced by the resource supplier in {@code clearOffset}. */
    private final PreparedStatement clearOffset$$anonfun$1$$anonfun$1(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().clearOffsetStatement());
    }

    /** Prepares the dialect's read-offset query on the given connection; referenced by the outer resource supplier in {@code readOffset}. */
    private final PreparedStatement readOffset$$anonfun$1$$anonfun$1(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().readOffsetQuery());
    }

    /** Executes the already-bound query and returns its {@link ResultSet}; referenced by the inner resource supplier in {@code readOffset}. */
    private static final ResultSet readOffset$$anonfun$1$$anonfun$2$$anonfun$1(PreparedStatement preparedStatement) {
        return preparedStatement.executeQuery();
    }

    /**
     * Tells whether an update count means that nothing was written.
     *
     * @param i the value returned by {@code executeUpdate}
     * @return {@code true} when {@code i} is zero or equals {@link Statement#EXECUTE_FAILED} (-3)
     */
    private static final boolean failedStatement$1(int i) {
        return i == 0 || i == Statement.EXECUTE_FAILED;
    }

    /** Prepares the dialect's offset update statement on the given connection; referenced by the first resource supplier in {@code insertOrUpdate$1}. */
    private final PreparedStatement $anonfun$4(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().updateStatement());
    }

    /** Prepares the dialect's offset insert statement on the given connection; referenced by the second resource supplier in {@code insertOrUpdate$1}. */
    private final PreparedStatement insertOrUpdate$1$$anonfun$1(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().insertStatement());
    }

    /**
     * Writes one offset row: tries the dialect's update statement first and falls back to the
     * insert statement when the update reports a failed or zero-row result.
     *
     * @param connection   JDBC connection both statements are prepared on
     * @param obj          the caller's original offset, used only in log messages
     * @param j            last-updated timestamp in epoch milliseconds
     * @param singleOffset storage form of the offset row to write
     * @throws RuntimeException if the fallback insert also reports a failed or zero-row result
     */
    private final void insertOrUpdate$1(Connection connection, Object obj, long j, OffsetSerialization.SingleOffset singleOffset) {
        int unboxToInt = BoxesRunTime.unboxToInt(JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
            // The update statement is prepared on the connection passed to this method.
            return this.$anonfun$4(connection);
        }, preparedStatement -> {
            preparedStatement.setString(DialectDefaults$UpdateIndices$.MODULE$.OFFSET(), singleOffset.offsetStr());
            preparedStatement.setString(DialectDefaults$UpdateIndices$.MODULE$.MANIFEST(), singleOffset.manifest());
            preparedStatement.setBoolean(DialectDefaults$UpdateIndices$.MODULE$.MERGEABLE(), singleOffset.mergeable());
            preparedStatement.setLong(DialectDefaults$UpdateIndices$.MODULE$.LAST_UPDATED(), j);
            preparedStatement.setString(DialectDefaults$UpdateIndices$.MODULE$.PROJECTION_NAME(), singleOffset.id().name());
            preparedStatement.setString(DialectDefaults$UpdateIndices$.MODULE$.PROJECTION_KEY(), singleOffset.id().key());
            return preparedStatement.executeUpdate();
        }));
        if (this.verboseLogging) {
            package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.logger), "tried to update offset [{}], statement result [{}]", obj, BoxesRunTime.boxToInteger(unboxToInt));
        }
        // The update touched no row (or failed), so fall back to an insert.
        if (failedStatement$1(unboxToInt)) {
            JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The insert statement is prepared on the same connection.
                return this.insertOrUpdate$1$$anonfun$1(connection);
            }, preparedStatement2 -> {
                preparedStatement2.setString(DialectDefaults$InsertIndices$.MODULE$.PROJECTION_NAME(), singleOffset.id().name());
                preparedStatement2.setString(DialectDefaults$InsertIndices$.MODULE$.PROJECTION_KEY(), singleOffset.id().key());
                preparedStatement2.setString(DialectDefaults$InsertIndices$.MODULE$.OFFSET(), singleOffset.offsetStr());
                preparedStatement2.setString(DialectDefaults$InsertIndices$.MODULE$.MANIFEST(), singleOffset.manifest());
                preparedStatement2.setBoolean(DialectDefaults$InsertIndices$.MODULE$.MERGEABLE(), singleOffset.mergeable());
                preparedStatement2.setLong(DialectDefaults$InsertIndices$.MODULE$.LAST_UPDATED(), j);
                int executeUpdate = preparedStatement2.executeUpdate();
                if (this.verboseLogging) {
                    package$LoggerOps$.MODULE$.debug2$extension(package$.MODULE$.LoggerOps(this.logger), "tried to insert offset [{}], batch result [{}]", obj, BoxesRunTime.boxToInteger(executeUpdate));
                }
                if (failedStatement$1(executeUpdate)) {
                    throw new RuntimeException(new StringBuilder(26).append("Failed to insert offset [").append(singleOffset).append("]").toString());
                }
            });
        }
    }

    /** Prepares the dialect's management-state query on the given connection; referenced by the outer resource supplier in {@code readManagementState}. */
    private final PreparedStatement readManagementState$$anonfun$1$$anonfun$1(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().readManagementStateQuery());
    }

    /** Executes the already-bound query and returns its {@link ResultSet}; referenced by the inner resource supplier in {@code readManagementState}. */
    private static final ResultSet readManagementState$$anonfun$1$$anonfun$2$$anonfun$1(PreparedStatement preparedStatement) {
        return preparedStatement.executeQuery();
    }

    /**
     * Tells whether an update count means that nothing was written.
     *
     * @param i the value returned by {@code executeUpdate}
     * @return {@code true} when {@code i} is zero or equals {@link Statement#EXECUTE_FAILED} (-3)
     */
    private static final boolean failedStatement$2(int i) {
        return i == 0 || i == Statement.EXECUTE_FAILED;
    }

    /** Prepares the dialect's management update statement on the given connection; referenced by the first resource supplier in {@code insertOrUpdate$2}. */
    private final PreparedStatement $anonfun$6(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().updateManagementStatement());
    }

    /** Prepares the dialect's management insert statement on the given connection; referenced by the second resource supplier in {@code insertOrUpdate$2}. */
    private final PreparedStatement insertOrUpdate$2$$anonfun$1(Connection connection) {
        return connection.prepareStatement(this.settings.dialect().insertManagementStatement());
    }

    /**
     * Writes the paused flag for a projection: tries the dialect's management update statement
     * first and falls back to the management insert statement when the update reports a failed
     * or zero-row result.
     *
     * @param projectionId identifies the projection whose management row is written
     * @param z            the paused value to store
     * @param connection   JDBC connection both statements are prepared on
     * @param j            last-updated timestamp in epoch milliseconds
     * @throws RuntimeException if the fallback insert also reports a failed or zero-row result
     */
    private final void insertOrUpdate$2(ProjectionId projectionId, boolean z, Connection connection, long j) {
        int unboxToInt = BoxesRunTime.unboxToInt(JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
            // The update statement is prepared on the connection passed to this method.
            return this.$anonfun$6(connection);
        }, preparedStatement -> {
            preparedStatement.setBoolean(DialectDefaults$UpdateManagementIndices$.MODULE$.PAUSED(), z);
            preparedStatement.setLong(DialectDefaults$UpdateManagementIndices$.MODULE$.LAST_UPDATED(), j);
            preparedStatement.setString(DialectDefaults$UpdateManagementIndices$.MODULE$.PROJECTION_NAME(), projectionId.name());
            preparedStatement.setString(DialectDefaults$UpdateManagementIndices$.MODULE$.PROJECTION_KEY(), projectionId.key());
            return preparedStatement.executeUpdate();
        }));
        if (this.verboseLogging) {
            package$LoggerOps$.MODULE$.debugN$extension(package$.MODULE$.LoggerOps(this.logger), "tried to update paused [{}] for [{}], statement result [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToBoolean(z), projectionId, BoxesRunTime.boxToInteger(unboxToInt)}));
        }
        // The update touched no row (or failed), so fall back to an insert.
        if (failedStatement$2(unboxToInt)) {
            JdbcSessionUtil$.MODULE$.tryWithResource(() -> {
                // The insert statement is prepared on the same connection.
                return this.insertOrUpdate$2$$anonfun$1(connection);
            }, preparedStatement2 -> {
                preparedStatement2.setString(DialectDefaults$InsertManagementIndices$.MODULE$.PROJECTION_NAME(), projectionId.name());
                preparedStatement2.setString(DialectDefaults$InsertManagementIndices$.MODULE$.PROJECTION_KEY(), projectionId.key());
                preparedStatement2.setBoolean(DialectDefaults$InsertManagementIndices$.MODULE$.PAUSED(), z);
                preparedStatement2.setLong(DialectDefaults$InsertManagementIndices$.MODULE$.LAST_UPDATED(), j);
                int executeUpdate = preparedStatement2.executeUpdate();
                if (this.verboseLogging) {
                    package$LoggerOps$.MODULE$.debugN$extension(package$.MODULE$.LoggerOps(this.logger), "tried to insert paused [{}] for [{}], batch result [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToBoolean(z), projectionId, BoxesRunTime.boxToInteger(executeUpdate)}));
                }
                if (failedStatement$2(executeUpdate)) {
                    throw new RuntimeException(new StringBuilder(33).append("Failed to insert paused [").append(z).append("] for [").append(projectionId).append("]").toString());
                }
            });
        }
    }
}
