package org.apache.pekko.projection.cassandra.internal;

import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import java.time.Clock;
import java.time.Instant;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.projection.MergeableOffset;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.internal.ManagementState;
import org.apache.pekko.projection.internal.ManagementState$;
import org.apache.pekko.projection.internal.OffsetSerialization;
import org.apache.pekko.projection.internal.OffsetSerialization$SingleOffset$;
import org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSession;
import org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry$;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.StringOps$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

/* compiled from: CassandraOffsetStore.scala */
/**
 * Reads and writes projection offsets and projection management state (the "paused" flag) in
 * Cassandra through a {@link CassandraSession}.
 *
 * <p>Rows are addressed by {@code (projection_name, partition)} as partition key and
 * {@code projection_key} as clustering key. The {@code partition} value is derived from the
 * projection key by {@link #idToPartition(ProjectionId)}.
 *
 * <p>All operations are asynchronous and return Scala {@link Future}s that are chained on the
 * actor system's execution context.
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/projection/cassandra/internal/CassandraOffsetStore.class */
public class CassandraOffsetStore {
    /** Source of the {@code last_updated} timestamps written by the save operations. */
    private final Clock clock;
    private final OffsetSerialization offsetSerialization;
    private final ExecutionContext executionContext;
    private final CassandraSettings cassandraSettings;
    private final CassandraSession session;
    private final String keyspace;
    private final String table;
    private final String managementTable;
    /**
     * Modulus used by {@link #idToPartition(ProjectionId)}. Changing it changes the partition
     * computed for a given projection key, and therefore which rows the queries below address.
     */
    private final int cassandraPartitions;

    /**
     * Creates a store whose settings and Cassandra session are resolved from the actor system.
     *
     * @param actorSystem actor system providing settings, execution context and session registry
     * @param clock clock used for the {@code last_updated} column
     */
    public CassandraOffsetStore(ActorSystem<?> actorSystem, Clock clock) {
        this.clock = clock;
        this.offsetSerialization = new OffsetSerialization(actorSystem);
        this.executionContext = actorSystem.executionContext();
        this.cassandraSettings = CassandraSettings$.MODULE$.apply(actorSystem);
        this.session = CassandraSessionRegistry$.MODULE$.apply(actorSystem).sessionFor(this.cassandraSettings.sessionConfigPath());
        this.keyspace = this.cassandraSettings.keyspace();
        this.table = this.cassandraSettings.table();
        this.managementTable = this.cassandraSettings.managementTable();
        this.cassandraPartitions = 5;
    }

    /**
     * Creates a store that timestamps writes with {@link Clock#systemUTC()}.
     *
     * @param actorSystem actor system providing settings, execution context and session registry
     */
    public CassandraOffsetStore(ActorSystem<?> actorSystem) {
        this(actorSystem, Clock.systemUTC());
    }

    /** Returns the keyspace name taken from the Cassandra settings. */
    public String keyspace() {
        return this.keyspace;
    }

    /** Returns the offset table name taken from the Cassandra settings. */
    public String table() {
        return this.table;
    }

    /** Returns the management table name taken from the Cassandra settings. */
    public String managementTable() {
        return this.managementTable;
    }

    /** Runs a read with the configured execution profile and returns the first row, if any. */
    private <T extends Statement<T>> Future<Option<Row>> selectOne(Statement<T> statement) {
        return this.session.selectOne(statement.setExecutionProfileName(this.cassandraSettings.profile()));
    }

    /** Runs a write with the configured execution profile. */
    private <T extends Statement<T>> Future<Done> execute(Statement<T> statement) {
        return this.session.executeWrite(statement.setExecutionProfileName(this.cassandraSettings.profile()));
    }

    /**
     * Reads the stored offset for a projection.
     *
     * @param projectionId projection whose offset is looked up
     * @return a future holding the deserialized offset, or an empty option when no row exists
     */
    public <Offset> Future<Option<Offset>> readOffset(ProjectionId projectionId) {
        Integer partition = idToPartition(projectionId);
        String cql = "SELECT projection_key, offset, manifest FROM " + keyspace() + "." + table()
                + " WHERE projection_name = ? AND partition = ? AND projection_key = ?";
        return this.session.prepare(cql).map(
                preparedStatement -> preparedStatement.bind(projectionId.name(), partition, projectionId.key()),
                this.executionContext
        ).flatMap(
                boundStatement -> selectOne(boundStatement),
                this.executionContext
        ).map(
                maybeRow -> maybeRow.map(row ->
                        this.offsetSerialization.fromStorageRepresentation(row.getString("offset"), row.getString("manifest"))),
                this.executionContext
        );
    }

    /**
     * Writes the offset for a projection together with its manifest and a {@code last_updated}
     * timestamp taken from the injected clock.
     *
     * @param projectionId projection whose offset is stored
     * @param offset offset to store
     * @return a future completed when the write has been executed
     * @throws IllegalArgumentException if {@code offset} is a {@link MergeableOffset}
     */
    public <Offset> Future<Done> saveOffset(ProjectionId projectionId, Offset offset) {
        Integer partition = idToPartition(projectionId);
        if (offset instanceof MergeableOffset) {
            throw new IllegalArgumentException("The CassandraOffsetStore does not currently support MergeableOffset");
        }
        OffsetSerialization.SingleOffset representation = this.offsetSerialization.toStorageRepresentation(projectionId, offset, this.offsetSerialization.toStorageRepresentation$default$3());
        if (representation == null) {
            throw new MatchError(representation);
        }
        OffsetSerialization.SingleOffset single = OffsetSerialization$SingleOffset$.MODULE$.unapply(representation);
        // Component 2 goes into the "manifest" column and component 3 into the "offset" column.
        String manifest = single._2();
        String offsetValue = single._3();
        String cql = "INSERT INTO " + keyspace() + "." + table()
                + " (projection_name, partition, projection_key, offset, manifest, last_updated) VALUES (?, ?, ?, ?, ?, ?)";
        return this.session.prepare(cql).map(
                preparedStatement -> preparedStatement.bind(
                        projectionId.name(), partition, projectionId.key(), offsetValue, manifest, Instant.now(this.clock)),
                this.executionContext
        ).flatMap(
                boundStatement -> execute(boundStatement),
                this.executionContext
        );
    }

    /**
     * Deletes the stored offset row for a projection.
     *
     * @param projectionId projection whose offset is removed
     * @return a future completed when the delete has been executed
     */
    public Future<Done> clearOffset(ProjectionId projectionId) {
        String cql = "DELETE FROM " + keyspace() + "." + table()
                + " WHERE projection_name = ? AND partition = ? AND projection_key = ?";
        return this.session.prepare(cql).map(
                // The partition is computed here, once the prepared statement is available.
                preparedStatement -> preparedStatement.bind(
                        projectionId.name(), idToPartition(projectionId), projectionId.key()),
                this.executionContext
        ).flatMap(
                boundStatement -> execute(boundStatement),
                this.executionContext
        );
    }

    /**
     * Reads the management state (the {@code paused} flag) for a projection.
     *
     * @param projectionId projection whose management state is looked up
     * @return a future holding the state, or an empty option when no row exists
     */
    public Future<Option<ManagementState>> readManagementState(ProjectionId projectionId) {
        Integer partition = idToPartition(projectionId);
        String cql = "SELECT paused FROM " + keyspace() + "." + managementTable()
                + " WHERE projection_name = ? AND partition = ? AND projection_key = ?";
        return this.session.prepare(cql).map(
                preparedStatement -> preparedStatement.bind(projectionId.name(), partition, projectionId.key()),
                this.executionContext
        ).flatMap(
                boundStatement -> selectOne(boundStatement),
                this.executionContext
        ).map(
                maybeRow -> maybeRow.map(row -> ManagementState$.MODULE$.apply(row.getBoolean("paused"))),
                this.executionContext
        );
    }

    /**
     * Writes the {@code paused} flag for a projection together with a {@code last_updated}
     * timestamp taken from the injected clock.
     *
     * @param projectionId projection whose flag is stored
     * @param z value written to the {@code paused} column
     * @return a future completed when the write has been executed
     */
    public Future<Done> savePaused(ProjectionId projectionId, boolean z) {
        Integer partition = idToPartition(projectionId);
        String cql = "INSERT INTO " + keyspace() + "." + managementTable()
                + " (projection_name, partition, projection_key, paused, last_updated) VALUES (?, ?, ?, ?, ?)";
        return this.session.prepare(cql).map(
                preparedStatement -> preparedStatement.bind(
                        projectionId.name(), partition, projectionId.key(), Boolean.valueOf(z), Instant.now(this.clock)),
                this.executionContext
        ).flatMap(
                boundStatement -> execute(boundStatement),
                this.executionContext
        );
    }

    /**
     * Creates the keyspace, the offset table and the management table, in that order, each with
     * {@code IF NOT EXISTS}. The keyspace is created with {@code SimpleStrategy} and a
     * replication factor of 1.
     *
     * @return a future completed with {@code Done} after the third statement has been executed
     */
    public Future<Done> createKeyspaceAndTable() {
        String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + keyspace()
                + " WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 }";
        // Each lambda parameter needs its own name: Java does not allow a nested lambda to
        // redeclare a parameter name that is already in scope.
        return this.session.executeDDL(createKeyspace).flatMap(
                keyspaceCreated -> this.session.executeDDL(offsetTableDdl()).flatMap(
                        offsetTableCreated -> this.session.executeDDL(managementTableDdl()).map(
                                managementTableCreated -> Done$.MODULE$,
                                this.executionContext),
                        this.executionContext),
                this.executionContext);
    }

    /** Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the offset table. */
    private String offsetTableDdl() {
        return "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + table() + " (\n"
                + "  projection_name text,\n"
                + "  partition int,\n"
                + "  projection_key text,\n"
                + "  offset text,\n"
                + "  manifest text,\n"
                + "  last_updated timestamp,\n"
                + "  PRIMARY KEY ((projection_name, partition), projection_key))";
    }

    /** Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the management table. */
    private String managementTableDdl() {
        return "CREATE TABLE IF NOT EXISTS " + keyspace() + "." + managementTable() + " (\n"
                + "  projection_name text,\n"
                + "  partition int,\n"
                + "  projection_key text,\n"
                + "  paused boolean,\n"
                + "  last_updated timestamp,\n"
                + "  PRIMARY KEY ((projection_name, partition), projection_key))";
    }

    /**
     * Maps a projection id to the value used for the {@code partition} column.
     *
     * <p>The remainder is taken before the absolute value, so the intermediate value lies in
     * {@code (-cassandraPartitions, cassandraPartitions)} and {@link Math#abs(int)} cannot
     * overflow. Do not replace this with {@code Math.floorMod}: for negative hash codes it
     * yields a different partition, so existing rows would be looked up in the wrong place.
     *
     * @param projectionId projection id whose key is hashed
     * @return a value in {@code [0, cassandraPartitions)}
     */
    public <Offset> Integer idToPartition(ProjectionId projectionId) {
        int remainder = projectionId.key().hashCode() % this.cassandraPartitions;
        return Integer.valueOf(Math.abs(remainder));
    }
}
