package org.apache.pekko.persistence.cassandra.query;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Scheduler;
import org.apache.pekko.cluster.pubsub.DistributedPubSub$;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator$Subscribe$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.persistence.cassandra.journal.TimeBucket;
import org.apache.pekko.persistence.cassandra.journal.TimeBucket$;
import org.apache.pekko.persistence.cassandra.query.EventsByTagStage;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import org.apache.pekko.util.PrettyDuration$;
import org.apache.pekko.util.PrettyDuration$PrettyPrintableDuration$;
import org.apache.pekko.util.UUIDComparator$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline$;
import scala.concurrent.duration.Duration$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: EventsByTagStage.scala */
/* loaded from: input_file:org/apache/pekko/persistence/cassandra/query/EventsByTagStage$$anon$2.class */
public final class EventsByTagStage$$anon$2 extends TimerGraphStageLogic implements StageLogging, OutHandler {
    private ActorSystem system;
    private Scheduler scheduler;
    private EventsByTagStage.StageState stageState;
    private final long toOffsetMillis;
    private final AsyncCallback<Try<AsyncResultSet>> newResultSetCb;
    private final AsyncCallback<Try<Map<String, Tuple2<Object, UUID>>>> backTrackCb;
    private final long cleanupPersistenceIdsMills;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private volatile byte bitmap$0;
    private final /* synthetic */ EventsByTagStage $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public Class<EventsByTagStage> logSource() {
        return EventsByTagStage.class;
    }

    private EventsByTagStage.StageState stageState() {
        return this.stageState;
    }

    private void stageState_$eq(EventsByTagStage.StageState stageState) {
        this.stageState = stageState;
    }

    private long toOffsetMillis() {
        return this.toOffsetMillis;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.pekko.persistence.cassandra.query.EventsByTagStage$$anon$2] */
    private ActorSystem system$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.system = materializer().system();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.system;
    }

    private ActorSystem system() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? system$lzycompute() : this.system;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.pekko.persistence.cassandra.query.EventsByTagStage$$anon$2] */
    private Scheduler scheduler$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.scheduler = system().scheduler();
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.scheduler;
    }

    private Scheduler scheduler() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? scheduler$lzycompute() : this.scheduler;
    }

    private UUID calculateToOffset() {
        UUID uuid;
        long unixTimestamp = Uuids.unixTimestamp(Uuids.timeBased()) - this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency().toMillis();
        if (unixTimestamp < toOffsetMillis()) {
            UUID endOf = Uuids.endOf(unixTimestamp);
            if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug() && log().isDebugEnabled()) {
                log().debug("[{}]: New toOffset (EC): {}", this.$outer.stageUuid(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(endOf));
            }
            uuid = endOf;
        } else {
            if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug() && log().isDebugEnabled()) {
                log().debug("{}: New toOffset (End): {}", this.$outer.stageUuid(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset((UUID) this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.get()));
            }
            uuid = (UUID) this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.get();
        }
        return uuid;
    }

    private void updateToOffset() {
        updateStageState(stageState -> {
            return stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), this.calculateToOffset(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
        });
    }

    private void updateQueryState(EventsByTagStage.QueryState queryState) {
        updateStageState(stageState -> {
            return stageState.copy(queryState, stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
        });
    }

    private ExecutionContext ec() {
        return materializer().executionContext();
    }

    private AsyncCallback<Try<AsyncResultSet>> newResultSetCb() {
        return this.newResultSetCb;
    }

    private AsyncCallback<Try<Map<String, Tuple2<Object, UUID>>>> backTrackCb() {
        return this.backTrackCb;
    }

    public void preStart() {
        stageState_$eq(new EventsByTagStage.StageState(EventsByTagStage$QueryIdle$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset, calculateToOffset(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialTagPidSequenceNrs.transform((str, tuple2) -> {
            Tuple2 tuple2;
            Tuple2 tuple22 = new Tuple2(str, tuple2);
            if (tuple22 == null || (tuple2 = (Tuple2) tuple22._2()) == null) {
                throw new MatchError(tuple22);
            }
            return new Tuple3(BoxesRunTime.boxToLong(tuple2._1$mcJ$sp()), (UUID) tuple2._2(), BoxesRunTime.boxToLong(System.currentTimeMillis()));
        }), false, System.currentTimeMillis(), None$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize));
        if (log().isInfoEnabled()) {
            log().info(new StringBuilder(85).append("[{}]: EventsByTag query [").append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).append("] starting with EC delay {}ms: fromOffset [{}] toOffset [{}]").toString(), this.$outer.stageUuid(), BoxesRunTime.boxToLong(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency().toMillis()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.map(uuid -> {
                return org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(uuid);
            }));
        }
        log().debug("[{}] Starting with tag pid sequence nrs [{}]", this.$outer.stageUuid(), stageState().tagPidSequenceNrs());
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().pubsubNotification().isFinite()) {
            Try$.MODULE$.apply(() -> {
                this.getStageActor(tuple22 -> {
                    $anonfun$preStart$4(this, tuple22);
                    return BoxedUnit.UNIT;
                });
                ActorRef mediator = DistributedPubSub$.MODULE$.apply(this.system()).mediator();
                DistributedPubSubMediator.Subscribe apply = DistributedPubSubMediator$Subscribe$.MODULE$.apply(new StringBuilder(9).append("apc.tags.").append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).toString(), this.stageActor().ref());
                mediator.$bang(apply, mediator.$bang$default$2(apply));
            });
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        query();
        Some some = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$refreshInterval;
        if (some instanceof Some) {
            FiniteDuration finiteDuration = (FiniteDuration) some.value();
            scheduleQueryPoll(finiteDuration.$greater$eq(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).seconds()) ? finiteDuration.$div(2L).$plus(new package.DurationLong(scala.concurrent.duration.package$.MODULE$.DurationLong(ThreadLocalRandom.current().nextLong(finiteDuration.toMillis() / 2))).millis()) : finiteDuration, finiteDuration);
            log().debug("[{}] Scheduling query poll at: {} ms", this.$outer.stageUuid(), BoxesRunTime.boxToLong(finiteDuration.toMillis()));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            log().debug("[{}] CurrentQuery: No query polling", this.$outer.stageUuid());
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
        optionallySchedule$1(EventsByTagStage$PersistenceIdsCleanup$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().cleanUpPersistenceIds());
        optionallySchedule$1(EventsByTagStage$ScanForDelayedEvents$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().backtrack().interval());
    }

    private void scheduleQueryPoll(FiniteDuration finiteDuration, FiniteDuration finiteDuration2) {
        scheduleWithFixedDelay(EventsByTagStage$PeriodicQueryPoll$.MODULE$, finiteDuration, finiteDuration2);
    }

    public void onTimer(Object obj) {
        if (obj instanceof EventsByTagStage.QueryPoll) {
            m101continue();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (EventsByTagStage$PersistenceIdsCleanup$.MODULE$.equals(obj)) {
            cleanup();
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            if (!EventsByTagStage$ScanForDelayedEvents$.MODULE$.equals(obj)) {
                throw new IllegalStateException(new StringBuilder(21).append("Unexpected timerKey: ").append(obj).toString());
            }
            scanForDelayedEvents();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public void onPull() {
        tryPushOne();
    }

    private void scanForDelayedEvents() {
        UUID uuid;
        if (stageState().delayedScanInProgress()) {
            return;
        }
        updateStageState(stageState -> {
            return stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), true, stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
        });
        long key = stageState().currentTimeBucket().previous(1).key();
        long unixTimestamp = Uuids.unixTimestamp(stageState().fromOffset());
        if (System.currentTimeMillis() - stageState().previousLongDelayedScan() > this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$backtracking().longIntervalMillis()) {
            UUID startOf = Uuids.startOf(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$backtracking().longPeriodMillis(unixTimestamp, key));
            log().debug("Initialising long period back track from {}", org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(startOf));
            updateStageState(stageState2 -> {
                return stageState2.copy(stageState2.copy$default$1(), stageState2.copy$default$2(), stageState2.copy$default$3(), stageState2.copy$default$4(), stageState2.copy$default$5(), System.currentTimeMillis(), stageState2.copy$default$7(), stageState2.copy$default$8());
            });
            uuid = startOf;
        } else {
            UUID startOf2 = Uuids.startOf(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$backtracking().periodMillis(unixTimestamp, key));
            if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug()) {
                log().debug("Initialising period back track from {}", org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(startOf2));
            }
            uuid = startOf2;
        }
        UUID uuid2 = uuid;
        this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$scanner.scan(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), UUIDComparator$.MODULE$.comparator().compare(uuid2, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset) < 0 ? this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset : uuid2, stageState().fromOffset(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize, Duration$.MODULE$.Zero(), (j, j2) -> {
            return scala.math.package$.MODULE$.max(j, j2);
        }).onComplete(r4 -> {
            $anonfun$scanForDelayedEvents$4(this, r4);
            return BoxedUnit.UNIT;
        }, ec());
    }

    private long cleanupPersistenceIdsMills() {
        return this.cleanupPersistenceIdsMills;
    }

    private void cleanup() {
        long currentTimeMillis = System.currentTimeMillis();
        Map map = (Map) stageState().tagPidSequenceNrs().filterNot(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$cleanup$1(this, currentTimeMillis, tuple2));
        });
        updateStageState(stageState -> {
            return stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), map, stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
        });
    }

    /* renamed from: continue, reason: not valid java name */
    private void m101continue() {
        EventsByTagStage.QueryState state = stageState().state();
        if (EventsByTagStage$QueryIdle$.MODULE$.equals(state)) {
            if (stageState().isLookingForMissing()) {
                lookForMissing();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            } else {
                query();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (state instanceof EventsByTagStage.QueryResult ? true : state instanceof EventsByTagStage.BufferedEvents) {
            tryPushOne();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            if (!(state instanceof EventsByTagStage.QueryInProgress)) {
                throw new MatchError(state);
            }
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
    }

    private void query() {
        updateToOffset();
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug() && log().isDebugEnabled()) {
            log().debug("[{}] Executing query: timeBucket: {} from offset: {} to offset: {}", this.$outer.stageUuid(), stageState().currentTimeBucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().toOffset()));
        }
        updateStageState(stageState -> {
            return stageState.copy(new EventsByTagStage.QueryInProgress(false, EventsByTagStage$QueryInProgress$.MODULE$.apply$default$2()), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
        });
        this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.selectEventsForBucket(stageState().currentTimeBucket(), stageState().fromOffset(), stageState().toOffset(), (obj, th, finiteDuration) -> {
            $anonfun$query$2(this, BoxesRunTime.unboxToInt(obj), th, finiteDuration);
            return BoxedUnit.UNIT;
        }, ec(), scheduler()).onComplete(r4 -> {
            $anonfun$query$3(this, r4);
            return BoxedUnit.UNIT;
        }, ec());
    }

    private EventsByTagStage.LookingForMissing getMissingLookup() {
        Some missingLookup = stageState().missingLookup();
        if (missingLookup instanceof Some) {
            return (EventsByTagStage.LookingForMissing) missingLookup.value();
        }
        if (None$.MODULE$.equals(missingLookup)) {
            throw new IllegalStateException(new StringBuilder(90).append("lookingForMissingCalled for tag ").append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).append(" when there ").append("is no missing. Raise a bug with debug logging.").toString());
        }
        throw new MatchError(missingLookup);
    }

    private void lookForMissing() {
        EventsByTagStage.LookingForMissing missingLookup = getMissingLookup();
        if (missingLookup.deadline().isOverdue()) {
            abortMissingSearch(missingLookup);
            return;
        }
        updateQueryState(new EventsByTagStage.QueryInProgress(false, EventsByTagStage$QueryInProgress$.MODULE$.apply$default$2()));
        if (log().isDebugEnabled()) {
            log().debug(new StringBuilder(69).append("[").append(this.$outer.stageUuid()).append("] ").append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).append(": Executing query to look for {}. Timebucket: {}. From: {}. To: {}").toString(), missingLookup.gapDetected() ? "missing" : "previous events", missingLookup.bucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(missingLookup.minOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(missingLookup.maxOffset()));
        }
        this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.selectEventsForBucket(missingLookup.bucket(), missingLookup.minOffset(), missingLookup.maxOffset(), (obj, th, finiteDuration) -> {
            $anonfun$lookForMissing$1(this, BoxesRunTime.unboxToInt(obj), th, finiteDuration);
            return BoxedUnit.UNIT;
        }, ec(), scheduler()).onComplete(r4 -> {
            $anonfun$lookForMissing$2(this, r4);
            return BoxedUnit.UNIT;
        }, ec());
    }

    private void abortMissingSearch(EventsByTagStage.LookingForMissing lookingForMissing) {
        if (lookingForMissing.gapDetected()) {
            fail(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out(), new MissingTaggedEventException(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), lookingForMissing.remainingMissing(), lookingForMissing.minOffset(), lookingForMissing.maxOffset()));
            return;
        }
        log().debug("[{}] [{}]: Finished scanning for older events for persistence ids [{}]", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), lookingForMissing.remainingMissing().keys().mkString(","));
        stopLookingForMissing(lookingForMissing, lookingForMissing.buffered());
        tryPushOne();
    }

    private void checkResultSetForMissing(AsyncResultSet asyncResultSet, EventsByTagStage.LookingForMissing lookingForMissing) {
        Row row = (Row) asyncResultSet.one();
        String string = row.getString("persistence_id");
        long j = row.getLong("tag_pid_sequence_nr");
        Some some = lookingForMissing.remainingMissing().get(string);
        if (some instanceof Some) {
            Set set = (Set) some.value();
            if (set.contains(BoxesRunTime.boxToLong(j))) {
                EventsByTagStage.UUIDRow extractUuidRow = extractUuidRow(row);
                Set $minus = set.$minus(BoxesRunTime.boxToLong(extractUuidRow.tagPidSequenceNr()));
                Map updated = $minus.isEmpty() ? (Map) lookingForMissing.remainingMissing().$minus(string) : lookingForMissing.remainingMissing().updated(string, $minus);
                if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug()) {
                    log().debug("[{}] {}: Found a missing event, sequence nr {}. Remaining missing: {}", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), BoxesRunTime.boxToLong(extractUuidRow.tagPidSequenceNr()), updated);
                }
                if (updated.isEmpty()) {
                    stopLookingForMissing(lookingForMissing, lookingForMissing.buffered().$colon$colon(extractUuidRow));
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                } else {
                    log().debug("[{}] [{}]: There are more missing events. [{}]", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), updated);
                    EventsByTagStage.StageState stageState = stageState();
                    stageState_$eq(stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState().missingLookup().map(lookingForMissing2 -> {
                        return lookingForMissing2.copy(lookingForMissing2.buffered().$colon$colon(extractUuidRow), lookingForMissing2.copy$default$2(), lookingForMissing2.copy$default$3(), lookingForMissing2.copy$default$4(), lookingForMissing2.copy$default$5(), lookingForMissing2.copy$default$6(), updated, lookingForMissing2.copy$default$8(), lookingForMissing2.copy$default$9());
                    }), stageState.copy$default$8()));
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    private void updateStageState(Function1<EventsByTagStage.StageState, EventsByTagStage.StageState> function1) {
        stageState_$eq((EventsByTagStage.StageState) function1.apply(stageState()));
    }

    private boolean handleFirstTimePersistenceId(EventsByTagStage.UUIDRow uUIDRow) {
        UUID uuid;
        if (uUIDRow.tagPidSequenceNr() == 1) {
            updateStageState(stageState -> {
                return stageState.copy(stageState.copy$default$1(), uUIDRow.offset(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8()).tagPidSequenceNumberUpdate(uUIDRow.persistenceId(), new Tuple3<>(BoxesRunTime.boxToLong(1L), uUIDRow.offset(), BoxesRunTime.boxToLong(System.currentTimeMillis())));
            });
            push(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out(), uUIDRow);
            return false;
        }
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$usingOffset) {
            if (!stageState().currentTimeBucket().inPast()) {
                FiniteDuration newPersistenceIdScanTimeout = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().newPersistenceIdScanTimeout();
                FiniteDuration Zero = Duration$.MODULE$.Zero();
                if (newPersistenceIdScanTimeout != null) {
                }
            }
            log().debug("[{}] New persistence id: {}. Timebucket: {}. Tag pid sequence nr: {}", this.$outer.stageUuid(), uUIDRow.persistenceId(), stageState().currentTimeBucket(), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
            updateStageState(stageState2 -> {
                return stageState2.copy(stageState2.copy$default$1(), uUIDRow.offset(), stageState2.copy$default$3(), stageState2.copy$default$4(), stageState2.copy$default$5(), stageState2.copy$default$6(), stageState2.copy$default$7(), stageState2.copy$default$8()).tagPidSequenceNumberUpdate(uUIDRow.persistenceId(), new Tuple3<>(BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()), uUIDRow.offset(), BoxesRunTime.boxToLong(System.currentTimeMillis())));
            });
            push(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out(), uUIDRow);
            return false;
        }
        if (log().isDebugEnabled()) {
            log().debug(new StringBuilder(326).append("[").append(this.$outer.stageUuid()).append("] ").append(" [{}]: Persistence Id not in metadata: [{}] does not start at tag pid sequence nr 1. ").append("This could either be that the events are before the offset, that the metadata has been dropped or that they are delayed. ").append("Tag pid sequence nr found: [{}]. Looking for lower tag pid sequence nrs for [{}] in the current and previous buckets.").toString(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), uUIDRow.persistenceId(), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()), PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().newPersistenceIdScanTimeout())));
        }
        UUID startOf = Uuids.startOf(stageState().currentTimeBucket().previous(1).key());
        if (UUIDComparator$.MODULE$.comparator().compare(startOf, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset) < 0) {
            log().debug("[{}] Starting at fromOffset", this.$outer.stageUuid());
            uuid = this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$initialQueryOffset;
        } else {
            log().debug("[{}] Starting at startOfBucket", this.$outer.stageUuid());
            uuid = startOf;
        }
        setLookingForMissingState(Nil$.MODULE$.$colon$colon(uUIDRow), uuid, uUIDRow.offset(), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), new EventsByTagStage.MissingData(uUIDRow.offset(), uUIDRow.tagPidSequenceNr()))})), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), new RichLong(Predef$.MODULE$.longWrapper(1L)).until(BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr())).toSet())})), false, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().newPersistenceIdScanTimeout());
        return true;
    }

    private boolean handleExistingPersistenceId(EventsByTagStage.UUIDRow uUIDRow, long j, UUID uuid) {
        long j2 = j + 1;
        String persistenceId = uUIDRow.persistenceId();
        if (uUIDRow.tagPidSequenceNr() < j2) {
            log().warning(new StringBuilder(117).append("[").append(this.$outer.stageUuid()).append("] ").append("Duplicate sequence number. Persistence id: {}. Tag: {}. Expected sequence nr: {}. ").append("Actual {}. This will be dropped.").toString(), persistenceId, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), BoxesRunTime.boxToLong(j2), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
            return false;
        }
        if (uUIDRow.tagPidSequenceNr() > j2) {
            log().info(new StringBuilder(82).append("[").append(this.$outer.stageUuid()).append("] ").append("{}: Missing event for persistence id: {}. Expected sequence nr: {}, actual: {}.").toString(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), persistenceId, BoxesRunTime.boxToLong(j2), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
            setLookingForMissingState(Nil$.MODULE$.$colon$colon(uUIDRow), uuid, uUIDRow.offset(), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), new EventsByTagStage.MissingData(uUIDRow.offset(), uUIDRow.tagPidSequenceNr()))})), (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(uUIDRow.persistenceId()), new RichLong(Predef$.MODULE$.longWrapper(j2)).until(BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr())).toSet())})), true, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventsByTagGapTimeout());
            return true;
        }
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug()) {
            log().debug(new StringBuilder(66).append("[").append(this.$outer.stageUuid()).append("] ").append(" Updating offset to {} from pId {} seqNr {} tagPidSequenceNr {}").toString(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(uUIDRow.offset()), persistenceId, BoxesRunTime.boxToLong(uUIDRow.sequenceNr()), BoxesRunTime.boxToLong(uUIDRow.tagPidSequenceNr()));
        }
        updateStageState(stageState -> {
            return stageState.copy(stageState.copy$default$1(), uUIDRow.offset(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8()).tagPidSequenceNumberUpdate(uUIDRow.persistenceId(), new Tuple3<>(BoxesRunTime.boxToLong(j2), uUIDRow.offset(), BoxesRunTime.boxToLong(System.currentTimeMillis())));
        });
        push(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out(), uUIDRow);
        return false;
    }

    private void setLookingForMissingState(List<EventsByTagStage.UUIDRow> list, UUID uuid, UUID uuid2, Map<String, EventsByTagStage.MissingData> map, Map<String, Set<Object>> map2, boolean z, FiniteDuration finiteDuration) {
        TimeBucket apply = TimeBucket$.MODULE$.apply(uuid2, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize);
        boolean shouldSearchInPreviousBucket = shouldSearchInPreviousBucket(apply, uuid);
        updateStageState(stageState -> {
            return stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), new Some(new EventsByTagStage.LookingForMissing(list, uuid, uuid2, apply, shouldSearchInPreviousBucket, map, map2, Deadline$.MODULE$.now().$plus(finiteDuration), z)), stageState.copy$default$8());
        });
    }

    private long totalMissing() {
        return BoxesRunTime.unboxToLong(getMissingLookup().remainingMissing().values().foldLeft(BoxesRunTime.boxToLong(0L), (obj, set) -> {
            return BoxesRunTime.boxToLong($anonfun$totalMissing$1(BoxesRunTime.unboxToLong(obj), set));
        }));
    }

    private void failTooManyMissing(long j) {
        failStage(new RuntimeException(new StringBuilder(57).append(j).append(" missing tagged events for tag [").append(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag()).append("]. Failing without search").toString()));
    }

    /* JADX WARN: Code restructure failed: missing block: B:23:0x02b4, code lost:
    
        r0 = scala.runtime.BoxedUnit.UNIT;
     */
    /* JADX WARN: Code restructure failed: missing block: B:77:0x0127, code lost:
    
        throw new scala.MatchError(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void tryPushOne() {
        /*
            Method dump skipped, instructions count: 774
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pekko.persistence.cassandra.query.EventsByTagStage$$anon$2.tryPushOne():void");
    }

    private boolean shouldSearchInPreviousBucket(TimeBucket timeBucket, UUID uuid) {
        return !timeBucket.within(uuid);
    }

    private void queryExhausted() {
        updateQueryState(EventsByTagStage$QueryIdle$.MODULE$);
        if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug() && log().isDebugEnabled()) {
            log().debug("[{}] Query exhausted, next query: from: {} to: {}", this.$outer.stageUuid(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(stageState().toOffset()));
        }
        if (!stageState().isLookingForMissing()) {
            if (stageState().shouldMoveBucket()) {
                nextTimeBucket();
                log().debug("[{}] Moving to next bucket: {}", this.$outer.stageUuid(), stageState().currentTimeBucket());
                query();
                return;
            } else {
                if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.contains(stageState().toOffset())) {
                    log().debug("[{}] Current query finished and has passed the eventual consistency delay, ending.", this.$outer.stageUuid());
                    completeStage();
                    return;
                }
                if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$verboseDebug()) {
                    log().debug("[{}] Nothing todo, waiting for next poll", this.$outer.stageUuid());
                }
                if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$isLiveQuery()) {
                    return;
                }
                log().debug("[{}] Scheduling poll for eventual consistency delay: {}", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency());
                scheduleOnce(EventsByTagStage$OneOffQueryPoll$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency());
                return;
            }
        }
        EventsByTagStage.LookingForMissing lookingForMissing = (EventsByTagStage.LookingForMissing) stageState().missingLookup().get();
        if (lookingForMissing.queryPrevious()) {
            log().debug("[{}] [{}]: Missing could be in previous bucket. Querying right away.", this.$outer.stageUuid(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag());
            updateStageState(stageState -> {
                return stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), this.stageState().missingLookup().map(lookingForMissing2 -> {
                    return lookingForMissing2.copy(lookingForMissing2.copy$default$1(), lookingForMissing2.copy$default$2(), lookingForMissing2.copy$default$3(), lookingForMissing2.bucket().previous(1), false, lookingForMissing2.copy$default$6(), lookingForMissing2.copy$default$7(), lookingForMissing2.copy$default$8(), lookingForMissing2.copy$default$9());
                }), stageState.copy$default$8());
            });
            lookForMissing();
            return;
        }
        FiniteDuration timeLeft = lookingForMissing.deadline().timeLeft();
        if (timeLeft.$less$eq(Duration$.MODULE$.Zero())) {
            abortMissingSearch(lookingForMissing);
            return;
        }
        log().debug(new StringBuilder(63).append("[").append(this.$outer.stageUuid()).append("] [{}]: Still looking for {}. {}. Duration left for search: {}").toString(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag(), lookingForMissing.gapDetected() ? "missing" : "previous events", stageState(), PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(timeLeft)));
        updateStageState(stageState2 -> {
            return stageState2.copy(stageState2.copy$default$1(), stageState2.copy$default$2(), stageState2.copy$default$3(), stageState2.copy$default$4(), stageState2.copy$default$5(), stageState2.copy$default$6(), this.stageState().missingLookup().map(lookingForMissing2 -> {
                TimeBucket apply = TimeBucket$.MODULE$.apply(lookingForMissing2.maxOffset(), this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$bucketSize);
                return lookingForMissing2.copy(lookingForMissing2.copy$default$1(), lookingForMissing2.copy$default$2(), lookingForMissing2.copy$default$3(), apply, this.shouldSearchInPreviousBucket(apply, lookingForMissing2.minOffset()), lookingForMissing2.copy$default$6(), lookingForMissing2.copy$default$7(), lookingForMissing2.copy$default$8(), lookingForMissing2.copy$default$9());
            }), stageState2.copy$default$8());
        });
        if (timeLeft.$less(this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.querySettings().refreshInterval())) {
            log().debug("Scheduling one off poll in {}", PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(timeLeft)));
            scheduleOnce(EventsByTagStage$OneOffQueryPoll$.MODULE$, timeLeft);
        } else {
            if (this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$isLiveQuery()) {
                return;
            }
            log().debug("Scheduling one off poll in {}", this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.querySettings().refreshInterval());
            scheduleOnce(EventsByTagStage$OneOffQueryPoll$.MODULE$, this.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.querySettings().refreshInterval());
        }
    }

    private void stopLookingForMissing(EventsByTagStage.LookingForMissing lookingForMissing, List<EventsByTagStage.UUIDRow> list) {
        updateQueryState(list.isEmpty() ? EventsByTagStage$QueryIdle$.MODULE$ : new EventsByTagStage.BufferedEvents((List) list.sorted(EventsByTagStage$.MODULE$.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$uuidRowOrdering())));
        log().debug("[{}] Search over. Buffered events ready for delivery: {}", this.$outer.stageUuid(), stageState().state());
        updateStageState(stageState -> {
            return (EventsByTagStage.StageState) lookingForMissing.missingData().foldLeft(stageState.copy(stageState.copy$default$1(), lookingForMissing.maxOffset(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), None$.MODULE$, stageState.copy$default$8()), (stageState, tuple2) -> {
                Tuple2 tuple2 = new Tuple2(stageState, tuple2);
                if (tuple2 != null) {
                    EventsByTagStage.StageState stageState = (EventsByTagStage.StageState) tuple2._1();
                    Tuple2 tuple22 = (Tuple2) tuple2._2();
                    if (tuple22 != null) {
                        String str = (String) tuple22._1();
                        EventsByTagStage.MissingData missingData = (EventsByTagStage.MissingData) tuple22._2();
                        this.log().debug("Updating tag pid sequence nr for pid {} to {}", str, BoxesRunTime.boxToLong(missingData.maxSequenceNr()));
                        return stageState.tagPidSequenceNumberUpdate(str, new Tuple3<>(BoxesRunTime.boxToLong(missingData.maxSequenceNr()), missingData.maxOffset(), BoxesRunTime.boxToLong(System.currentTimeMillis())));
                    }
                }
                throw new MatchError(tuple2);
            });
        });
    }

    private void fetchMore(AsyncResultSet asyncResultSet) {
        log().debug("[{}] No more results without paging. Requesting more.", this.$outer.stageUuid());
        Future asScala$extension = FutureConverters$CompletionStageOps$.MODULE$.asScala$extension(FutureConverters$.MODULE$.CompletionStageOps(asyncResultSet.fetchNextPage()));
        updateQueryState(new EventsByTagStage.QueryInProgress(false, EventsByTagStage$QueryInProgress$.MODULE$.apply$default$2()));
        asScala$extension.onComplete(r4 -> {
            $anonfun$fetchMore$1(this, r4);
            return BoxedUnit.UNIT;
        }, ec());
    }

    private EventsByTagStage.UUIDRow extractUuidRow(Row row) {
        return new EventsByTagStage.UUIDRow(row.getString("persistence_id"), row.getLong("sequence_nr"), row.getUuid("timestamp"), row.getLong("tag_pid_sequence_nr"), row);
    }

    private void nextTimeBucket() {
        updateStageState(stageState -> {
            return stageState.copy(stageState.copy$default$1(), Uuids.startOf(this.stageState().currentTimeBucket().next().key()), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
        });
        updateToOffset();
    }

    public static final /* synthetic */ void $anonfun$newResultSetCb$1(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Try r7) {
        if (!(r7 instanceof Success)) {
            if (!(r7 instanceof Failure)) {
                throw new MatchError(r7);
            }
            Throwable exception = ((Failure) r7).exception();
            eventsByTagStage$$anon$2.log().warning("Cassandra query failed: {}", exception.getMessage());
            eventsByTagStage$$anon$2.fail(eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out(), exception);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        AsyncResultSet asyncResultSet = (AsyncResultSet) ((Success) r7).value();
        EventsByTagStage.QueryState state = eventsByTagStage$$anon$2.stageState().state();
        if (!(state instanceof EventsByTagStage.QueryInProgress)) {
            throw new IllegalStateException(new StringBuilder(39).append("New ResultSet when in unexpected state ").append(eventsByTagStage$$anon$2.stageState().state()).toString());
        }
        if (((EventsByTagStage.QueryInProgress) state).abortForMissingSearch()) {
            eventsByTagStage$$anon$2.lookForMissing();
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            eventsByTagStage$$anon$2.updateStageState(stageState -> {
                return stageState.copy(new EventsByTagStage.QueryResult(asyncResultSet), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), stageState.copy$default$5(), stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
            });
            eventsByTagStage$$anon$2.tryPushOne();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ boolean $anonfun$backTrackCb$4(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Tuple2 tuple2) {
        Tuple3 tuple3;
        if (tuple2 != null) {
            String str = (String) tuple2._1();
            Tuple2 tuple22 = (Tuple2) tuple2._2();
            if (tuple22 != null) {
                long _1$mcJ$sp = tuple22._1$mcJ$sp();
                Some some = eventsByTagStage$$anon$2.stageState().tagPidSequenceNrs().get(str);
                return ((some instanceof Some) && (tuple3 = (Tuple3) some.value()) != null && BoxesRunTime.unboxToLong(tuple3._1()) < _1$mcJ$sp) || None$.MODULE$.equals(some);
            }
        }
        throw new MatchError(tuple2);
    }

    public static final /* synthetic */ long $anonfun$backTrackCb$7(Tuple3 tuple3) {
        return BoxesRunTime.unboxToLong(tuple3._1());
    }

    public static final /* synthetic */ long $anonfun$backTrackCb$10(Tuple3 tuple3) {
        return Uuids.unixTimestamp((UUID) tuple3._2());
    }

    public static final /* synthetic */ long $anonfun$backTrackCb$9(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, long j, String str) {
        Tuple2 tuple2 = new Tuple2(BoxesRunTime.boxToLong(j), str);
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        return scala.math.package$.MODULE$.min(BoxesRunTime.unboxToLong(eventsByTagStage$$anon$2.stageState().tagPidSequenceNrs().get((String) tuple2._2()).map(tuple3 -> {
            return BoxesRunTime.boxToLong($anonfun$backTrackCb$10(tuple3));
        }).getOrElse(() -> {
            return eventsByTagStage$$anon$2.stageState().currentTimeBucket().previous(1).key();
        })), tuple2._1$mcJ$sp());
    }

    public static final /* synthetic */ void $anonfun$backTrackCb$1(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Try r10) {
        if (r10 instanceof Failure) {
            Throwable exception = ((Failure) r10).exception();
            eventsByTagStage$$anon$2.updateStageState(stageState -> {
                return stageState.copy(stageState.copy$default$1(), stageState.copy$default$2(), stageState.copy$default$3(), stageState.copy$default$4(), false, stageState.copy$default$6(), stageState.copy$default$7(), stageState.copy$default$8());
            });
            eventsByTagStage$$anon$2.log().warning("Backtrack failed, this will retried. {}", exception);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(r10 instanceof Success)) {
            throw new MatchError(r10);
        }
        Map map = (Map) ((Success) r10).value();
        eventsByTagStage$$anon$2.updateStageState(stageState2 -> {
            return stageState2.copy(stageState2.copy$default$1(), stageState2.copy$default$2(), stageState2.copy$default$3(), stageState2.copy$default$4(), false, stageState2.copy$default$6(), stageState2.copy$default$7(), stageState2.copy$default$8());
        });
        eventsByTagStage$$anon$2.log().debug("Current sequence nrs: {} from back tracking: {}", eventsByTagStage$$anon$2.stageState().tagPidSequenceNrs(), map);
        Map map2 = (Map) map.filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$backTrackCb$4(eventsByTagStage$$anon$2, tuple2));
        });
        if (!map2.nonEmpty() || eventsByTagStage$$anon$2.stageState().isLookingForMissing()) {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        eventsByTagStage$$anon$2.log().debug("The following persistence ids have delayed events: {}. Initiating search for the events.", map2);
        EventsByTagStage.QueryState state = eventsByTagStage$$anon$2.stageState().state();
        Nil$ events = state instanceof EventsByTagStage.BufferedEvents ? ((EventsByTagStage.BufferedEvents) state).events() : Nil$.MODULE$;
        Map<String, EventsByTagStage.MissingData> map3 = (Map) map2.transform((str, tuple22) -> {
            Tuple2 tuple22;
            Tuple2 tuple23 = new Tuple2(str, tuple22);
            if (tuple23 == null || (tuple22 = (Tuple2) tuple23._2()) == null) {
                throw new MatchError(tuple23);
            }
            return new EventsByTagStage.MissingData((UUID) tuple22._2(), tuple22._1$mcJ$sp());
        });
        Map<String, Set<Object>> map4 = (Map) map2.transform((str2, tuple23) -> {
            Tuple2 tuple23 = new Tuple2(str2, tuple23);
            if (tuple23 != null) {
                String str2 = (String) tuple23._1();
                Tuple2 tuple24 = (Tuple2) tuple23._2();
                if (tuple24 != null) {
                    long _1$mcJ$sp = tuple24._1$mcJ$sp();
                    return new RichLong(Predef$.MODULE$.longWrapper(BoxesRunTime.unboxToLong(eventsByTagStage$$anon$2.stageState().tagPidSequenceNrs().get(str2).map(tuple3 -> {
                        return BoxesRunTime.boxToLong($anonfun$backTrackCb$7(tuple3));
                    }).getOrElse(() -> {
                        return 0L;
                    })) + 1)).to(BoxesRunTime.boxToLong(_1$mcJ$sp)).toSet();
                }
            }
            throw new MatchError(tuple23);
        });
        UUID startOf = Uuids.startOf(BoxesRunTime.unboxToLong(map3.keys().foldLeft(BoxesRunTime.boxToLong(Uuids.unixTimestamp(eventsByTagStage$$anon$2.stageState().toOffset())), (obj, str3) -> {
            return BoxesRunTime.boxToLong($anonfun$backTrackCb$9(eventsByTagStage$$anon$2, BoxesRunTime.unboxToLong(obj), str3));
        })));
        eventsByTagStage$$anon$2.log().debug("Starting search for: {}", map4);
        eventsByTagStage$$anon$2.setLookingForMissingState(events, startOf, eventsByTagStage$$anon$2.stageState().toOffset(), map3, map4, true, eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventsByTagGapTimeout());
        long j = eventsByTagStage$$anon$2.totalMissing();
        if (j > eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().maxMissingToSearch()) {
            eventsByTagStage$$anon$2.failTooManyMissing(j);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        EventsByTagStage.QueryState state2 = eventsByTagStage$$anon$2.stageState().state();
        if (state2 instanceof EventsByTagStage.QueryInProgress) {
            EventsByTagStage.QueryInProgress queryInProgress = (EventsByTagStage.QueryInProgress) state2;
            eventsByTagStage$$anon$2.updateStageState(stageState3 -> {
                return stageState3.copy(queryInProgress.copy(true, queryInProgress.copy$default$2()), stageState3.copy$default$2(), stageState3.copy$default$3(), stageState3.copy$default$4(), stageState3.copy$default$5(), stageState3.copy$default$6(), stageState3.copy$default$7(), stageState3.copy$default$8());
            });
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        } else {
            eventsByTagStage$$anon$2.lookForMissing();
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        }
        BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$preStart$4(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        if (!tuple2._2().equals(eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$session.tag())) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        eventsByTagStage$$anon$2.log().debug("[{}] Received pub sub tag update for our tag, initiating query", eventsByTagStage$$anon$2.$outer.stageUuid());
        FiniteDuration eventualConsistency = eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency();
        FiniteDuration Zero = Duration$.MODULE$.Zero();
        if (eventualConsistency != null ? eventualConsistency.equals(Zero) : Zero == null) {
            eventsByTagStage$$anon$2.m101continue();
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else if (!eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency().$less(eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.querySettings().refreshInterval())) {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            eventsByTagStage$$anon$2.scheduleOnce(new EventsByTagStage.TagNotification(System.currentTimeMillis() / 10), eventsByTagStage$$anon$2.$outer.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().eventualConsistency());
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
    }

    private final void optionallySchedule$1(Object obj, Option option) {
        option.foreach(finiteDuration -> {
            this.scheduleWithFixedDelay(obj, finiteDuration, finiteDuration);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$scanForDelayedEvents$4(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Try r4) {
        eventsByTagStage$$anon$2.backTrackCb().invoke(r4);
    }

    public static final /* synthetic */ boolean $anonfun$cleanup$1(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, long j, Tuple2 tuple2) {
        Tuple3 tuple3;
        if (tuple2 == null || (tuple3 = (Tuple3) tuple2._2()) == null) {
            throw new MatchError(tuple2);
        }
        return j - BoxesRunTime.unboxToLong(tuple3._3()) > eventsByTagStage$$anon$2.cleanupPersistenceIdsMills();
    }

    public static final /* synthetic */ void $anonfun$query$2(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, int i, Throwable th, FiniteDuration finiteDuration) {
        if (eventsByTagStage$$anon$2.log().isWarningEnabled()) {
            eventsByTagStage$$anon$2.log().warning(new StringBuilder(100).append("[{}] Query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ").append(i).append(". Next retry in: ").append(PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(finiteDuration))).append(". Reason: ").append(th.getMessage()).toString(), eventsByTagStage$$anon$2.$outer.stageUuid(), eventsByTagStage$$anon$2.stageState().currentTimeBucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(eventsByTagStage$$anon$2.stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(eventsByTagStage$$anon$2.stageState().toOffset()));
        }
    }

    public static final /* synthetic */ void $anonfun$query$3(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Try r4) {
        eventsByTagStage$$anon$2.newResultSetCb().invoke(r4);
    }

    public static final /* synthetic */ void $anonfun$lookForMissing$1(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, int i, Throwable th, FiniteDuration finiteDuration) {
        if (eventsByTagStage$$anon$2.log().isWarningEnabled()) {
            eventsByTagStage$$anon$2.log().warning(new StringBuilder(120).append("[{}] Looking for missing query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ").append(i).append(". Next retry in: ").append(PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(finiteDuration))).append(". Reason: ").append(th.getMessage()).toString(), eventsByTagStage$$anon$2.$outer.stageUuid(), eventsByTagStage$$anon$2.stageState().currentTimeBucket(), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(eventsByTagStage$$anon$2.stageState().fromOffset()), org.apache.pekko.persistence.cassandra.package$.MODULE$.formatOffset(eventsByTagStage$$anon$2.stageState().toOffset()));
        }
    }

    public static final /* synthetic */ void $anonfun$lookForMissing$2(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Try r4) {
        eventsByTagStage$$anon$2.newResultSetCb().invoke(r4);
    }

    public static final /* synthetic */ long $anonfun$totalMissing$1(long j, Set set) {
        return j + set.size();
    }

    public static final /* synthetic */ void $anonfun$fetchMore$1(EventsByTagStage$$anon$2 eventsByTagStage$$anon$2, Try r4) {
        eventsByTagStage$$anon$2.newResultSetCb().invoke(r4);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public EventsByTagStage$$anon$2(EventsByTagStage eventsByTagStage) {
        super(eventsByTagStage.m97shape());
        if (eventsByTagStage == null) {
            throw null;
        }
        this.$outer = eventsByTagStage;
        StageLogging.$init$(this);
        OutHandler.$init$(this);
        this.toOffsetMillis = BoxesRunTime.unboxToLong(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$toOffset.map(uuid -> {
            return BoxesRunTime.boxToLong(Uuids.unixTimestamp(uuid));
        }).getOrElse(() -> {
            return Long.MAX_VALUE;
        }));
        setHandler(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$out(), this);
        this.newResultSetCb = getAsyncCallback(r4 -> {
            $anonfun$newResultSetCb$1(this, r4);
            return BoxedUnit.UNIT;
        });
        this.backTrackCb = getAsyncCallback(r42 -> {
            $anonfun$backTrackCb$1(this, r42);
            return BoxedUnit.UNIT;
        });
        this.cleanupPersistenceIdsMills = BoxesRunTime.unboxToLong(eventsByTagStage.org$apache$pekko$persistence$cassandra$query$EventsByTagStage$$settings.eventsByTagSettings().cleanUpPersistenceIds().map(finiteDuration -> {
            return BoxesRunTime.boxToLong(finiteDuration.toMillis());
        }).getOrElse(() -> {
            return Long.MAX_VALUE;
        }));
    }
}
