package org.apache.pekko.persistence.cassandra.query;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Scheduler;
import org.apache.pekko.cluster.pubsub.DistributedPubSub$;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator$Subscribe$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.persistence.cassandra.journal.TimeBucket;
import org.apache.pekko.persistence.cassandra.journal.TimeBucket$;
import org.apache.pekko.persistence.cassandra.query.EventsByTagStage;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import org.apache.pekko.util.PrettyDuration$;
import org.apache.pekko.util.PrettyDuration$PrettyPrintableDuration$;
import org.apache.pekko.util.UUIDComparator$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple2$;
import scala.Tuple3;
import scala.Tuple3$;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.runtime.RichLong;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: EventsByTagStage.scala */
/* loaded from: input_file:org/apache/pekko/persistence/cassandra/query/EventsByTagStage$$anon$1.class */
public final class EventsByTagStage$$anon$1 extends TimerGraphStageLogic implements StageLogging, OutHandler {
    // Offset handle for the scheduler$lzy1 field as returned by LazyVals.getOffsetStatic; passed to objCAS in scheduler$lzyINIT1().
    public static final long OFFSET$1 = LazyVals$.MODULE$.getOffsetStatic(EventsByTagStage$$anon$1.class.getDeclaredField("scheduler$lzy1"));
    // Offset handle for the system$lzy1 field as returned by LazyVals.getOffsetStatic; passed to objCAS in system$lzyINIT1().
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(EventsByTagStage$$anon$1.class.getDeclaredField("system$lzy1"));
    // Backing field for the StageLogging logger accessor pair (org$apache$pekko$stream$stage$StageLogging$$_log / _log_$eq).
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Current state of the stage; first assigned in preStart() and replaced through stageState_$eq / updateStageState.
    private EventsByTagStage.StageState stageState;
    // Computed once in the constructor from the enclosing stage's toOffset option; compared against in calculateToOffset().
    private final long toOffsetMillis;
    // Holder for the lazily initialised ActorSystem: null before initialisation, a LazyVals control-state marker while
    // initialising (or for a null result), otherwise the value itself. See system() / system$lzyINIT1().
    private volatile Object system$lzy1;
    // Holder for the lazily initialised Scheduler; same protocol as system$lzy1. See scheduler() / scheduler$lzyINIT1().
    private volatile Object scheduler$lzy1;
    // Async callback invoked with the Try result of selectEventsForBucket (see query() and lookForMissing()).
    private final AsyncCallback newResultSetCb;
    // Async callback invoked with the Try result of the delayed-event scan (see scanForDelayedEvents()).
    private final AsyncCallback backTrackCb;
    // Threshold in milliseconds used by cleanup(); derived in the constructor from the cleanUpPersistenceIds setting.
    private final long cleanupPersistenceIdsMills;
    // Reference to the enclosing EventsByTagStage instance.
    private final /* synthetic */ EventsByTagStage $outer;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the stage logic for the given enclosing {@code EventsByTagStage}.
     *
     * <p>The constructor registers this object as the handler of the stage's outlet, computes
     * {@code toOffsetMillis} and {@code cleanupPersistenceIdsMills}, and builds the two async
     * callbacks: {@code newResultSetCb} (handles the outcome of a Cassandra query) and
     * {@code backTrackCb} (handles the outcome of a scan for delayed events).
     *
     * <p>NOTE(review): this is decompiler output. Identifiers such as {@code r1} inside the lambdas and
     * locals that are declared twice with the same name ({@code tuple2}, {@code tuple22}, {@code str})
     * are decompilation artifacts and do not compile as written.
     *
     * @param eventsByTagStage the enclosing stage; must not be null
     * @throws NullPointerException when {@code eventsByTagStage} is null
     */
    public EventsByTagStage$$anon$1(EventsByTagStage eventsByTagStage) {
        super(eventsByTagStage.m168shape());
        // NOTE(review): the decompiler moved super(...) above this check, so as written a null
        // argument is already dereferenced on the line above.
        if (eventsByTagStage == null) {
            throw new NullPointerException();
        }
        this.$outer = eventsByTagStage;
        StageLogging.$init$(this);
        // Map the optional toOffset to a long; the fallback for an absent value comes from $anonfun$2 (not visible here).
        this.toOffsetMillis = BoxesRunTime.unboxToLong(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.map(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$1).getOrElse(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$2));
        setHandler(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, this);
        this.newResultSetCb = getAsyncCallback(r8 -> {
            if (!(r8 instanceof Success)) {
                if (!(r8 instanceof Failure)) {
                    throw new MatchError(r8);
                }
                // Failure: log a warning and fail the outlet with the original exception.
                Throwable exception = ((Failure) r8).exception();
                log().warning("Cassandra query failed: {}", exception.getMessage());
                fail(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, exception);
                return;
            }
            AsyncResultSet asyncResultSet = (AsyncResultSet) ((Success) r8).value();
            // A result set is only accepted while the stage is in the QueryInProgress state.
            EventsByTagStage.QueryState state = stageState().state();
            if (!(state instanceof EventsByTagStage.QueryInProgress)) {
                throw new IllegalStateException(new StringBuilder(39).append("New ResultSet when in unexpected state ").append(stageState().state()).toString());
            }
            if (((EventsByTagStage.QueryInProgress) state).abortForMissingSearch()) {
                // The in-progress state asks for a missing-events search; this result set is not consumed on this path.
                lookForMissing();
            } else {
                // State update is performed by a helper that is not visible here; `r1` is presumably the
                // captured asyncResultSet — TODO confirm against the original source.
                updateStageState((v1) -> {
                    return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$3$$anonfun$1(r1, v1);
                });
                tryPushOne();
            }
        });
        this.backTrackCb = getAsyncCallback(r11 -> {
            if (r11 instanceof Failure) {
                // A failed scan is not fatal: the state is updated by an opaque helper, a warning is logged and the callback returns.
                Throwable exception = ((Failure) r11).exception();
                updateStageState(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$4$$anonfun$1);
                // NOTE(review): the message reads "this will retried" — looks like a missing "be"; left untouched because it is runtime text.
                log().warning("Backtrack failed, this will retried. {}", exception);
                return;
            }
            if (!(r11 instanceof Success)) {
                throw new MatchError(r11);
            }
            Map map = (Map) ((Success) r11).value();
            updateStageState(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$4$$anonfun$2);
            log().debug("Current sequence nrs: {} from back tracking: {}", stageState().tagPidSequenceNrs(), map);
            // Keep only the scanned entries that are ahead of what the stage tracks: either the tracked value
            // (first element of the stored Tuple3) is lower than the scanned one, or nothing is tracked for that key.
            Map map2 = (Map) map.filter(tuple2 -> {
                Tuple3 tuple3;
                if (tuple2 != null) {
                    Tuple2 tuple2 = (Tuple2) tuple2._2();
                    String str = (String) tuple2._1();
                    if (tuple2 != null) {
                        long unboxToLong = BoxesRunTime.unboxToLong(tuple2._1());
                        Some some = stageState().tagPidSequenceNrs().get(str);
                        return ((some instanceof Some) && (tuple3 = (Tuple3) some.value()) != null && BoxesRunTime.unboxToLong(tuple3._1()) < unboxToLong) || None$.MODULE$.equals(some);
                    }
                }
                throw new MatchError(tuple2);
            });
            // Nothing is delayed, or a missing-events search is already running: nothing more to do.
            if (!map2.nonEmpty() || stageState().isLookingForMissing()) {
                return;
            }
            log().debug("The following persistence ids have delayed events: {}. Initiating search for the events.", map2);
            EventsByTagStage.QueryState state = stageState().state();
            // Carry over the first component of a BufferedEvents state, or start from an empty list.
            Nil$ _1 = state instanceof EventsByTagStage.BufferedEvents ? EventsByTagStage$BufferedEvents$.MODULE$.unapply((EventsByTagStage.BufferedEvents) state)._1() : scala.package$.MODULE$.Nil();
            Map map3 = (Map) map2.transform(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$_$$anonfun$2);
            // For every key build the set of numbers from (tracked value + 1) up to the scanned value, inclusive;
            // the default for an untracked key comes from $anonfun$5 (not visible here).
            Map map4 = (Map) map2.transform((str, tuple22) -> {
                Tuple2 apply = Tuple2$.MODULE$.apply(str, tuple22);
                if (apply != null) {
                    Tuple2 tuple22 = (Tuple2) apply._2();
                    String str = (String) apply._1();
                    if (tuple22 != null) {
                        long unboxToLong = BoxesRunTime.unboxToLong(tuple22._1());
                        return new RichLong(Predef$.MODULE$.longWrapper(BoxesRunTime.unboxToLong(stageState().tagPidSequenceNrs().get(str).map(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$_$$anonfun$4).getOrElse(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$_$$anonfun$5)) + 1)).to(BoxesRunTime.boxToLong(unboxToLong)).toSet();
                    }
                }
                throw new MatchError(apply);
            });
            // Fold over the keys of map3, seeded with the unix timestamp of the current toOffset, to obtain the
            // timestamp the search starts from; the combining step ($anonfun$6) is not visible here.
            UUID startOf = Uuids.startOf(BoxesRunTime.unboxToLong(map3.keys().foldLeft(BoxesRunTime.boxToLong(Uuids.unixTimestamp(stageState().toOffset())), (obj, obj2) -> {
                return $anonfun$6(BoxesRunTime.unboxToLong(obj), (String) obj2);
            })));
            log().debug("Starting search for: {}", map4);
            setLookingForMissingState(_1, startOf, stageState().toOffset(), map3, map4, true, eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventsByTagGapTimeout());
            // Give up when more events are missing than the configured maxMissingToSearch.
            long j = totalMissing();
            if (j > eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().maxMissingToSearch()) {
                failTooManyMissing(j);
                return;
            }
            EventsByTagStage.QueryState state2 = stageState().state();
            // No query in flight: the search can start right away.
            if (!(state2 instanceof EventsByTagStage.QueryInProgress)) {
                lookForMissing();
                return;
            }
            // A query is in flight: only the state is updated here (helper not visible; presumably it flags the
            // in-flight query so that newResultSetCb starts the search — TODO confirm). The unapply/_1/_2 calls
            // and the queryInProgress local are leftovers of the decompiled pattern match.
            EventsByTagStage.QueryInProgress unapply = EventsByTagStage$QueryInProgress$.MODULE$.unapply((EventsByTagStage.QueryInProgress) state2);
            unapply._1();
            unapply._2();
            EventsByTagStage.QueryInProgress queryInProgress = (EventsByTagStage.QueryInProgress) state2;
            updateStageState((v1) -> {
                return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$4$$anonfun$3(r1, v1);
            });
        });
        // Map the optional cleanUpPersistenceIds setting to a long; the fallback comes from $anonfun$6 (not visible here).
        this.cleanupPersistenceIdsMills = BoxesRunTime.unboxToLong(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().cleanUpPersistenceIds().map(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$5).getOrElse(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$lessinit$greater$$anonfun$6));
    }

    /**
     * Accessor for the logger backing field that this class holds on behalf of {@code StageLogging}.
     *
     * @return the stored {@code LoggingAdapter}; may be null if it has not been assigned yet
     */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    /**
     * Mutator for the logger backing field that this class holds on behalf of {@code StageLogging}.
     *
     * @param loggingAdapter the logger to store
     */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /**
     * Returns the stage logger by delegating to the {@code StageLogging} implementation ({@code log$}).
     *
     * @return the logger provided by {@code StageLogging}
     */
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /**
     * Forwards the no-argument downstream-finish notification to the {@code OutHandler}
     * implementation ({@code onDownstreamFinish$}); no stage-specific handling is added here.
     *
     * @throws Exception whatever the delegated implementation throws
     */
    public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /**
     * Forwards the downstream-finish notification carrying a cause to the {@code OutHandler}
     * implementation ({@code onDownstreamFinish$}); no stage-specific handling is added here.
     *
     * @param th the cause passed through to the delegated implementation
     * @throws Exception whatever the delegated implementation throws
     */
    public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    /**
     * Returns the class used as the log source for this logic: the enclosing stage class rather than
     * this anonymous class.
     *
     * @return {@code EventsByTagStage.class}
     */
    public Class logSource() {
        return EventsByTagStage.class;
    }

    /**
     * Returns the current stage state.
     *
     * @return the value last stored through {@code stageState_$eq}; null before {@code preStart()} assigns it
     */
    public EventsByTagStage.StageState stageState() {
        return this.stageState;
    }

    /**
     * Replaces the current stage state.
     *
     * @param stageState the new state to store
     */
    public void stageState_$eq(EventsByTagStage.StageState stageState) {
        this.stageState = stageState;
    }

    /**
     * Returns the value computed in the constructor from the enclosing stage's optional toOffset.
     *
     * @return the stored {@code toOffsetMillis} value
     */
    public long toOffsetMillis() {
        return this.toOffsetMillis;
    }

    /**
     * Returns the lazily initialised actor system.
     *
     * <p>A value already stored in the holder field is returned directly, the null marker is
     * translated back to {@code null}, and anything else falls through to {@code system$lzyINIT1()}.
     *
     * @return the actor system, or {@code null} when the initialiser produced null
     */
    public ActorSystem system() {
        Object cached = this.system$lzy1;
        if (cached instanceof ActorSystem) {
            return (ActorSystem) cached;
        }
        // Either the recorded "null result" marker, or not (fully) initialised yet.
        return cached == LazyVals$NullValue$.MODULE$ ? null : (ActorSystem) system$lzyINIT1();
    }

    /**
     * Initialisation path of the lazy {@code system} value.
     *
     * <p>Loops on the holder field {@code system$lzy1}:
     * <ul>
     *   <li>{@code null}: try to CAS it to the Evaluating marker; the winner computes
     *       {@code materializer().system()} and publishes the result (or the NullValue marker for a
     *       null result) in the {@code finally} block.</li>
     *   <li>a plain value: return it.</li>
     *   <li>Evaluating: try to CAS in a new {@code Waiting} and loop again.</li>
     *   <li>{@code Waiting}: block on {@code await()} and loop again.</li>
     *   <li>any other control state: return {@code null}.</li>
     * </ul>
     *
     * @return the initialised value, or {@code null}
     */
    private Object system$lzyINIT1() {
        while (true) {
            Object obj = this.system$lzy1;
            if (obj == null) {
                // Only the caller that wins this CAS runs the initialiser; a loser goes around the loop again.
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    // Stays null if the initialiser throws, so the finally block then writes null back into the holder.
                    LazyVals$NullValue$ lazyVals$NullValue$ = null;
                    try {
                        // NOTE(review): the LazyVals$NullValue$ static types below are a decompiler typing artifact; the value is the ActorSystem.
                        LazyVals$NullValue$ system = materializer().system();
                        if (system == null) {
                            lazyVals$NullValue$ = LazyVals$NullValue$.MODULE$;
                        } else {
                            lazyVals$NullValue$ = system;
                        }
                        return system;
                    } finally {
                        // Publish the result. If the field no longer holds Evaluating it is cast to Waiting
                        // (see the Evaluating branch below): store the result over it and count it down.
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, lazyVals$NullValue$)) {
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.system$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, lazyVals$NullValue$);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    // Another caller is evaluating: install a Waiting so this caller can block on it in the next iteration.
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    /**
     * Returns the lazily initialised scheduler.
     *
     * <p>A value already stored in the holder field is returned directly, the null marker is
     * translated back to {@code null}, and anything else falls through to {@code scheduler$lzyINIT1()}.
     *
     * @return the scheduler, or {@code null} when the initialiser produced null
     */
    public Scheduler scheduler() {
        Object cached = this.scheduler$lzy1;
        if (cached instanceof Scheduler) {
            return (Scheduler) cached;
        }
        // Either the recorded "null result" marker, or not (fully) initialised yet.
        return cached == LazyVals$NullValue$.MODULE$ ? null : (Scheduler) scheduler$lzyINIT1();
    }

    /**
     * Initialisation path of the lazy {@code scheduler} value.
     *
     * <p>Loops on the holder field {@code scheduler$lzy1}:
     * <ul>
     *   <li>{@code null}: try to CAS it to the Evaluating marker; the winner computes
     *       {@code system().scheduler()} and publishes the result (or the NullValue marker for a
     *       null result) in the {@code finally} block.</li>
     *   <li>a plain value: return it.</li>
     *   <li>Evaluating: try to CAS in a new {@code Waiting} and loop again.</li>
     *   <li>{@code Waiting}: block on {@code await()} and loop again.</li>
     *   <li>any other control state: return {@code null}.</li>
     * </ul>
     *
     * @return the initialised value, or {@code null}
     */
    private Object scheduler$lzyINIT1() {
        while (true) {
            Object obj = this.scheduler$lzy1;
            if (obj == null) {
                // Only the caller that wins this CAS runs the initialiser; a loser goes around the loop again.
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$1, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    // Stays null if the initialiser throws, so the finally block then writes null back into the holder.
                    LazyVals$NullValue$ lazyVals$NullValue$ = null;
                    try {
                        // NOTE(review): the LazyVals$NullValue$ static types below are a decompiler typing artifact; the value is the Scheduler.
                        LazyVals$NullValue$ scheduler = system().scheduler();
                        if (scheduler == null) {
                            lazyVals$NullValue$ = LazyVals$NullValue$.MODULE$;
                        } else {
                            lazyVals$NullValue$ = scheduler;
                        }
                        return scheduler;
                    } finally {
                        // Publish the result. If the field no longer holds Evaluating it is cast to Waiting
                        // (see the Evaluating branch below): store the result over it and count it down.
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$1, LazyVals$Evaluating$.MODULE$, lazyVals$NullValue$)) {
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.scheduler$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$1, waiting, lazyVals$NullValue$);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    // Another caller is evaluating: install a Waiting so this caller can block on it in the next iteration.
                    LazyVals$.MODULE$.objCAS(this, OFFSET$1, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    /**
     * Computes the upper offset bound for the next query.
     *
     * <p>The candidate bound is "now" (taken from a fresh time-based UUID) minus the configured
     * eventual-consistency delay. While that candidate is still below {@code toOffsetMillis()} the
     * end-of-millisecond UUID for the candidate is used; otherwise the stage's configured toOffset
     * is returned.
     *
     * @return the offset to query up to
     */
    private UUID calculateToOffset() {
        long delayedNowMillis = Uuids.unixTimestamp(Uuids.timeBased()) - this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency().toMillis();

        if (delayedNowMillis >= toOffsetMillis()) {
            // The delayed clock has reached the configured end: use the configured toOffset as is.
            if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug && log().isDebugEnabled()) {
                log().debug("{}: New toOffset (End): {}", this.$outer.stageUuid(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset((UUID) this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.get()));
            }
            return (UUID) this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.get();
        }

        // Still before the configured end: bound the query by the delayed "now".
        UUID delayedBound = Uuids.endOf(delayedNowMillis);
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug && log().isDebugEnabled()) {
            log().debug("[{}]: New toOffset (EC): {}", this.$outer.stageUuid(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(delayedBound));
        }
        return delayedBound;
    }

    /**
     * Replaces the third component of the stage state with a freshly calculated toOffset
     * ({@code calculateToOffset()}); every other component is copied unchanged.
     */
    private void updateToOffset() {
        updateStageState(current -> {
            return current.copy(
                    current.copy$default$1(),
                    current.copy$default$2(),
                    calculateToOffset(),
                    current.copy$default$4(),
                    current.copy$default$5(),
                    current.copy$default$6(),
                    current.copy$default$7(),
                    current.copy$default$8());
        });
    }

    /**
     * Updates the stage state for the given query state through {@code updateStageState}.
     *
     * <p>NOTE(review): the transformation itself lives in a helper on {@code EventsByTagStage} that
     * is not visible here; {@code r1} is a decompiler artifact and presumably stands for
     * {@code queryState} — TODO confirm.
     *
     * @param queryState the query state to apply
     */
    private void updateQueryState(EventsByTagStage.QueryState queryState) {
        updateStageState((v1) -> {
            return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$updateQueryState$$anonfun$1(r1, v1);
        });
    }

    /**
     * Returns the execution context on which the future completions in this class are registered.
     *
     * @return the materializer's execution context
     */
    public ExecutionContext ec() {
        return materializer().executionContext();
    }

    /**
     * Returns the async callback that receives the outcome of a bucket query.
     *
     * @return the callback created in the constructor
     */
    public AsyncCallback newResultSetCb() {
        return this.newResultSetCb;
    }

    /**
     * Returns the async callback that receives the outcome of a scan for delayed events.
     *
     * @return the callback created in the constructor
     */
    public AsyncCallback backTrackCb() {
        return this.backTrackCb;
    }

    /**
     * Initialises the stage: builds the initial state, logs the starting parameters, starts the
     * first query and registers the timers.
     */
    public void preStart() {
        // Initial state: idle, starting at the stage's initial query offset with a freshly calculated toOffset
        // and no missing-events lookup.
        stageState_$eq(EventsByTagStage$StageState$.MODULE$.apply(EventsByTagStage$QueryIdle$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset, calculateToOffset(), (Map) this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialTagPidSequenceNrs.transform(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$preStart$$anonfun$1), false, System.currentTimeMillis(), None$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize));
        if (log().isInfoEnabled()) {
            log().info(new StringBuilder(85).append("[{}]: EventsByTag query [").append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).append("] starting with EC delay {}ms: fromOffset [{}] toOffset [{}]").toString(), this.$outer.stageUuid(), BoxesRunTime.boxToLong(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency().toMillis()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.map(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$preStart$$anonfun$2));
        }
        log().debug("[{}] Starting with tag pid sequence nrs [{}]", this.$outer.stageUuid(), stageState().tagPidSequenceNrs());
        // Only when a finite pubsub notification interval is configured. The work is wrapped in Try and the
        // result is discarded, so a failure here does not stop the stage. preStart$$anonfun$3 is not visible;
        // presumably it subscribes to the pubsub mediator — TODO confirm.
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().pubsubNotification().isFinite()) {
            Try$.MODULE$.apply(() -> {
                preStart$$anonfun$3();
                return BoxedUnit.UNIT;
            });
        }
        query();
        Some some = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$refreshInterval;
        if (some instanceof Some) {
            FiniteDuration finiteDuration = (FiniteDuration) some.value();
            // Initial delay: for intervals of at least 2 seconds, half the interval plus a random number of
            // milliseconds below half the interval; for shorter intervals the interval itself.
            scheduleQueryPoll(finiteDuration.$greater$eq(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).seconds()) ? finiteDuration.$div(2L).$plus(new package.DurationLong(scala.concurrent.duration.package$.MODULE$.DurationLong(ThreadLocalRandom.current().nextLong(finiteDuration.toMillis() / 2))).millis()) : finiteDuration, finiteDuration);
            log().debug("[{}] Scheduling query poll at: {} ms", this.$outer.stageUuid(), BoxesRunTime.boxToLong(finiteDuration.toMillis()));
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            // No refresh interval configured: no poll timer is scheduled.
            log().debug("[{}] CurrentQuery: No query polling", this.$outer.stageUuid());
        }
        // Optional timers for persistence-id cleanup and the delayed-event scan (optionallySchedule$1 is not visible here).
        optionallySchedule$1(EventsByTagStage$PersistenceIdsCleanup$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().cleanUpPersistenceIds());
        optionallySchedule$1(EventsByTagStage$ScanForDelayedEvents$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().backtrack().interval());
    }

    /**
     * Schedules the periodic query poll timer under the {@code PeriodicQueryPoll} key.
     *
     * @param finiteDuration initial delay before the first poll
     * @param finiteDuration2 delay between subsequent polls
     */
    private void scheduleQueryPoll(FiniteDuration finiteDuration, FiniteDuration finiteDuration2) {
        scheduleWithFixedDelay(EventsByTagStage$PeriodicQueryPoll$.MODULE$, finiteDuration, finiteDuration2);
    }

    /**
     * Dispatches a fired timer to its handler.
     *
     * <p>Query-poll keys continue the query cycle, the cleanup key prunes tracked persistence ids,
     * and the scan key starts a scan for delayed events.
     *
     * @param obj the timer key that fired
     * @throws IllegalStateException for a key this stage does not know
     */
    public void onTimer(Object obj) {
        if (obj instanceof EventsByTagStage.QueryPoll) {
            m171continue();
            return;
        }
        if (EventsByTagStage$PersistenceIdsCleanup$.MODULE$.equals(obj)) {
            cleanup();
            return;
        }
        if (EventsByTagStage$ScanForDelayedEvents$.MODULE$.equals(obj)) {
            scanForDelayedEvents();
            return;
        }
        throw new IllegalStateException("Unexpected timerKey: " + obj);
    }

    /**
     * Handles downstream demand by attempting to push one element ({@code tryPushOne()}).
     */
    public void onPull() {
        tryPushOne();
    }

    /**
     * Starts an asynchronous scan for delayed events unless one is already in progress.
     *
     * <p>The scan runs from a computed start offset (never earlier than the stage's initial query
     * offset) up to the current {@code fromOffset}; its result is delivered to {@code backTrackCb}.
     */
    private void scanForDelayedEvents() {
        UUID uuid;
        if (stageState().delayedScanInProgress()) {
            return;
        }
        // State update by a helper that is not visible here; presumably it marks the scan as in progress — TODO confirm.
        updateStageState(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$scanForDelayedEvents$$anonfun$1);
        // Key of the time bucket one step before the current one.
        long key = stageState().currentTimeBucket().previous(1).key();
        long unixTimestamp = Uuids.unixTimestamp(stageState().fromOffset());
        // Use the long back-track period when more than longIntervalMillis has passed since the previous long scan,
        // otherwise the regular period.
        if (System.currentTimeMillis() - stageState().previousLongDelayedScan() > this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$backtracking.longIntervalMillis()) {
            UUID startOf = Uuids.startOf(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$backtracking.longPeriodMillis(unixTimestamp, key));
            log().debug("Initialising long period back track from {}", org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(startOf));
            // Helper not visible here; presumably records the time of this long scan — TODO confirm.
            updateStageState(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$_$$anonfun$7);
            uuid = startOf;
        } else {
            UUID startOf2 = Uuids.startOf(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$backtracking.periodMillis(unixTimestamp, key));
            if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug) {
                log().debug("Initialising period back track from {}", org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(startOf2));
            }
            uuid = startOf2;
        }
        UUID uuid2 = uuid;
        // The start offset is clamped so it is never before initialQueryOffset; the scan result is forwarded to backTrackCb.
        this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$scanner.scan(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), UUIDComparator$.MODULE$.comparator().compare(uuid2, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset) < 0 ? this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset : uuid2, stageState().fromOffset(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize, Duration$.MODULE$.Zero(), EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$scanForDelayedEvents$$anonfun$2).onComplete(r4 -> {
            backTrackCb().invoke(r4);
        }, ec());
    }

    /**
     * Prunes tracked persistence ids that have not been touched recently.
     *
     * <p>An entry is dropped when the difference between "now" and the third element of its stored
     * tuple exceeds {@code cleanupPersistenceIdsMills}.
     */
    private void cleanup() {
        long currentTimeMillis = System.currentTimeMillis();
        Map map = (Map) stageState().tagPidSequenceNrs().filterNot(tuple2 -> {
            Tuple3 tuple3;
            if (tuple2 == null || (tuple3 = (Tuple3) tuple2._2()) == null) {
                throw new MatchError(tuple2);
            }
            return currentTimeMillis - BoxesRunTime.unboxToLong(tuple3._3()) > this.cleanupPersistenceIdsMills;
        });
        // NOTE(review): `map` looks unused, but `r1` below is a decompiler artifact that presumably stands for
        // it, i.e. the filtered map is what gets stored in the state — TODO confirm.
        updateStageState((v1) -> {
            return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$cleanup$$anonfun$1(r1, v1);
        });
    }

    /* renamed from: continue, reason: not valid java name */
    /**
     * Advances the stage according to its current query state.
     *
     * <p>When idle, either resumes the missing-events search or starts a regular query. When
     * results or buffered events are available, tries to push one. While a query is in progress
     * nothing is done.
     *
     * @throws MatchError for a query state that is none of the known kinds
     */
    private void m171continue() {
        EventsByTagStage.QueryState current = stageState().state();

        if (!EventsByTagStage$QueryIdle$.MODULE$.equals(current)) {
            boolean hasEventsToPush = current instanceof EventsByTagStage.QueryResult
                    || current instanceof EventsByTagStage.BufferedEvents;
            if (hasEventsToPush) {
                tryPushOne();
                return;
            }
            if (current instanceof EventsByTagStage.QueryInProgress) {
                // The pending query's callback will drive the next step.
                return;
            }
            throw new MatchError(current);
        }

        // Idle: pick up whichever kind of query is due.
        if (stageState().isLookingForMissing()) {
            lookForMissing();
        } else {
            query();
        }
    }

    /**
     * Starts an asynchronous query for the current time bucket between the state's
     * {@code fromOffset} and a freshly updated {@code toOffset}; the outcome is delivered to
     * {@code newResultSetCb}.
     */
    private void query() {
        updateToOffset();
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug && log().isDebugEnabled()) {
            log().debug("[{}] Executing query: timeBucket: {} from offset: {} to offset: {}", this.$outer.stageUuid(), stageState().currentTimeBucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().toOffset()));
        }
        // State update by a helper that is not visible here; presumably it switches to QueryInProgress, which
        // newResultSetCb requires — TODO confirm.
        updateStageState(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$query$$anonfun$1);
        // The three-argument function (int, Throwable, FiniteDuration) delegates to query$$anonfun$2, which is
        // not visible here; presumably a per-retry hook — TODO confirm.
        this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.selectEventsForBucket(stageState().currentTimeBucket(), stageState().fromOffset(), stageState().toOffset(), (obj, obj2, obj3) -> {
            query$$anonfun$2(BoxesRunTime.unboxToInt(obj), (Throwable) obj2, (FiniteDuration) obj3);
            return BoxedUnit.UNIT;
        }, ec(), scheduler()).onComplete(r4 -> {
            newResultSetCb().invoke(r4);
        }, ec());
    }

    /**
     * Returns the active missing-events lookup from the stage state.
     *
     * @return the current lookup
     * @throws IllegalStateException when the state holds no lookup
     * @throws MatchError when the stored option is neither {@code Some} nor {@code None}
     */
    private EventsByTagStage.LookingForMissing getMissingLookup() {
        Option lookup = stageState().missingLookup();
        if (lookup instanceof Some) {
            return (EventsByTagStage.LookingForMissing) ((Some) lookup).value();
        }
        if (!None$.MODULE$.equals(lookup)) {
            throw new MatchError(lookup);
        }
        // Being asked for a lookup while none is active points at a bug in the stage's state handling.
        throw new IllegalStateException("lookingForMissingCalled for tag " + this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag() + " when there " + "is no missing. Raise a bug with debug logging.");
    }

    /**
     * Runs one query of the missing-events search.
     *
     * <p>If the lookup's deadline has passed the search is aborted; otherwise the stage moves to
     * {@code QueryInProgress} and queries the lookup's bucket between its min and max offsets. The
     * outcome is delivered to {@code newResultSetCb}.
     *
     * @throws IllegalStateException when no lookup is active (see {@code getMissingLookup()})
     */
    private void lookForMissing() {
        EventsByTagStage.LookingForMissing missingLookup = getMissingLookup();
        if (missingLookup.deadline().isOverdue()) {
            abortMissingSearch(missingLookup);
            return;
        }
        // First argument is presumably abortForMissingSearch = false (the search itself is the query) — TODO confirm.
        updateQueryState(EventsByTagStage$QueryInProgress$.MODULE$.apply(false, EventsByTagStage$QueryInProgress$.MODULE$.$lessinit$greater$default$2()));
        if (log().isDebugEnabled()) {
            log().debug(new StringBuilder(3).append("[").append(this.$outer.stageUuid()).append("] ").append(new StringBuilder(66).append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).append(": Executing query to look for {}. Timebucket: {}. From: {}. To: {}").toString()).toString(), missingLookup.gapDetected() ? "missing" : "previous events", missingLookup.bucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(missingLookup.minOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(missingLookup.maxOffset()));
        }
        // The three-argument function delegates to lookForMissing$$anonfun$1, which is not visible here.
        this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.selectEventsForBucket(missingLookup.bucket(), missingLookup.minOffset(), missingLookup.maxOffset(), (obj, obj2, obj3) -> {
            lookForMissing$$anonfun$1(BoxesRunTime.unboxToInt(obj), (Throwable) obj2, (FiniteDuration) obj3);
            return BoxedUnit.UNIT;
        }, ec(), scheduler()).onComplete(r4 -> {
            newResultSetCb().invoke(r4);
        }, ec());
    }

    /**
     * Ends a missing-events search that could not be completed.
     *
     * <p>If the search was started because a gap was detected, the stage fails with a
     * {@code MissingTaggedEventException}. Otherwise the search is simply stopped, keeping the
     * events buffered so far, and the stage tries to push one element.
     *
     * @param lookingForMissing the lookup being abandoned
     */
    private void abortMissingSearch(EventsByTagStage.LookingForMissing lookingForMissing) {
        if (!lookingForMissing.gapDetected()) {
            // No gap was detected for this search, so giving up is not an error.
            log().debug("[{}] [{}]: Finished scanning for older events for persistence ids [{}]", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), lookingForMissing.remainingMissing().keys().mkString(","));
            stopLookingForMissing(lookingForMissing, lookingForMissing.buffered());
            tryPushOne();
            return;
        }
        MissingTaggedEventException gapFailure = new MissingTaggedEventException(
                this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(),
                lookingForMissing.remainingMissing(),
                lookingForMissing.minOffset(),
                lookingForMissing.maxOffset());
        fail(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, gapFailure);
    }

    /**
     * Takes one row from the result set and, if it is one of the events being searched for, removes
     * it from the lookup's remaining-missing bookkeeping.
     *
     * <p>When nothing remains missing, the search is stopped with the row added in front of the
     * buffered events. Rows that are not being searched for are ignored.
     *
     * @param asyncResultSet result set to take the next row from
     * @param lookingForMissing the active lookup whose remaining-missing map is consulted
     */
    public void checkResultSetForMissing(AsyncResultSet asyncResultSet, EventsByTagStage.LookingForMissing lookingForMissing) {
        // NOTE(review): the row is dereferenced without a null check; callers presumably only invoke this
        // when a row is available — TODO confirm.
        Row row = (Row) asyncResultSet.one();
        String string = row.getString("persistence_id");
        long j = row.getLong("tag_pid_sequence_nr");
        Some some = lookingForMissing.remainingMissing().get(string);
        if (some instanceof Some) {
            Set set = (Set) some.value();
            if (set.contains(BoxesRunTime.boxToLong(j))) {
                EventsByTagStage.UUIDRow extractUuidRow = extractUuidRow(row);
                Set $minus = set.$minus(BoxesRunTime.boxToLong(extractUuidRow.tagPidSequenceNr()));
                // Drop the persistence id entirely once none of its sequence numbers remain missing.
                Map updated = $minus.isEmpty() ? (Map) lookingForMissing.remainingMissing().$minus(string) : lookingForMissing.remainingMissing().updated(string, $minus);
                if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug) {
                    log().debug("[{}] {}: Found a missing event, sequence nr {}. Remaining missing: {}", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), BoxesRunTime.boxToLong(extractUuidRow.tagPidSequenceNr()), updated);
                }
                if (updated.isEmpty()) {
                    // Everything was found: finish the search with this row prepended to the buffered list.
                    stopLookingForMissing(lookingForMissing, lookingForMissing.buffered().$colon$colon(extractUuidRow));
                    return;
                }
                log().debug("[{}] [{}]: There are more missing events. [{}]", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), updated);
                // Replace only the missing-lookup component of the state. `r2`/`r3` are decompiler artifacts;
                // presumably the found row and the updated map — TODO confirm.
                EventsByTagStage.StageState stageState = stageState();
                stageState_$eq(stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState().missingLookup().map((v2) -> {
                    return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$_$$anonfun$9(r2, r3, v2);
                }), stageState.copy$default$8()));
            }
        }
    }

    /**
     * Replaces the stage state with the result of applying {@code function1} to the
     * current stage state.
     *
     * @param function1 transformation from the current state to the next state
     */
    private void updateStageState(Function1 function1) {
        EventsByTagStage.StageState before = stageState();
        EventsByTagStage.StageState after = (EventsByTagStage.StageState) function1.apply(before);
        stageState_$eq(after);
    }

    /**
     * Handles a row for a persistence id that (per the method name and log messages) the
     * stage has not seen before.
     *
     * <ul>
     *   <li>If the row's tag pid sequence number is 1, the state is updated and the row is
     *       pushed downstream.</li>
     *   <li>If this is an offset query and either the current time bucket is in the past
     *       or the new-persistence-id scan timeout equals {@code Duration.Zero}, the row
     *       is accepted as-is and pushed downstream.</li>
     *   <li>Otherwise a search for lower tag pid sequence numbers is started, beginning at
     *       the later of the previous bucket's start and the initial query offset.</li>
     * </ul>
     *
     * <p>The decompiled form evaluated the scan timeout and {@code Duration.Zero} but
     * discarded the comparison, so every offset query took the "accept as-is" path. The
     * short-circuit condition is restored here.
     *
     * @param uUIDRow the row read for the new persistence id
     * @return {@code true} when a search for earlier events was started (the row is
     *     buffered, not pushed); {@code false} when the row was pushed downstream
     */
    public boolean handleFirstTimePersistenceId(EventsByTagStage.UUIDRow uUIDRow) {
        if (uUIDRow.tagPidSequenceNr() == 1) {
            updateStageState((v1) -> {
                return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$handleFirstTimePersistenceId$$anonfun$1(r1, v1);
            });
            push(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, uUIDRow);
            return false;
        }

        // usingOffset && (bucket in past || scanTimeout == Zero), evaluated with the same
        // short-circuit order the decompiled fragments show.
        boolean acceptWithoutScan = false;
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$usingOffset) {
            acceptWithoutScan = stageState().currentTimeBucket().inPast();
            if (!acceptWithoutScan) {
                FiniteDuration scanTimeout = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().newPersistenceIdScanTimeout();
                FiniteDuration zero = Duration$.MODULE$.Zero();
                // Null-safe equality, as the Scala compiler emits for `==`.
                acceptWithoutScan = scanTimeout != null ? scanTimeout.equals(zero) : zero == null;
            }
        }
        if (acceptWithoutScan) {
            log().debug("[{}] New persistence id: {}. Timebucket: {}. Tag pid sequence nr: {}", this.$outer.stageUuid(), uUIDRow.persistenceId(), stageState().currentTimeBucket(), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
            updateStageState((v1) -> {
                return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$handleFirstTimePersistenceId$$anonfun$2(r1, v1);
            });
            push(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, uUIDRow);
            return false;
        }

        if (log().isDebugEnabled()) {
            log().debug(new StringBuilder(326).append("[").append(this.$outer.stageUuid()).append("] ").append(" [{}]: Persistence Id not in metadata: [{}] does not start at tag pid sequence nr 1. ").append("This could either be that the events are before the offset, that the metadata has been dropped or that they are delayed. ").append("Tag pid sequence nr found: [{}]. Looking for lower tag pid sequence nrs for [{}] in the current and previous buckets.").toString(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), uUIDRow.persistenceId(), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()), PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().newPersistenceIdScanTimeout())));
        }

        // Start the search no earlier than the initial query offset.
        UUID uuid;
        UUID startOf = Uuids.startOf(stageState().currentTimeBucket().previous(1).key());
        if (UUIDComparator$.MODULE$.comparator().compare(startOf, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset) < 0) {
            log().debug("[{}] Starting at fromOffset", this.$outer.stageUuid());
            uuid = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset;
        } else {
            log().debug("[{}] Starting at startOfBucket", this.$outer.stageUuid());
            uuid = startOf;
        }
        // Buffer this row and look for sequence numbers 1 until (exclusive) the row's own.
        setLookingForMissingState(scala.package$.MODULE$.Nil().$colon$colon(uUIDRow), uuid, uUIDRow.offset(), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), EventsByTagStage$MissingData$.MODULE$.apply(uUIDRow.offset(), uUIDRow.tagPidSequenceNr()))})), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), new RichLong(Predef$.MODULE$.longWrapper(1L)).until(BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr())).toSet())})), false, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().newPersistenceIdScanTimeout());
        return true;
    }

    /**
     * Handles a row for a persistence id whose last tag pid sequence number is known.
     *
     * <p>The expected sequence number is {@code j + 1}. A lower number is logged as a
     * duplicate and dropped; a higher number starts a search for the gap; an exact match
     * updates the state and pushes the row downstream.
     *
     * @param uUIDRow the row that was read
     * @param j the last tag pid sequence number recorded for this persistence id
     * @param uuid offset passed on to the missing-event search as its lower bound
     * @return {@code true} when a missing-event search was started, {@code false}
     *     otherwise (row dropped or pushed)
     */
    public boolean handleExistingPersistenceId(EventsByTagStage.UUIDRow uUIDRow, long j, UUID uuid) {
        final long expectedNr = j + 1;
        final String pid = uUIDRow.persistenceId();
        final long actualNr = uUIDRow.tagPidSequenceNr();

        if (actualNr < expectedNr) {
            log().warning("[" + this.$outer.stageUuid() + "] " + "Duplicate sequence number. Persistence id: {}. Tag: {}. Expected sequence nr: {}. " + "Actual {}. This will be dropped.", pid, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), BoxesRunTime.boxToLong(expectedNr), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
            return false;
        } else if (actualNr > expectedNr) {
            log().info("[" + this.$outer.stageUuid() + "] " + "{}: Missing event for persistence id: {}. Expected sequence nr: {}, actual: {}.", this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), pid, BoxesRunTime.boxToLong(expectedNr), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
            // Buffer this row and search for every sequence number in [expectedNr, actualNr).
            List bufferedRows = scala.package$.MODULE$.Nil().$colon$colon(uUIDRow);
            Map missingData = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), EventsByTagStage$MissingData$.MODULE$.apply(uUIDRow.offset(), uUIDRow.tagPidSequenceNr()))}));
            Map remainingMissing = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), new RichLong(Predef$.MODULE$.longWrapper(expectedNr)).until(BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr())).toSet())}));
            setLookingForMissingState(bufferedRows, uuid, uUIDRow.offset(), missingData, remainingMissing, true, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventsByTagGapTimeout());
            return true;
        }

        // Sequence number is exactly the expected one.
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug) {
            log().debug("[" + this.$outer.stageUuid() + "] " + " Updating offset to {} from pId {} seqNr {} tagPidSequenceNr {}", org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(uUIDRow.offset()), pid, BoxesRunTime.boxToLong(uUIDRow.sequenceNr()), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
        }
        // NOTE(review): r1/r2 are decompiler placeholders for the closure's captured values.
        updateStageState(v2 -> EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$handleExistingPersistenceId$$anonfun$1(r1, r2, v2));
        push(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, uUIDRow);
        return false;
    }

    /**
     * Updates the stage state to begin a search for missing events.
     *
     * <p>Parameter meanings below are taken from the two visible call sites
     * ({@code handleFirstTimePersistenceId} and {@code handleExistingPersistenceId}).
     * How each value ends up in the new state is decided inside a compiler-generated
     * function whose body is not visible here.
     *
     * @param list rows already read that are held back (callers pass a one-element list
     *     containing the triggering row)
     * @param uuid offset passed by callers as the lower bound of the search
     * @param uuid2 offset of the triggering row; used here to derive a time bucket
     * @param map per-persistence-id {@code MissingData} built from the triggering row
     * @param map2 per-persistence-id set of tag pid sequence numbers being searched for
     * @param z {@code true} when called for a gap on a known persistence id,
     *     {@code false} when called for a new persistence id
     * @param finiteDuration timeout supplied by the caller (gap timeout or new
     *     persistence id scan timeout)
     */
    private void setLookingForMissingState(List list, UUID uuid, UUID uuid2, Map map, Map map2, boolean z, FiniteDuration finiteDuration) {
        // Time bucket computed from uuid2 and the stage's bucket size.
        TimeBucket apply = TimeBucket$.MODULE$.apply(uuid2, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize);
        // True when that bucket does not contain uuid.
        boolean shouldSearchInPreviousBucket = shouldSearchInPreviousBucket(apply, uuid);
        // NOTE(review): r1..r9 are decompiler placeholders; presumably the method
        // arguments and the two locals above are what the closure captures — confirm
        // against the bytecode before relying on this.
        updateStageState((v9) -> {
            return EventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$setLookingForMissingState$$anonfun$1(r1, r2, r3, r4, r5, r6, r7, r8, r9, v9);
        });
    }

    /**
     * Folds over the values of the current missing lookup's {@code remainingMissing}
     * map, starting from 0, using a compiler-generated combining function.
     *
     * @return the folded total as a {@code long}
     */
    private long totalMissing() {
        Object total = getMissingLookup().remainingMissing().values().foldLeft(BoxesRunTime.boxToLong(0L), EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$totalMissing$$anonfun$adapted$1);
        return BoxesRunTime.unboxToLong(total);
    }

    /**
     * Fails the stage with a {@link RuntimeException} reporting how many tagged events
     * are missing for this stage's tag.
     *
     * @param j number of missing events to report in the failure message
     */
    private void failTooManyMissing(long j) {
        String tag = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag();
        String message = j + " missing tagged events for tag [" + tag + "]. Failing without search";
        failStage(new RuntimeException(message));
    }

    /* JADX WARN: Code restructure failed: missing block: B:25:0x021f, code lost:
    
        push(r7.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out, r0);
        updateQueryState(org.apache.pekko.persistence.cassandra.query.EventsByTagStage$QueryIdle$.MODULE$);
        query();
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x0237, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x01d7, code lost:
    
        m171continue();
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x01db, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:77:0x0108, code lost:
    
        throw new scala.MatchError(r0);
     */
    /* JADX WARN: Removed duplicated region for block: B:17:0x01e4  */
    /* JADX WARN: Removed duplicated region for block: B:31:0x025c A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler stub. The original method body could not be reconstructed (see the
     * JADX warnings above), so this version contains no logic.
     *
     * @throws UnsupportedOperationException always
     */
    public void tryPushOne() {
        /*
            Method dump skipped, instructions count: 666
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pekko.persistence.cassandra.query.EventsByTagStage$$anon$1.tryPushOne():void");
    }

    /**
     * Tells whether a search also has to cover the bucket before {@code timeBucket}.
     *
     * @param timeBucket bucket to test against
     * @param uuid offset to test
     * @return {@code true} when {@code timeBucket.within(uuid)} is {@code false}
     */
    private boolean shouldSearchInPreviousBucket(TimeBucket timeBucket, UUID uuid) {
        boolean withinBucket = timeBucket.within(uuid);
        return !withinBucket;
    }

    /**
     * Called when the current query has no more rows. Marks the query state idle and
     * decides what happens next.
     *
     * <p>While a missing-event search is active: query the previous bucket immediately
     * if the lookup asks for it; abort the search if its deadline has passed; otherwise
     * refresh the lookup's bucket and schedule a one-off poll where needed.
     *
     * <p>Otherwise: move to the next time bucket and query again if the state says so;
     * complete the stage if the configured end offset has been reached; else wait for the
     * next poll, scheduling a one-off poll when this is not a live query.
     */
    private void queryExhausted() {
        updateQueryState(EventsByTagStage$QueryIdle$.MODULE$);
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug && log().isDebugEnabled()) {
            log().debug("[{}] Query exhausted, next query: from: {} to: {}", this.$outer.stageUuid(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().toOffset()));
        }

        if (stageState().isLookingForMissing()) {
            EventsByTagStage.LookingForMissing search = (EventsByTagStage.LookingForMissing) stageState().missingLookup().get();

            if (search.queryPrevious()) {
                log().debug("[{}] [{}]: Missing could be in previous bucket. Querying right away.", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag());
                updateStageState(current -> current.copy(current.copy$default$1(), current.copy$default$2(), current.copy$default$3(), current.copy$default$4(), current.copy$default$5(), current.copy$default$6(), stageState().missingLookup().map(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$_$$anonfun$10), current.copy$default$8()));
                lookForMissing();
                return;
            }

            FiniteDuration remaining = search.deadline().timeLeft();
            if (remaining.$less$eq(Duration$.MODULE$.Zero())) {
                abortMissingSearch(search);
                return;
            }

            log().debug("[" + this.$outer.stageUuid() + "] [{}]: Still looking for {}. {}. Duration left for search: {}", this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), search.gapDetected() ? "missing" : "previous events", stageState(), PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(remaining)));
            // Recompute the lookup's bucket from its max offset and whether the previous
            // bucket must be searched as well.
            updateStageState(current -> current.copy(current.copy$default$1(), current.copy$default$2(), current.copy$default$3(), current.copy$default$4(), current.copy$default$5(), current.copy$default$6(), stageState().missingLookup().map(lookup -> {
                TimeBucket bucket = TimeBucket$.MODULE$.apply(lookup.maxOffset(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize);
                return lookup.copy(lookup.copy$default$1(), lookup.copy$default$2(), lookup.copy$default$3(), bucket, shouldSearchInPreviousBucket(bucket, lookup.minOffset()), lookup.copy$default$6(), lookup.copy$default$7(), lookup.copy$default$8(), lookup.copy$default$9());
            }), current.copy$default$8()));

            FiniteDuration refreshInterval = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.querySettings().refreshInterval();
            if (remaining.$less(refreshInterval)) {
                // The deadline falls before the next regular poll: poll exactly when it expires.
                log().debug("Scheduling one off poll in {}", PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(remaining)));
                scheduleOnce(EventsByTagStage$OneOffQueryPoll$.MODULE$, remaining);
            } else if (!this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$isLiveQuery()) {
                log().debug("Scheduling one off poll in {}", refreshInterval);
                scheduleOnce(EventsByTagStage$OneOffQueryPoll$.MODULE$, refreshInterval);
            }
            return;
        }

        if (stageState().shouldMoveBucket()) {
            nextTimeBucket();
            log().debug("[{}] Moving to next bucket: {}", this.$outer.stageUuid(), stageState().currentTimeBucket());
            query();
            return;
        }

        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.contains(stageState().toOffset())) {
            log().debug("[{}] Current query finished and has passed the eventual consistency delay, ending.", this.$outer.stageUuid());
            completeStage();
            return;
        }

        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug) {
            log().debug("[{}] Nothing todo, waiting for next poll", this.$outer.stageUuid());
        }
        if (!this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$isLiveQuery()) {
            FiniteDuration delay = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency();
            log().debug("[{}] Scheduling poll for eventual consistency delay: {}", this.$outer.stageUuid(), delay);
            scheduleOnce(EventsByTagStage$OneOffQueryPoll$.MODULE$, delay);
        }
    }

    /**
     * Ends a missing-event search.
     *
     * <p>The query state becomes idle when {@code list} is empty, otherwise it becomes
     * {@code BufferedEvents} holding {@code list} sorted by the stage's UUID-row
     * ordering. The stage state is then rebuilt with the lookup cleared and, for every
     * entry of the lookup's {@code missingData}, its tag pid sequence number updated.
     *
     * @param lookingForMissing the lookup that is being finished
     * @param list rows buffered during the search
     */
    private void stopLookingForMissing(EventsByTagStage.LookingForMissing lookingForMissing, List list) {
        if (list.isEmpty()) {
            updateQueryState(EventsByTagStage$QueryIdle$.MODULE$);
        } else {
            updateQueryState(EventsByTagStage$BufferedEvents$.MODULE$.apply((List) list.sorted(EventsByTagStage$.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$$uuidRowOrdering)));
        }
        log().debug("[{}] Search over. Buffered events ready for delivery: {}", this.$outer.stageUuid(), stageState().state());

        updateStageState(current -> {
            Map<String, EventsByTagStage.MissingData> perPid = lookingForMissing.missingData();
            // Second field takes the lookup's max offset; the missing lookup is cleared.
            EventsByTagStage.StageState searchCleared = current.copy(current.copy$default$1(), lookingForMissing.maxOffset(), current.copy$default$3(), current.copy$default$4(), current.copy$default$5(), current.copy$default$6(), None$.MODULE$, current.copy$default$8());
            return (EventsByTagStage.StageState) perPid.foldLeft(searchCleared, (acc, entry) -> {
                Tuple2 pair = Tuple2$.MODULE$.apply(acc, entry);
                if (pair == null) {
                    throw new MatchError(pair);
                }
                Tuple2 pidAndData = (Tuple2) pair._2();
                EventsByTagStage.StageState state = (EventsByTagStage.StageState) pair._1();
                if (pidAndData == null) {
                    throw new MatchError(pair);
                }
                String pid = (String) pidAndData._1();
                EventsByTagStage.MissingData data = (EventsByTagStage.MissingData) pidAndData._2();
                log().debug("Updating tag pid sequence nr for pid {} to {}", pid, BoxesRunTime.boxToLong(data.maxSequenceNr()));
                return state.tagPidSequenceNumberUpdate(pid, Tuple3$.MODULE$.apply(BoxesRunTime.boxToLong(data.maxSequenceNr()), data.maxOffset(), BoxesRunTime.boxToLong(System.currentTimeMillis())));
            });
        });
    }

    /**
     * Requests the next page of {@code asyncResultSet}, marks the query as in progress,
     * and forwards the page's completion to {@code newResultSetCb()} on {@code ec()}.
     *
     * @param asyncResultSet result set whose current page has been consumed
     */
    private void fetchMore(AsyncResultSet asyncResultSet) {
        log().debug("[{}] No more results without paging. Requesting more.", this.$outer.stageUuid());
        Future nextPage = FutureConverters$CompletionStageOps$.MODULE$.asScala$extension(FutureConverters$.MODULE$.CompletionStageOps(asyncResultSet.fetchNextPage()));
        updateQueryState(EventsByTagStage$QueryInProgress$.MODULE$.apply(false, EventsByTagStage$QueryInProgress$.MODULE$.$lessinit$greater$default$2()));
        nextPage.onComplete(outcome -> {
            newResultSetCb().invoke(outcome);
        }, ec());
    }

    /**
     * Builds a {@code UUIDRow} from the {@code persistence_id}, {@code sequence_nr},
     * {@code timestamp} and {@code tag_pid_sequence_nr} columns of {@code row}; the row
     * itself is kept as the last component.
     *
     * @param row row to read the columns from
     * @return the extracted {@code UUIDRow}
     */
    private EventsByTagStage.UUIDRow extractUuidRow(Row row) {
        String persistenceId = row.getString("persistence_id");
        long sequenceNr = row.getLong("sequence_nr");
        UUID timestamp = row.getUuid("timestamp");
        long tagPidSequenceNr = row.getLong("tag_pid_sequence_nr");
        return EventsByTagStage$UUIDRow$.MODULE$.apply(persistenceId, sequenceNr, timestamp, tagPidSequenceNr, row);
    }

    /**
     * Moves the stage state forward by one time bucket: the second state field
     * (presumably the from-offset — confirm against {@code StageState}) is set to the
     * start of the bucket after the current one, then {@code updateToOffset()} is called.
     */
    private void nextTimeBucket() {
        updateStageState(current -> current.copy(current.copy$default$1(), Uuids.startOf(stageState().currentTimeBucket().next().key()), current.copy$default$3(), current.copy$default$4(), current.copy$default$5(), current.copy$default$6(), current.copy$default$7(), current.copy$default$8()));
        updateToOffset();
    }

    /**
     * Compiler-generated default supplier: the key of the time bucket immediately before
     * the current one.
     *
     * @return key of the previous time bucket
     */
    private final long $anonfun$6$$anonfun$2() {
        TimeBucket previousBucket = stageState().currentTimeBucket().previous(1);
        return previousBucket.key();
    }

    /**
     * Compiler-generated fold step. Looks up {@code str} in the state's
     * {@code tagPidSequenceNrs}, maps a hit through a generated function (falling back
     * to the previous bucket's key when absent), and returns the smaller of that value
     * and {@code j}.
     *
     * @param j accumulated minimum so far
     * @param str persistence id to look up
     * @return the minimum of {@code j} and the value derived for {@code str}
     */
    private final /* synthetic */ long $anonfun$6(long j, String str) {
        Tuple2 pair = Tuple2$.MODULE$.apply(BoxesRunTime.boxToLong(j), str);
        if (pair == null) {
            throw new MatchError(pair);
        }
        long forPid = BoxesRunTime.unboxToLong(stageState().tagPidSequenceNrs().get((String) pair._2()).map(EventsByTagStage::org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$anon$1$$_$$anonfun$6$$anonfun$1).getOrElse(this::$anonfun$6$$anonfun$2));
        long soFar = BoxesRunTime.unboxToLong(pair._1());
        return scala.math.package$.MODULE$.min(forPid, soFar);
    }

    /**
     * Compiler-generated part of {@code preStart}: installs the stage actor's message
     * handler and subscribes the stage actor to the pub-sub topic
     * {@code "apc.tags." + tag}.
     *
     * <p>The handler reacts only to messages whose payload equals this stage's tag. With
     * a zero eventual-consistency delay it continues immediately; with a delay shorter
     * than the refresh interval it schedules a {@code TagNotification} after that delay;
     * otherwise it does nothing.
     */
    private final void preStart$$anonfun$3() {
        getStageActor(message -> {
            if (message == null) {
                throw new MatchError(message);
            }
            if (!message._2().equals(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag())) {
                return;
            }
            log().debug("[{}] Received pub sub tag update for our tag, initiating query", this.$outer.stageUuid());
            FiniteDuration delay = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency();
            FiniteDuration zero = Duration$.MODULE$.Zero();
            // Null-safe equality, as emitted by the Scala compiler for `==`.
            boolean noDelay = delay == null ? zero == null : delay.equals(zero);
            if (noDelay) {
                m171continue();
            } else if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency().$less(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.querySettings().refreshInterval())) {
                scheduleOnce(EventsByTagStage$TagNotification$.MODULE$.apply(System.currentTimeMillis() / 10), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency());
            }
        });

        ActorRef mediator = DistributedPubSub$.MODULE$.apply(system()).mediator();
        String topic = "apc.tags." + this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag();
        DistributedPubSubMediator.Subscribe subscribe = DistributedPubSubMediator$Subscribe$.MODULE$.apply(topic, stageActor().ref());
        mediator.$bang(subscribe, mediator.$bang$default$2(subscribe));
    }

    /**
     * Schedules {@code obj} as a fixed-delay timer when {@code option} holds a duration;
     * the duration is used both as initial delay and as repeat delay. Does nothing for
     * an empty option.
     *
     * @param obj timer key to schedule
     * @param option optional delay
     */
    private final void optionallySchedule$1(Object obj, Option option) {
        option.foreach(delay -> {
            scheduleWithFixedDelay(obj, delay, delay);
        });
    }

    /**
     * Compiler-generated retry callback for the main query: logs a warning with the
     * attempt number, the delay until the next retry and the failure message, if warning
     * logging is enabled.
     *
     * @param i attempt number
     * @param th failure of the attempt
     * @param finiteDuration delay until the next retry
     */
    private final /* synthetic */ void query$$anonfun$2(int i, Throwable th, FiniteDuration finiteDuration) {
        if (!log().isWarningEnabled()) {
            return;
        }
        String retryIn = PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(finiteDuration));
        String template = "[{}] Query failed. timeBucket: {} from offset: {} to offset: {}. Attempt " + i + ". Next retry in: " + retryIn + ". Reason: " + th.getMessage();
        log().warning(template, this.$outer.stageUuid(), stageState().currentTimeBucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().toOffset()));
    }

    /**
     * Compiler-generated retry callback for the missing-event query: logs a warning with
     * the attempt number, the delay until the next retry and the failure message, if
     * warning logging is enabled.
     *
     * @param i attempt number
     * @param th failure of the attempt
     * @param finiteDuration delay until the next retry
     */
    private final /* synthetic */ void lookForMissing$$anonfun$1(int i, Throwable th, FiniteDuration finiteDuration) {
        if (!log().isWarningEnabled()) {
            return;
        }
        String retryIn = PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(finiteDuration));
        String template = "[{}] Looking for missing query failed. timeBucket: {} from offset: {} to offset: {}. Attempt " + i + ". Next retry in: " + retryIn + ". Reason: " + th.getMessage();
        log().warning(template, this.$outer.stageUuid(), stageState().currentTimeBucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().toOffset()));
    }
}
