package org.apache.pekko.persistence.cassandra.reconciler;

import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ExtendedActorSystem;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LogSource$;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.persistence.cassandra.Extractors$;
import org.apache.pekko.persistence.cassandra.PluginSettings;
import org.apache.pekko.persistence.cassandra.journal.CassandraTagRecovery;
import org.apache.pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.PersistenceQuery$;
import org.apache.pekko.serialization.Serialization;
import org.apache.pekko.serialization.SerializationExtension$;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.OverflowStrategy$;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.Timeout$;
import scala.None$;
import scala.concurrent.Future;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;

/* compiled from: BuildTagViewForPersistenceId.scala */
/**
 * Rebuilds the tag view table for a single persistence id.
 *
 * <p>Internal API. An instance is bound to one persistence id and uses the supplied
 * {@link CassandraTagRecovery} to look up and restore tag progress, then replays that
 * persistence id's events through the Cassandra read journal configured under
 * {@code pekko.persistence.cassandra.query}.
 *
 * <p>NOTE(review): this source is decompiler output of a Scala class; the
 * {@code package.DurationInt} expression used for the flush timeout is kept verbatim from
 * the decompiler and is not valid hand-written Java.
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/persistence/cassandra/reconciler/BuildTagViewForPersisetceId.class */
public final class BuildTagViewForPersisetceId {
    /** Value returned by {@link #reconcile$default$1()}. */
    private static final int DEFAULT_RECONCILE_ARGUMENT = 1000;

    /** Persistence id whose tag view is rebuilt. */
    private final String persistenceId;
    /** Actor system supplying the dispatcher used for all future callbacks. */
    private final ActorSystem system;
    /** Tag recovery component used for progress lookup, missing tag writes and flushing. */
    private final CassandraTagRecovery recovery;
    /** Plugin settings; the read profile and events-by-tag bucket size are read from here. */
    private final PluginSettings settings;
    /** Same instance as {@link #system}; used to obtain the stream materializer. */
    private final ActorSystem sys;
    private final LoggingAdapter log;
    private final Serialization serialization;
    private final CassandraReadJournal queries;
    /** Timeout handed to {@code recovery.flush}; built from 30 seconds. */
    private final Timeout flushTimeout = Timeout$.MODULE$.apply(new package.DurationInt(package$.MODULE$.DurationInt(30)).seconds());

    /**
     * Creates a rebuilder for one persistence id.
     *
     * @param str the persistence id to rebuild the tag view for
     * @param actorSystem the actor system; it is cast to {@link ExtendedActorSystem} to obtain
     *     the read journal, so it must be an instance of that type
     * @param cassandraTagRecovery the tag recovery component to delegate to
     * @param pluginSettings the Cassandra plugin settings
     */
    public BuildTagViewForPersisetceId(String str, ActorSystem actorSystem, CassandraTagRecovery cassandraTagRecovery, PluginSettings pluginSettings) {
        persistenceId = str;
        system = actorSystem;
        recovery = cassandraTagRecovery;
        settings = pluginSettings;
        sys = actorSystem;
        log = Logging$.MODULE$.apply(actorSystem, BuildTagViewForPersisetceId.class, LogSource$.MODULE$.fromAnyClass());
        serialization = SerializationExtension$.MODULE$.apply(actorSystem);
        queries = PersistenceQuery$.MODULE$
            .apply((ExtendedActorSystem) actorSystem)
            .readJournalFor("pekko.persistence.cassandra.query");
    }

    /**
     * Runs the rebuild for this instance's persistence id.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>look up the tag progress and hand it back to the recovery component via
     *       {@code setTagProgress};</li>
     *   <li>stream the events of the persistence id as raw events (arguments {@code 0L},
     *       {@code Long.MAX_VALUE}, {@code Long.MAX_VALUE} -- presumably from/to sequence
     *       number and max; confirm against {@code eventsByPersistenceId});</li>
     *   <li>pass every raw event to {@code sendMissingTagWriteRaw} together with the tag
     *       progress and the flag {@code false};</li>
     *   <li>buffer up to {@code i} elements with a backpressure overflow strategy;</li>
     *   <li>call {@code recovery.flush} once per element, with parallelism 1.</li>
     * </ol>
     * The stream is run into an ignoring sink.
     *
     * @param i the buffer size placed in front of the flush stage
     * @return the future produced by running the stream into the ignoring sink
     */
    public Future<Done> reconcile(int i) {
        return (Future) Source$.MODULE$
            .futureSource(
                recovery
                    .lookupTagProgress(persistenceId, system.dispatcher())
                    .flatMap(
                        // Restore the progress first, then keep passing the looked-up progress along.
                        progress -> recovery.setTagProgress(persistenceId, progress).map(ignored -> progress, system.dispatcher()),
                        system.dispatcher())
                    .map(
                        tagProgress -> {
                            log.debug("[{}] Rebuilding tag view table from: [{}]", persistenceId, tagProgress);
                            return queries
                                .eventsByPersistenceId(
                                    persistenceId,
                                    0L,
                                    Long.MAX_VALUE,
                                    Long.MAX_VALUE,
                                    None$.MODULE$,
                                    settings.journalSettings().readProfile(),
                                    "BuildTagViewForPersistenceId",
                                    Extractors$.MODULE$.rawEvent(settings.eventsByTagSettings().bucketSize(), serialization, system),
                                    queries.eventsByPersistenceId$default$9())
                                .map(event -> recovery.sendMissingTagWriteRaw(tagProgress, false, event))
                                .buffer(i, OverflowStrategy$.MODULE$.backpressure())
                                // The element itself is not used; each one only triggers a flush.
                                .mapAsync(1, ignoredEvent -> recovery.flush(flushTimeout));
                        },
                        system.dispatcher()))
            .runWith(Sink$.MODULE$.ignore(), Materializer$.MODULE$.matFromSystem(sys));
    }

    /**
     * Synthetic accessor for the default value of the first parameter of
     * {@link #reconcile(int)} (presumably emitted by the Scala compiler for a default
     * argument).
     *
     * @return {@code 1000}
     */
    public int reconcile$default$1() {
        return DEFAULT_RECONCILE_ARGUMENT;
    }
}
