package org.apache.pekko.persistence.cassandra.journal;

import com.datastax.oss.driver.api.core.cql.Row;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LogSource$;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.pattern.AskableActorRef$;
import org.apache.pekko.persistence.cassandra.Extractors;
import org.apache.pekko.persistence.cassandra.PluginSettings;
import org.apache.pekko.persistence.cassandra.journal.TagWriter;
import org.apache.pekko.persistence.cassandra.journal.TagWriters;
import org.apache.pekko.serialization.Serialization;
import org.apache.pekko.serialization.SerializationExtension$;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSession;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.Timeout$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.collection.BuildFrom$;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

/* compiled from: CassandraTagRecovery.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/persistence/cassandra/journal/CassandraTagRecovery.class */
public class CassandraTagRecovery {
    private final ActorSystem system;
    private final CassandraSession session;
    private final PluginSettings settings;
    private final TaggedPreparedStatements statements;
    private final ActorRef tagWriters;
    private final ActorSystem sys;
    private final LoggingAdapter log;
    private final Serialization serialization;
    private final Timeout timeout = Timeout$.MODULE$.apply(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).second());

    public CassandraTagRecovery(ActorSystem actorSystem, CassandraSession cassandraSession, PluginSettings pluginSettings, TaggedPreparedStatements taggedPreparedStatements, ActorRef actorRef) {
        this.system = actorSystem;
        this.session = cassandraSession;
        this.settings = pluginSettings;
        this.statements = taggedPreparedStatements;
        this.tagWriters = actorRef;
        this.sys = actorSystem;
        this.log = Logging$.MODULE$.apply(actorSystem, CassandraTagRecovery.class, LogSource$.MODULE$.fromAnyClass());
        this.serialization = SerializationExtension$.MODULE$.apply(actorSystem);
    }

    public Future<Map<String, TagWriter.TagProgress>> lookupTagProgress(String str, ExecutionContext executionContext) {
        return this.statements.SelectTagProgressForPersistenceId().map(preparedStatement -> {
            return preparedStatement.bind(new Object[]{str}).setExecutionProfileName(this.settings.journalSettings().readProfile());
        }, executionContext).flatMap(boundStatement -> {
            return (Future) this.session.select(boundStatement).runWith(Sink$.MODULE$.seq(), Materializer$.MODULE$.matFromSystem(this.sys));
        }, executionContext).map(seq -> {
            return (Map) seq.foldLeft(Predef$.MODULE$.Map().empty(), (map, row) -> {
                return map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(row.getString("tag")), TagWriter$TagProgress$.MODULE$.apply(str, row.getLong("sequence_nr"), row.getLong("tag_pid_sequence_nr"))));
            });
        }, executionContext);
    }

    public Future<Object> tagScanningStartingSequenceNr(String str) {
        return this.statements.SelectTagScanningForPersistenceId().map(preparedStatement -> {
            return preparedStatement.bind(new Object[]{str}).setExecutionProfileName(this.settings.journalSettings().readProfile());
        }, this.statements.ec()).flatMap(statement -> {
            return this.session.selectOne(statement);
        }, this.statements.ec()).map(option -> {
            if (option instanceof Some) {
                return ((Row) ((Some) option).value()).getLong("sequence_nr");
            }
            if (None$.MODULE$.equals(option)) {
                return 1L;
            }
            throw new MatchError(option);
        }, this.statements.ec());
    }

    public Future<Extractors.TaggedPersistentRepr> sendMissingTagWrite(Map<String, TagWriter.TagProgress> map, Extractors.TaggedPersistentRepr taggedPersistentRepr) {
        if (taggedPersistentRepr.tags().isEmpty()) {
            return Future$.MODULE$.successful(taggedPersistentRepr);
        }
        return Future$.MODULE$.sequence(taggedPersistentRepr.tags().toList().map(str -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(str), org.apache.pekko.persistence.cassandra.package$.MODULE$.serializeEvent(taggedPersistentRepr.pr(), taggedPersistentRepr.tags(), taggedPersistentRepr.offset(), this.settings.eventsByTagSettings().bucketSize(), this.serialization, this.system, this.statements.ec()));
        }).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str2 = (String) tuple2._1();
            return ((Future) tuple2._2()).map(serialized -> {
                Some some = map.get(str2);
                if (None$.MODULE$.equals(some)) {
                    this.log.debug("[{}] Tag write not in progress. Sending to TagWriter. Tag [{}] Sequence Nr [{}]", taggedPersistentRepr.pr().persistenceId(), str2, BoxesRunTime.boxToLong(taggedPersistentRepr.sequenceNr()));
                    TagWriters.TagWrite apply = TagWriters$TagWrite$.MODULE$.apply(str2, scala.package$.MODULE$.Nil().$colon$colon(serialized), TagWriters$TagWrite$.MODULE$.$lessinit$greater$default$3());
                    this.tagWriters.$bang(apply, this.tagWriters.$bang$default$2(apply));
                    return Done$.MODULE$;
                }
                if (!(some instanceof Some)) {
                    throw new MatchError(some);
                }
                if (taggedPersistentRepr.sequenceNr() > ((TagWriter.TagProgress) some.value()).sequenceNr()) {
                    this.log.debug("[{}] Sequence nr > than write progress. Sending to TagWriter. Tag [{}] Sequence Nr [{}]", taggedPersistentRepr.pr().persistenceId(), str2, BoxesRunTime.boxToLong(taggedPersistentRepr.sequenceNr()));
                    TagWriters.TagWrite apply2 = TagWriters$TagWrite$.MODULE$.apply(str2, scala.package$.MODULE$.Nil().$colon$colon(serialized), TagWriters$TagWrite$.MODULE$.$lessinit$greater$default$3());
                    this.tagWriters.$bang(apply2, this.tagWriters.$bang$default$2(apply2));
                }
                return Done$.MODULE$;
            }, this.statements.ec());
        }), BuildFrom$.MODULE$.buildFromIterableOps(), this.statements.ec()).map(list -> {
            return taggedPersistentRepr;
        }, this.statements.ec());
    }

    public Extractors.RawEvent sendMissingTagWriteRaw(Map<String, TagWriter.TagProgress> map, boolean z, Extractors.RawEvent rawEvent) {
        rawEvent.serialized().tags().foreach(str -> {
            Some some = map.get(str);
            if (None$.MODULE$.equals(some)) {
                this.log.debug("[{}] Tag write not in progress. Sending to TagWriter. Tag [{}] seqNr [{}]", rawEvent.serialized().persistenceId(), str, BoxesRunTime.boxToLong(rawEvent.sequenceNr()));
                TagWriters.TagWrite apply = TagWriters$TagWrite$.MODULE$.apply(str, scala.package$.MODULE$.Nil().$colon$colon(rawEvent.serialized()), z);
                this.tagWriters.$bang(apply, this.tagWriters.$bang$default$2(apply));
            } else {
                if (!(some instanceof Some)) {
                    throw new MatchError(some);
                }
                if (rawEvent.sequenceNr() > ((TagWriter.TagProgress) some.value()).sequenceNr()) {
                    this.log.debug("[{}] seqNr > than write progress. Sending to TagWriter. Tag {} seqNr {}. ", rawEvent.serialized().persistenceId(), str, BoxesRunTime.boxToLong(rawEvent.sequenceNr()));
                    TagWriters.TagWrite apply2 = TagWriters$TagWrite$.MODULE$.apply(str, scala.package$.MODULE$.Nil().$colon$colon(rawEvent.serialized()), z);
                    this.tagWriters.$bang(apply2, this.tagWriters.$bang$default$2(apply2));
                }
            }
        });
        return rawEvent;
    }

    public boolean sendMissingTagWriteRaw$default$2() {
        return true;
    }

    public Future<Done> flush(Timeout timeout) {
        ActorRef ask = org.apache.pekko.pattern.package$.MODULE$.ask(this.tagWriters);
        TagWriters.FlushAllTagWriters apply = TagWriters$FlushAllTagWriters$.MODULE$.apply(timeout);
        return AskableActorRef$.MODULE$.$qmark$extension(ask, apply, Timeout$.MODULE$.apply(timeout.duration().$times(2L)), AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, apply)).map(obj -> {
            return Done$.MODULE$;
        }, this.statements.ec());
    }

    public Future<Done> setTagProgress(String str, Map<String, TagWriter.TagProgress> map) {
        this.log.debug("[{}] Recovery sending tag progress: [{}]", str, map);
        ActorRef ask = org.apache.pekko.pattern.package$.MODULE$.ask(this.tagWriters);
        TagWriters.SetTagProgress apply = TagWriters$SetTagProgress$.MODULE$.apply(str, map);
        return AskableActorRef$.MODULE$.$qmark$extension(ask, apply, this.timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, apply)).mapTo(ClassTag$.MODULE$.apply(TagWriters$TagProcessAck$.class)).map(tagWriters$TagProcessAck$ -> {
            return Done$.MODULE$;
        }, this.statements.ec());
    }

    public Future<Done> sendPersistentActorStarting(String str, ActorRef actorRef) {
        this.log.debug("[{}] Persistent actor starting [{}]", str, actorRef);
        ActorRef ask = org.apache.pekko.pattern.package$.MODULE$.ask(this.tagWriters);
        TagWriters.PersistentActorStarting apply = TagWriters$PersistentActorStarting$.MODULE$.apply(str, actorRef);
        return AskableActorRef$.MODULE$.$qmark$extension(ask, apply, this.timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, apply)).mapTo(ClassTag$.MODULE$.apply(TagWriters$PersistentActorStartingAck$.class)).map(tagWriters$PersistentActorStartingAck$ -> {
            return Done$.MODULE$;
        }, this.statements.ec());
    }
}
