package akka.persistence.r2dbc.internal.postgres;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.scaladsl.package$LoggerOps$;
import akka.annotation.InternalApi;
import akka.dispatch.ExecutionContexts$;
import akka.persistence.Persistence;
import akka.persistence.Persistence$;
import akka.persistence.r2dbc.R2dbcSettings;
import akka.persistence.r2dbc.internal.JournalDao;
import akka.persistence.r2dbc.internal.PayloadCodec;
import akka.persistence.r2dbc.internal.PayloadCodec$;
import akka.persistence.r2dbc.internal.R2dbcExecutor;
import akka.persistence.r2dbc.internal.SerializedEventMetadata;
import akka.persistence.r2dbc.internal.Sql$;
import akka.persistence.r2dbc.internal.Sql$Interpolation$;
import akka.persistence.typed.PersistenceId$;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.Statement;
import java.time.Instant;
import org.slf4j.Logger;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: PostgresJournalDao.scala */
@InternalApi
/* loaded from: input_file:akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.class */
public class PostgresJournalDao implements JournalDao {
    private final R2dbcSettings journalSettings;
    private final ExecutionContext ec;
    private final Persistence persistenceExt;
    private final R2dbcExecutor r2dbcExecutor;
    private final String journalTable;
    private final PayloadCodec journalPayloadCodec;
    private final String insertEventWithParameterTimestampSql;
    private final String insertEventWithTransactionTimestampSql;
    private final String selectHighestSequenceNrSql;
    private final String selectLowestSequenceNrSql;
    private final String deleteEventsSql;
    private final String insertDeleteMarkerSql;

    public static Option<SerializedEventMetadata> readMetadata(Row row) {
        return PostgresJournalDao$.MODULE$.readMetadata(row);
    }

    public PostgresJournalDao(R2dbcSettings r2dbcSettings, ConnectionFactory connectionFactory, ExecutionContext executionContext, ActorSystem<?> actorSystem) {
        this.journalSettings = r2dbcSettings;
        this.ec = executionContext;
        this.persistenceExt = Persistence$.MODULE$.apply(actorSystem);
        this.r2dbcExecutor = new R2dbcExecutor(connectionFactory, log(), r2dbcSettings.logDbCallsExceeding(), r2dbcSettings.connectionFactorySettings().poolSettings().closeCallsExceeding(), executionContext, actorSystem);
        this.journalTable = r2dbcSettings.journalTableWithSchema();
        this.journalPayloadCodec = r2dbcSettings.journalPayloadCodec();
        String sb = new StringBuilder(246).append("INSERT INTO ").append(journalTable()).append(" ").append("(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) ").append("VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ").toString();
        Tuple2 apply = Tuple2$.MODULE$.apply(r2dbcSettings.dbTimestampMonotonicIncreasing() ? Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", " ?) RETURNING db_timestamp"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sb})) : Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", " GREATEST(?, ", ")) RETURNING db_timestamp"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sb, timestampSubSelect$1()})), r2dbcSettings.dbTimestampMonotonicIncreasing() ? Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", " CURRENT_TIMESTAMP) RETURNING db_timestamp"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sb})) : Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", " GREATEST(CURRENT_TIMESTAMP, ", ")) RETURNING db_timestamp"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sb, timestampSubSelect$1()})));
        this.insertEventWithParameterTimestampSql = (String) apply._1();
        this.insertEventWithTransactionTimestampSql = (String) apply._2();
        this.selectHighestSequenceNrSql = Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"\n    SELECT MAX(seq_nr) from ", "\n    WHERE persistence_id = ? AND seq_nr >= ?"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{journalTable()}));
        this.selectLowestSequenceNrSql = Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"\n    SELECT MIN(seq_nr) from ", "\n    WHERE persistence_id = ?"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{journalTable()}));
        this.deleteEventsSql = Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"\n    DELETE FROM ", "\n    WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{journalTable()}));
        this.insertDeleteMarkerSql = Sql$Interpolation$.MODULE$.sql$extension(Sql$.MODULE$.Interpolation(StringContext$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"\n    INSERT INTO ", "\n    (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)\n    VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?, ?, ?, ?)"}))), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{journalTable()}));
    }

    public Logger log() {
        return PostgresJournalDao$.akka$persistence$r2dbc$internal$postgres$PostgresJournalDao$$$log;
    }

    public R2dbcExecutor r2dbcExecutor() {
        return this.r2dbcExecutor;
    }

    public String journalTable() {
        return this.journalTable;
    }

    public PayloadCodec journalPayloadCodec() {
        return this.journalPayloadCodec;
    }

    @Override // akka.persistence.r2dbc.internal.JournalDao
    public Future<Instant> writeEvents(Seq<JournalDao.SerializedJournalRow> seq) {
        Predef$.MODULE$.require(seq.nonEmpty());
        String persistenceId = ((JournalDao.SerializedJournalRow) seq.head()).persistenceId();
        long seqNr = ((JournalDao.SerializedJournalRow) seq.head()).seqNr() - 1;
        Instant dbTimestamp = ((JournalDao.SerializedJournalRow) seq.head()).dbTimestamp();
        Instant instant = Instant.EPOCH;
        boolean z = dbTimestamp != null ? dbTimestamp.equals(instant) : instant == null;
        String str = z ? this.insertEventWithTransactionTimestampSql : this.insertEventWithParameterTimestampSql;
        int size = seq.size();
        if (size == 1) {
            Future<Instant> updateOneReturning = r2dbcExecutor().updateOneReturning(new StringBuilder(9).append("insert [").append(persistenceId).append("]").toString(), connection -> {
                return bind$1(seqNr, z, connection.createStatement(str), (JournalDao.SerializedJournalRow) seq.head());
            }, row -> {
                return (Instant) row.get(0, Instant.class);
            });
            if (log().isDebugEnabled()) {
                updateOneReturning.foreach(instant2 -> {
                    log().debug("Wrote [{}] events for persistenceId [{}]", BoxesRunTime.boxToInteger(1), ((JournalDao.SerializedJournalRow) seq.head()).persistenceId());
                }, this.ec);
            }
            return updateOneReturning;
        }
        Future updateInBatchReturning = r2dbcExecutor().updateInBatchReturning(new StringBuilder(26).append("batch insert [").append(persistenceId).append("], [").append(size).append("] events").toString(), connection2 -> {
            return (Statement) seq.foldLeft(connection2.createStatement(str), (statement, serializedJournalRow) -> {
                statement.add();
                return bind$1(seqNr, z, statement, serializedJournalRow);
            });
        }, row2 -> {
            return (Instant) row2.get(0, Instant.class);
        });
        if (log().isDebugEnabled()) {
            updateInBatchReturning.foreach(indexedSeq -> {
                log().debug("Wrote [{}] events for persistenceId [{}]", BoxesRunTime.boxToInteger(size), ((JournalDao.SerializedJournalRow) seq.head()).persistenceId());
            }, this.ec);
        }
        return updateInBatchReturning.map(indexedSeq2 -> {
            return (Instant) indexedSeq2.head();
        }, ExecutionContexts$.MODULE$.parasitic());
    }

    @Override // akka.persistence.r2dbc.internal.JournalDao
    public Future<Object> readHighestSequenceNr(String str, long j) {
        Future<Object> map = r2dbcExecutor().select(new StringBuilder(23).append("select highest seqNr [").append(str).append("]").toString(), connection -> {
            return connection.createStatement(this.selectHighestSequenceNrSql).bind(0, str).bind(1, BoxesRunTime.boxToLong(j));
        }, row -> {
            Long l = (Long) row.get(0, Long.class);
            if (l == null) {
                return 0L;
            }
            return l.longValue();
        }).map(indexedSeq -> {
            if (indexedSeq.isEmpty()) {
                return 0L;
            }
            return BoxesRunTime.unboxToLong(indexedSeq.head());
        }, ExecutionContexts$.MODULE$.parasitic());
        if (log().isDebugEnabled()) {
            map.foreach(j2 -> {
                log().debug("Highest sequence nr for persistenceId [{}]: [{}]", str, BoxesRunTime.boxToLong(j2));
            }, this.ec);
        }
        return map;
    }

    @Override // akka.persistence.r2dbc.internal.JournalDao
    public Future<Object> readLowestSequenceNr(String str) {
        Future<Object> map = r2dbcExecutor().select(new StringBuilder(22).append("select lowest seqNr [").append(str).append("]").toString(), connection -> {
            return connection.createStatement(this.selectLowestSequenceNrSql).bind(0, str);
        }, row -> {
            Long l = (Long) row.get(0, Long.class);
            if (l == null) {
                return 0L;
            }
            return l.longValue();
        }).map(indexedSeq -> {
            if (indexedSeq.isEmpty()) {
                return 0L;
            }
            return BoxesRunTime.unboxToLong(indexedSeq.head());
        }, ExecutionContexts$.MODULE$.parasitic());
        if (log().isDebugEnabled()) {
            map.foreach(j -> {
                log().debug("Lowest sequence nr for persistenceId [{}]: [{}]", str, BoxesRunTime.boxToLong(j));
            }, this.ec);
        }
        return map;
    }

    private Future<Object> highestSeqNrForDelete(String str, long j) {
        return j == Long.MAX_VALUE ? readHighestSequenceNr(str, 0L) : Future$.MODULE$.successful(BoxesRunTime.boxToLong(j));
    }

    private Future<Object> lowestSequenceNrForDelete(String str, long j, int i) {
        return j <= ((long) i) ? Future$.MODULE$.successful(BoxesRunTime.boxToLong(1L)) : readLowestSequenceNr(str);
    }

    @Override // akka.persistence.r2dbc.internal.JournalDao
    public Future<BoxedUnit> deleteEventsTo(String str, long j, boolean z) {
        int eventsJournalDeleteBatchSize = this.journalSettings.cleanupSettings().eventsJournalDeleteBatchSize();
        return highestSeqNrForDelete(str, j).flatMap(obj -> {
            return deleteEventsTo$$anonfun$1(str, z, eventsJournalDeleteBatchSize, BoxesRunTime.unboxToLong(obj));
        }, this.ec);
    }

    private final String timestampSubSelect$1() {
        return new StringBuilder(95).append("(SELECT db_timestamp + '1 microsecond'::interval FROM ").append(journalTable()).append(" ").append("WHERE persistence_id = ? AND seq_nr = ?)").toString();
    }

    private final Statement bind$1(long j, boolean z, Statement statement, JournalDao.SerializedJournalRow serializedJournalRow) {
        PayloadCodec$.MODULE$.RichStatement(statement.bind(0, BoxesRunTime.boxToInteger(serializedJournalRow.slice())).bind(1, serializedJournalRow.entityType()).bind(2, serializedJournalRow.persistenceId()).bind(3, BoxesRunTime.boxToLong(serializedJournalRow.seqNr())).bind(4, serializedJournalRow.writerUuid()).bind(5, "").bind(6, BoxesRunTime.boxToInteger(serializedJournalRow.serId())).bind(7, serializedJournalRow.serManifest()), journalPayloadCodec()).bindPayload(8, (byte[]) serializedJournalRow.payload().get());
        if (serializedJournalRow.tags().isEmpty()) {
            statement.bindNull(9, String[].class);
        } else {
            statement.bind(9, serializedJournalRow.tags().toArray(ClassTag$.MODULE$.apply(String.class)));
        }
        Some metadata = serializedJournalRow.metadata();
        if (metadata instanceof Some) {
            SerializedEventMetadata serializedEventMetadata = (SerializedEventMetadata) metadata.value();
            statement.bind(10, BoxesRunTime.boxToInteger(serializedEventMetadata.serId())).bind(11, serializedEventMetadata.serManifest()).bind(12, serializedEventMetadata.payload());
        } else {
            if (!None$.MODULE$.equals(metadata)) {
                throw new MatchError(metadata);
            }
            statement.bindNull(10, Integer.class).bindNull(11, String.class).bindNull(12, byte[].class);
        }
        if (z) {
            if (this.journalSettings.dbTimestampMonotonicIncreasing()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                statement.bind(13, serializedJournalRow.persistenceId()).bind(14, BoxesRunTime.boxToLong(j));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        } else if (this.journalSettings.dbTimestampMonotonicIncreasing()) {
            statement.bind(13, serializedJournalRow.dbTimestamp());
        } else {
            statement.bind(13, serializedJournalRow.dbTimestamp()).bind(14, serializedJournalRow.persistenceId()).bind(15, BoxesRunTime.boxToLong(j));
        }
        return statement;
    }

    private final Statement insertDeleteMarkerStmt$1(String str, long j, Connection connection) {
        return PayloadCodec$.MODULE$.RichStatement(connection.createStatement(this.insertDeleteMarkerSql).bind(0, BoxesRunTime.boxToInteger(this.persistenceExt.sliceForPersistenceId(str))).bind(1, PersistenceId$.MODULE$.extractEntityType(str)).bind(2, str).bind(3, BoxesRunTime.boxToLong(j)).bind(4, "").bind(5, "").bind(6, BoxesRunTime.boxToInteger(0)).bind(7, ""), journalPayloadCodec()).bindPayloadOption(8, None$.MODULE$).bind(9, BoxesRunTime.boxToBoolean(true));
    }

    private final Future deleteBatch$1(String str, boolean z, long j, long j2, boolean z2) {
        return ((!z2 || z) ? r2dbcExecutor().updateOne(new StringBuilder(9).append("delete [").append(str).append("]").toString(), connection -> {
            return connection.createStatement(this.deleteEventsSql).bind(0, str).bind(1, BoxesRunTime.boxToLong(j)).bind(2, BoxesRunTime.boxToLong(j2));
        }) : r2dbcExecutor().update(new StringBuilder(27).append("delete [").append(str).append("] and insert marker").toString(), connection2 -> {
            return (IndexedSeq) package$.MODULE$.Vector().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Statement[]{connection2.createStatement(this.deleteEventsSql).bind(0, str).bind(1, BoxesRunTime.boxToLong(j)).bind(2, BoxesRunTime.boxToLong(j2)), insertDeleteMarkerStmt$1(str, j2, connection2)}));
        }).map(indexedSeq -> {
            return BoxesRunTime.unboxToLong(indexedSeq.head());
        }, this.ec)).map(j3 -> {
            if (log().isDebugEnabled()) {
                package$LoggerOps$.MODULE$.debugN$extension(akka.actor.typed.scaladsl.package$.MODULE$.LoggerOps(log()), "Deleted [{}] events for persistenceId [{}], from seq num [{}] to [{}]", ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(j3), str, BoxesRunTime.boxToLong(j), BoxesRunTime.boxToLong(j2)}));
            }
        }, ExecutionContexts$.MODULE$.parasitic());
    }

    private final Future deleteInBatches$1(String str, boolean z, int i, long j, long j2) {
        if (j + i > j2) {
            return deleteBatch$1(str, z, j, j2, true);
        }
        long j3 = (j + i) - 1;
        return deleteBatch$1(str, z, j, j3, false).flatMap(boxedUnit -> {
            return deleteInBatches$1(str, z, i, j3 + 1, j2);
        }, this.ec);
    }

    private final /* synthetic */ Future deleteEventsTo$$anonfun$1$$anonfun$1(String str, boolean z, int i, long j, long j2) {
        return deleteInBatches$1(str, z, i, j2, j).map(boxedUnit -> {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }, this.ec);
    }

    private final /* synthetic */ Future deleteEventsTo$$anonfun$1(String str, boolean z, int i, long j) {
        return lowestSequenceNrForDelete(str, j, i).flatMap(obj -> {
            return deleteEventsTo$$anonfun$1$$anonfun$1(str, z, i, j, BoxesRunTime.unboxToLong(obj));
        }, this.ec);
    }
}
