package org.apache.pekko.persistence.cassandra;

import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ClassicActorSystemProvider;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.pattern.AskableActorRef$;
import org.apache.pekko.persistence.cassandra.Extractors;
import org.apache.pekko.persistence.cassandra.journal.CassandraJournalStatements;
import org.apache.pekko.persistence.cassandra.journal.CassandraTagRecovery;
import org.apache.pekko.persistence.cassandra.journal.JournalSettings;
import org.apache.pekko.persistence.cassandra.journal.TagWriters;
import org.apache.pekko.persistence.cassandra.journal.TagWriters$;
import org.apache.pekko.persistence.cassandra.journal.TagWriters$AllFlushed$;
import org.apache.pekko.persistence.cassandra.journal.TagWriters$FlushAllTagWriters$;
import org.apache.pekko.persistence.cassandra.journal.TagWriters$TagWritersSession$;
import org.apache.pekko.persistence.cassandra.journal.TaggedPreparedStatements;
import org.apache.pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.PersistenceQuery$;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraSession;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.Timeout$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.runtime.ScalaRunTime$;

/* compiled from: EventsByTagMigration.scala */
/* loaded from: input_file:org/apache/pekko/persistence/cassandra/EventsByTagMigration.class */
public class EventsByTagMigration {
    // Field offset of `session$lzy1`; passed to the LazyVals CAS calls in session$lzyINIT1().
    public static final long OFFSET$1 = LazyVals$.MODULE$.getOffsetStatic(EventsByTagMigration.class.getDeclaredField("session$lzy1"));
    // Field offset of `queries$lzy1`; passed to the LazyVals CAS calls in queries$lzyINIT1().
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(EventsByTagMigration.class.getDeclaredField("queries$lzy1"));
    // Config path of the plugin; ".query" is appended to it to look up the read journal.
    private final String pluginConfigPath;
    // Classic actor system obtained from the provider passed to the constructor.
    private final ActorSystem system;
    // Logger created for this class on `system`.
    private final LoggingAdapter log;
    // Backing slot for the lazily created read journal: null until first use, then either a
    // LazyVals control-state object (while initialising) or the final value.
    private volatile Object queries$lzy1;
    // Same ActorSystem instance as `system`; used to obtain the Materializer for the migration stream.
    private final ActorSystem sys;
    // Dispatcher looked up from "<pluginConfigPath>.journal.plugin-dispatcher".
    private final ExecutionContext ec;
    // Plugin settings parsed from the config found at `pluginConfigPath`.
    private final PluginSettings settings;
    // Provider of the CQL statements used by createTables().
    private final CassandraJournalStatements journalStatements;
    // TagWriters actor started in the constructor; asked to flush at the end of a migration.
    private final ActorRef tagWriters;
    // Helper used by the migration to look up/set tag progress and to send missing tag writes.
    private final CassandraTagRecovery tagRecovery;
    // Backing slot for the lazily obtained CassandraSession (same state scheme as `queries$lzy1`).
    private volatile Object session$lzy1;

    /**
     * Static forwarder to {@code EventsByTagMigration$.MODULE$.apply}.
     *
     * @param classicActorSystemProvider provider handed unchanged to the companion's {@code apply}
     * @return whatever the companion's {@code apply} returns
     */
    public static EventsByTagMigration apply(ClassicActorSystemProvider classicActorSystemProvider) {
        final EventsByTagMigration$ companion = EventsByTagMigration$.MODULE$;
        return companion.apply(classicActorSystemProvider);
    }

    /**
     * Static forwarder to {@code EventsByTagMigration$.MODULE$.rawPayloadOldTagSchemaExtractor}.
     *
     * @param bucketSize bucket size handed unchanged to the companion
     * @param classicActorSystemProvider actor system provider handed unchanged to the companion
     * @return the extractor produced by the companion object
     */
    public static Extractors.Extractor<Extractors.RawEvent> rawPayloadOldTagSchemaExtractor(BucketSize bucketSize, ClassicActorSystemProvider classicActorSystemProvider) {
        final EventsByTagMigration$ companion = EventsByTagMigration$.MODULE$;
        return companion.rawPayloadOldTagSchemaExtractor(bucketSize, classicActorSystemProvider);
    }

    /**
     * Wires up everything the migration needs: logger, dispatcher, plugin settings, journal
     * statements, the TagWriters actor and the tag recovery helper.
     *
     * @param classicActorSystemProvider source of the classic {@link ActorSystem}
     * @param str plugin config path; the dispatcher id is read from
     *            {@code <str>.journal.plugin-dispatcher} and the settings from {@code <str>}
     */
    public EventsByTagMigration(ClassicActorSystemProvider classicActorSystemProvider, String str) {
        this.pluginConfigPath = str;
        final ActorSystem actorSystem = classicActorSystemProvider.classicSystem();
        this.system = actorSystem;
        this.log = Logging$.MODULE$.getLogger(actorSystem, EventsByTagMigration.class);
        this.sys = actorSystem;

        final String dispatcherKey = str + ".journal.plugin-dispatcher";
        this.ec = actorSystem.dispatchers().lookup(actorSystem.settings().config().getString(dispatcherKey));

        this.settings = new PluginSettings(actorSystem, actorSystem.settings().config().getConfig(str));
        this.journalStatements = new CassandraJournalStatements(this.settings);

        // Statements are prepared through the lazily initialised session.
        final TaggedPreparedStatements preparedStatements = new TaggedPreparedStatements(
                this.journalStatements,
                statement -> session().prepare(statement),
                ec());

        this.tagWriters = actorSystem.actorOf(
                TagWriters$.MODULE$.props(
                        eventsByTagSettings().tagWriterSettings(),
                        TagWriters$TagWritersSession$.MODULE$.apply(
                                session(),
                                journalSettings().writeProfile(),
                                journalSettings().readProfile(),
                                preparedStatements)));
        this.tagRecovery = new CassandraTagRecovery(actorSystem, session(), this.settings, preparedStatements, this.tagWriters);
    }

    /**
     * Returns the logging adapter created for this class in the constructor.
     *
     * @return the {@link LoggingAdapter} stored in {@code log}
     */
    public LoggingAdapter log() {
        return this.log;
    }

    /**
     * Accessor for the lazily created read journal.
     *
     * <p>Fast path: the slot already holds a {@link CassandraReadJournal}. If the slot holds
     * the {@code NullValue} marker the value is {@code null}; in every other case
     * initialisation is delegated to {@link #queries$lzyINIT1()}.
     *
     * @return the read journal, or {@code null} when the slot holds the null marker
     */
    private CassandraReadJournal queries() {
        final Object current = this.queries$lzy1;
        if (current instanceof CassandraReadJournal) {
            return (CassandraReadJournal) current;
        }
        return current == LazyVals$NullValue$.MODULE$
                ? null
                : (CassandraReadJournal) queries$lzyINIT1();
    }

    /**
     * Slow path for the lazily created read journal stored in {@code queries$lzy1}.
     *
     * <p>Loops until the slot holds a final value:
     * <ul>
     *   <li>{@code null}: try to CAS the slot to {@code Evaluating}; the winner looks up the
     *       read journal at {@code <pluginConfigPath>.query}, publishes it (or the
     *       {@code NullValue} marker when the lookup returned {@code null}) and returns it.</li>
     *   <li>{@code Evaluating}: try to replace it with a fresh {@code Waiting} and loop.</li>
     *   <li>{@code Waiting}: block on it via {@code await()} and loop.</li>
     *   <li>any other control state: return {@code null}.</li>
     *   <li>any non-control value: return it.</li>
     * </ul>
     *
     * @return the initialised value, or {@code null}
     */
    private Object queries$lzyINIT1() {
        while (true) {
            Object state = this.queries$lzy1;
            if (state == null) {
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    // Value to publish in the slot; stays null if the lookup throws, which
                    // puts the slot back to "uninitialised" in the finally block.
                    Object published = null;
                    try {
                        CassandraReadJournal journal = (CassandraReadJournal) PersistenceQuery$.MODULE$.apply(this.system).readJournalFor(new StringBuilder(6).append(this.pluginConfigPath).append(".query").toString());
                        if (journal == null) {
                            published = LazyVals$NullValue$.MODULE$;
                        } else {
                            published = journal;
                        }
                        return journal;
                    } finally {
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, published)) {
                            // The slot is no longer Evaluating; the code treats it as a Waiting
                            // installed by another thread, publishes the value and releases it.
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.queries$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, published);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(state instanceof LazyVals.LazyValControlState)) {
                    return state;
                }
                if (state == LazyVals$Evaluating$.MODULE$) {
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, state, new LazyVals.Waiting());
                } else {
                    if (!(state instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) state).await();
                }
            }
        }
    }

    /**
     * Returns the execution context looked up in the constructor from the plugin's
     * {@code journal.plugin-dispatcher} setting.
     *
     * @return the {@link ExecutionContext} stored in {@code ec}
     */
    public ExecutionContext ec() {
        return this.ec;
    }

    /**
     * Shortcut to the journal section of the plugin settings.
     *
     * @return {@code settings.journalSettings()}
     */
    private JournalSettings journalSettings() {
        final PluginSettings pluginSettings = this.settings;
        return pluginSettings.journalSettings();
    }

    /**
     * Shortcut to the events-by-tag section of the plugin settings.
     *
     * @return {@code settings.eventsByTagSettings()}
     */
    private EventsByTagSettings eventsByTagSettings() {
        final PluginSettings pluginSettings = this.settings;
        return pluginSettings.eventsByTagSettings();
    }

    /**
     * Accessor for the lazily obtained Cassandra session.
     *
     * <p>Fast path: the slot already holds a {@link CassandraSession}. If the slot holds the
     * {@code NullValue} marker the value is {@code null}; in every other case initialisation
     * is delegated to {@link #session$lzyINIT1()}.
     *
     * @return the session, or {@code null} when the slot holds the null marker
     */
    public CassandraSession session() {
        final Object current = this.session$lzy1;
        if (current instanceof CassandraSession) {
            return (CassandraSession) current;
        }
        return current == LazyVals$NullValue$.MODULE$
                ? null
                : (CassandraSession) session$lzyINIT1();
    }

    /**
     * Slow path for the lazily obtained Cassandra session stored in {@code session$lzy1}.
     *
     * <p>Loops until the slot holds a final value:
     * <ul>
     *   <li>{@code null}: try to CAS the slot to {@code Evaluating}; the winner takes the
     *       session from {@code queries().session()}, publishes it (or the {@code NullValue}
     *       marker when that returned {@code null}) and returns it.</li>
     *   <li>{@code Evaluating}: try to replace it with a fresh {@code Waiting} and loop.</li>
     *   <li>{@code Waiting}: block on it via {@code await()} and loop.</li>
     *   <li>any other control state: return {@code null}.</li>
     *   <li>any non-control value: return it.</li>
     * </ul>
     *
     * @return the initialised value, or {@code null}
     */
    private Object session$lzyINIT1() {
        while (true) {
            Object state = this.session$lzy1;
            if (state == null) {
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$1, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    // Value to publish in the slot; stays null if obtaining the session throws,
                    // which puts the slot back to "uninitialised" in the finally block.
                    Object published = null;
                    try {
                        CassandraSession cassandraSession = queries().session();
                        if (cassandraSession == null) {
                            published = LazyVals$NullValue$.MODULE$;
                        } else {
                            published = cassandraSession;
                        }
                        return cassandraSession;
                    } finally {
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$1, LazyVals$Evaluating$.MODULE$, published)) {
                            // The slot is no longer Evaluating; the code treats it as a Waiting
                            // installed by another thread, publishes the value and releases it.
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.session$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$1, waiting, published);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(state instanceof LazyVals.LazyValControlState)) {
                    return state;
                }
                if (state == LazyVals$Evaluating$.MODULE$) {
                    LazyVals$.MODULE$.objCAS(this, OFFSET$1, state, new LazyVals.Waiting());
                } else {
                    if (!(state instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) state).await();
                }
            }
        }
    }

    /**
     * Runs the DDL statements for the keyspace and the tag-related tables, one after another.
     *
     * <p>Statements are executed sequentially, each started only after the previous future
     * has completed: {@code createKeyspace}, {@code createTagsTable},
     * {@code createTagsProgressTable}, {@code createTagScanningTable}.
     *
     * @return a future completed with {@link Done} once the last statement has been executed
     */
    public Future<Done> createTables() {
        log().info("Creating keyspace {} and new tag tables", journalSettings().keyspace());
        // Each nested lambda needs its own parameter name: Java does not allow a lambda
        // parameter to redeclare a name from an enclosing lambda.
        return session().executeWrite(this.journalStatements.createKeyspace(), ScalaRunTime$.MODULE$.wrapRefArray(new Object[0])).flatMap(keyspaceCreated -> {
            return session().executeWrite(this.journalStatements.createTagsTable(), ScalaRunTime$.MODULE$.wrapRefArray(new Object[0])).flatMap(tagsTableCreated -> {
                return session().executeWrite(this.journalStatements.createTagsProgressTable(), ScalaRunTime$.MODULE$.wrapRefArray(new Object[0])).flatMap(progressTableCreated -> {
                    return session().executeWrite(this.journalStatements.createTagScanningTable(), ScalaRunTime$.MODULE$.wrapRefArray(new Object[0])).map(scanningTableCreated -> {
                        return Done$.MODULE$;
                    }, ec());
                }, ec());
            }, ec());
        }, ec());
    }

    /**
     * Adds a {@code tags set<text>} column to the configured journal table by executing
     * {@code ALTER TABLE <keyspace>.<table> ADD tags set<text>}.
     *
     * @return a future completed with {@link Done} once the statement has been executed
     */
    public Future<Done> addTagsColumn() {
        log().info("Adding tags column to table {}", journalSettings().table());
        final String alterStatement = "ALTER TABLE " + journalSettings().keyspace() + "." + journalSettings().table() + " ADD tags set<text>";
        return session().executeWrite(alterStatement, ScalaRunTime$.MODULE$.wrapRefArray(new Object[0])).map(written -> {
            return Done$.MODULE$;
        }, ec());
    }

    /**
     * Resolves the batch size used between flushes during migration.
     *
     * @param i requested batch size; {@code 0} means "use the configured default"
     * @return {@code i} when it is non-zero, otherwise the tag writer's {@code maxBatchSize}
     */
    private int periodicFlushBatchSize(int i) {
        if (i != 0) {
            return i;
        }
        return eventsByTagSettings().tagWriterSettings().maxBatchSize();
    }

    /**
     * Migrates the given persistence ids to the tag views.
     *
     * @param seq persistence ids to migrate; iterated lazily when the stream runs
     * @param i flush batch size, forwarded to the internal migration ({@code 0} selects the
     *          configured maximum batch size)
     * @param timeout timeout forwarded to the flush operations of the internal migration
     * @return the future returned by the internal migration
     */
    public Future<Done> migratePidsToTagViews(Seq<String> seq, int i, Timeout timeout) {
        final Source<String, NotUsed> persistenceIds = Source$.MODULE$.fromIterator(() -> seq.iterator());
        return migrateToTagViewsInternal(persistenceIds, i, timeout);
    }

    /**
     * Default value accessor; the name suggests it supplies the second parameter of
     * {@code migratePidsToTagViews} (compiler-generated naming, not verifiable from this file).
     * A value of {@code 0} makes {@code periodicFlushBatchSize} fall back to the configured
     * maximum batch size.
     *
     * @return {@code 0}
     */
    public int migratePidsToTagViews$default$2() {
        return 0;
    }

    public Timeout migratePidsToTagViews$default$3() {
        return Timeout$.MODULE$.apply(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
    }

    /**
     * Migrates every persistence id reported by {@code currentPersistenceIds()} that passes
     * the given filter.
     *
     * @param i flush batch size, forwarded to the internal migration ({@code 0} selects the
     *          configured maximum batch size)
     * @param function1 predicate applied to each persistence id; only ids it accepts are migrated
     * @param timeout timeout forwarded to the flush operations of the internal migration
     * @return the future returned by the internal migration
     */
    public Future<Done> migrateToTagViews(int i, Function1<String, Object> function1, Timeout timeout) {
        final Source<String, NotUsed> selectedIds = (Source) queries().currentPersistenceIds().filter(function1);
        return migrateToTagViewsInternal(selectedIds, i, timeout);
    }

    /**
     * Default value accessor; the name suggests it supplies the first parameter of
     * {@code migrateToTagViews} (compiler-generated naming, not verifiable from this file).
     * A value of {@code 0} makes {@code periodicFlushBatchSize} fall back to the configured
     * maximum batch size.
     *
     * @return {@code 0}
     */
    public int migrateToTagViews$default$1() {
        return 0;
    }

    /**
     * Default value accessor; the name suggests it supplies the second parameter (the
     * persistence id filter) of {@code migrateToTagViews}.
     *
     * @return a predicate that accepts every persistence id
     */
    public Function1<String, Object> migrateToTagViews$default$2() {
        return persistenceId -> true;
    }

    public Timeout migrateToTagViews$default$3() {
        return Timeout$.MODULE$.apply(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
    }

    /**
     * Shared implementation behind {@code migratePidsToTagViews} and {@code migrateToTagViews}.
     *
     * <p>For each persistence id emitted by {@code source} (one at a time, via
     * {@code flatMapConcat}) it looks up and re-sets the tag progress, determines the starting
     * sequence number, replays the events from that sequence number and hands each raw event
     * to {@code tagRecovery.sendMissingTagWriteRaw}. Events are grouped by the resolved flush
     * batch size and {@code tagRecovery.flush} is called once per group. When the stream has
     * completed, the {@code tagWriters} actor is asked to flush all tag writers.
     *
     * @param source persistence ids to migrate
     * @param i requested flush batch size; resolved through {@code periodicFlushBatchSize}
     * @param timeout timeout used for each {@code flush} call and for the final ask
     * @return a future completed with {@link Done} after the final {@code AllFlushed} reply
     */
    private Future<Done> migrateToTagViewsInternal(Source<String, NotUsed> source, int i, Timeout timeout) {
        log().info("Beginning migration of data to tag_views table in keyspace {}", journalSettings().keyspace());
        return ((Future) source.map(str -> {
            // Log every persistence id as it enters the pipeline.
            log().info("Migrating the following persistence ids {}", str);
            return str;
        }).flatMapConcat(str2 -> {
            // The starting-sequence-number lookup is kicked off before the tag progress lookup;
            // its result is only consumed after the progress has been set.
            Future<Object> tagScanningStartingSequenceNr = this.tagRecovery.tagScanningStartingSequenceNr(str2);
            // Yields a (progress map, starting sequence nr) pair.
            Future flatMap = this.tagRecovery.lookupTagProgress(str2, ec()).flatMap(map -> {
                return this.tagRecovery.setTagProgress(str2, map).flatMap(done -> {
                    return tagScanningStartingSequenceNr.map(obj -> {
                        return $anonfun$3$$anonfun$1$$anonfun$1(map, BoxesRunTime.unboxToLong(obj));
                    }, ec());
                }, ec());
            }, ec());
            int periodicFlushBatchSize = periodicFlushBatchSize(i);
            return Source$.MODULE$.futureSource(flatMap.map(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Map map2 = (Map) tuple2._1();
                long unboxToLong = BoxesRunTime.unboxToLong(tuple2._2());
                log().info("Starting migration for pid: {} based on progress: {} starting at sequence nr: {}", str2, map2, BoxesRunTime.boxToLong(unboxToLong));
                // Replay events for this persistence id from the starting sequence number,
                // extracting them with the old-tag-schema raw payload extractor.
                return queries().eventsByPersistenceId(str2, unboxToLong, Long.MAX_VALUE, Long.MAX_VALUE, None$.MODULE$, this.settings.querySettings().readProfile(), new StringBuilder(13).append("migrateToTag-").append(str2).toString(), EventsByTagMigration$.MODULE$.rawPayloadOldTagSchemaExtractor(eventsByTagSettings().bucketSize(), this.system), queries().eventsByPersistenceId$default$9()).map(rawEvent -> {
                    return this.tagRecovery.sendMissingTagWriteRaw(map2, false, rawEvent);
                }).grouped(periodicFlushBatchSize).mapAsync(1, seq -> {
                    // One flush per group of `periodicFlushBatchSize` events.
                    return this.tagRecovery.flush(timeout);
                });
            }, ec()));
        }).runWith(Sink$.MODULE$.ignore(), Materializer$.MODULE$.matFromSystem(this.sys))).flatMap(done -> {
            // After the whole stream has completed, ask the TagWriters actor to flush everything
            // and map the AllFlushed reply to Done.
            ActorRef ask = org.apache.pekko.pattern.package$.MODULE$.ask(this.tagWriters);
            TagWriters.FlushAllTagWriters apply = TagWriters$FlushAllTagWriters$.MODULE$.apply(timeout);
            return AskableActorRef$.MODULE$.$qmark$extension(ask, apply, timeout, AskableActorRef$.MODULE$.$qmark$default$3$extension(ask, apply)).mapTo(ClassTag$.MODULE$.apply(TagWriters$AllFlushed$.class)).map(tagWriters$AllFlushed$ -> {
                return Done$.MODULE$;
            }, ec());
        }, ec());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic helper that pairs a tag progress map with a sequence number.
     *
     * @param map first tuple element
     * @param j second tuple element; boxed before being stored
     * @return a {@link Tuple2} of {@code (map, j)}
     */
    public static final /* synthetic */ Tuple2 $anonfun$3$$anonfun$1$$anonfun$1(Map map, long j) {
        final Object boxedSequenceNr = BoxesRunTime.boxToLong(j);
        return Tuple2$.MODULE$.apply(map, boxedSequenceNr);
    }
}
