package org.apache.pekko.persistence.query.journal.leveldb;

import com.typesafe.config.ConfigFactory;
import java.util.LinkedList;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ExtensionId;
import org.apache.pekko.actor.ScalaActorRef;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.Persistence;
import org.apache.pekko.persistence.Persistence$;
import org.apache.pekko.persistence.PersistentRepr;
import org.apache.pekko.persistence.journal.leveldb.LeveldbJournal;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.EventEnvelope$;
import org.apache.pekko.persistence.query.Sequence;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogicWithLogging;
import scala.MatchError;
import scala.None$;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;

/* compiled from: EventsByTagStage.scala */
/* loaded from: input_file:org/apache/pekko/persistence/query/journal/leveldb/EventsByTagStage$$anon$1.class */
public final class EventsByTagStage$$anon$1 extends TimerGraphStageLogicWithLogging implements OutHandler, Buffer<EventEnvelope> {
    private final ActorRef journal;
    private long currOffset;
    private long toOffset;
    private ActorRef stageActorRef;
    private boolean replayInProgress;
    private boolean outstandingReplay;
    private LinkedList<EventEnvelope> org$apache$pekko$persistence$query$journal$leveldb$Buffer$$buf;
    private final /* synthetic */ EventsByTagStage $outer;

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public void buffer(EventEnvelope eventEnvelope) {
        buffer((EventsByTagStage$$anon$1) ((Buffer) eventEnvelope));
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public void buffer(Set<EventEnvelope> set) {
        buffer((Set) set);
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public void deliverBuf(Outlet<EventEnvelope> outlet) {
        deliverBuf(outlet);
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public int bufferSize() {
        int bufferSize;
        bufferSize = bufferSize();
        return bufferSize;
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public boolean bufferEmpty() {
        boolean bufferEmpty;
        bufferEmpty = bufferEmpty();
        return bufferEmpty;
    }

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public LinkedList<EventEnvelope> org$apache$pekko$persistence$query$journal$leveldb$Buffer$$buf() {
        return this.org$apache$pekko$persistence$query$journal$leveldb$Buffer$$buf;
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public final void org$apache$pekko$persistence$query$journal$leveldb$Buffer$_setter_$org$apache$pekko$persistence$query$journal$leveldb$Buffer$$buf_$eq(LinkedList<EventEnvelope> linkedList) {
        this.org$apache$pekko$persistence$query$journal$leveldb$Buffer$$buf = linkedList;
    }

    @Override // org.apache.pekko.persistence.query.journal.leveldb.Buffer
    public void doPush(Outlet<EventEnvelope> outlet, EventEnvelope eventEnvelope) {
        super/*org.apache.pekko.stream.stage.GraphStageLogic*/.push(outlet, eventEnvelope);
    }

    private ActorRef journal() {
        return this.journal;
    }

    private long currOffset() {
        return this.currOffset;
    }

    private void currOffset_$eq(long j) {
        this.currOffset = j;
    }

    private long toOffset() {
        return this.toOffset;
    }

    private void toOffset_$eq(long j) {
        this.toOffset = j;
    }

    private ActorRef stageActorRef() {
        return this.stageActorRef;
    }

    private void stageActorRef_$eq(ActorRef actorRef) {
        this.stageActorRef = actorRef;
    }

    private boolean replayInProgress() {
        return this.replayInProgress;
    }

    private void replayInProgress_$eq(boolean z) {
        this.replayInProgress = z;
    }

    private boolean outstandingReplay() {
        return this.outstandingReplay;
    }

    private void outstandingReplay_$eq(boolean z) {
        this.outstandingReplay = z;
    }

    public Class<?> logSource() {
        return EventsByTagStage.class;
    }

    public void preStart() {
        stageActorRef_$eq(getEagerStageActor(interpreter().materializer(), tuple2 -> {
            this.journalInteraction(tuple2);
            return BoxedUnit.UNIT;
        }).ref());
        this.$outer.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$refreshInterval.foreach(finiteDuration -> {
            $anonfun$preStart$2(this, finiteDuration);
            return BoxedUnit.UNIT;
        });
        requestMore();
    }

    public void onTimer(Object obj) {
        requestMore();
        deliverBuf(this.$outer.out());
    }

    private void requestMore() {
        int bufferSize;
        if (replayInProgress()) {
            outstandingReplay_$eq(true);
            return;
        }
        int i = this.$outer.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$maxBufSize;
        bufferSize = bufferSize();
        int i2 = i - bufferSize;
        if (i2 > 0) {
            replayInProgress_$eq(true);
            outstandingReplay_$eq(false);
            LeveldbJournal.ReplayTaggedMessages replayTaggedMessages = new LeveldbJournal.ReplayTaggedMessages(currOffset(), toOffset(), i2, this.$outer.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$tag, stageActorRef());
            journal().$bang(replayTaggedMessages, journal().$bang$default$2(replayTaggedMessages));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void journalInteraction(Tuple2<ActorRef, Object> tuple2) {
        int bufferSize;
        int bufferSize2;
        if (tuple2 == null) {
            throw new MatchError((Object) null);
        }
        Object _2 = tuple2._2();
        if (_2 instanceof LeveldbJournal.ReplayedTaggedMessage) {
            LeveldbJournal.ReplayedTaggedMessage replayedTaggedMessage = (LeveldbJournal.ReplayedTaggedMessage) _2;
            PersistentRepr persistent = replayedTaggedMessage.persistent();
            long offset = replayedTaggedMessage.offset();
            EventEnvelope$ eventEnvelope$ = EventEnvelope$.MODULE$;
            buffer((EventsByTagStage$$anon$1) ((Buffer) new EventEnvelope(new Sequence(offset), persistent.persistenceId(), persistent.sequenceNr(), persistent.payload(), persistent.timestamp(), None$.MODULE$)));
            currOffset_$eq(offset);
            deliverBuf(this.$outer.out());
            return;
        }
        if (!(_2 instanceof JournalProtocol.RecoverySuccess)) {
            if (_2 instanceof JournalProtocol.ReplayMessagesFailure) {
                failStage(((JournalProtocol.ReplayMessagesFailure) _2).cause());
                return;
            } else {
                if (!(_2 instanceof LeveldbJournal.TaggedEventAppended)) {
                    throw new RuntimeException();
                }
                requestMore();
                return;
            }
        }
        long highestSequenceNr = ((JournalProtocol.RecoverySuccess) _2).highestSequenceNr();
        replayInProgress_$eq(false);
        deliverBuf(this.$outer.out());
        LoggingAdapter log$ = StageLogging.log$(this);
        Long boxToLong = BoxesRunTime.boxToLong(currOffset());
        Long boxToLong2 = BoxesRunTime.boxToLong(toOffset());
        bufferSize = bufferSize();
        log$.debug("Replay complete. Current offset {} toOffset {} buffer size {} highestSeqNr {}", boxToLong, boxToLong2, BoxesRunTime.boxToInteger(bufferSize), BoxesRunTime.boxToLong(highestSequenceNr));
        if (highestSequenceNr < toOffset() && isCurrentQuery()) {
            toOffset_$eq(highestSequenceNr);
        }
        if (currOffset() >= toOffset()) {
            checkComplete();
            return;
        }
        bufferSize2 = bufferSize();
        if (bufferSize2 < this.$outer.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$maxBufSize) {
            if (isCurrentQuery() || outstandingReplay()) {
                requestMore();
            }
        }
    }

    private boolean isCurrentQuery() {
        return this.$outer.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$refreshInterval.isEmpty();
    }

    private void checkComplete() {
        boolean bufferEmpty;
        bufferEmpty = bufferEmpty();
        if (!bufferEmpty || currOffset() < toOffset()) {
            return;
        }
        completeStage();
    }

    public void onPull() {
        requestMore();
        deliverBuf(this.$outer.out());
        checkComplete();
    }

    public static final /* synthetic */ void $anonfun$preStart$2(EventsByTagStage$$anon$1 eventsByTagStage$$anon$1, FiniteDuration finiteDuration) {
        eventsByTagStage$$anon$1.scheduleWithFixedDelay(EventsByTagStage$Continue$.MODULE$, finiteDuration, finiteDuration);
        ScalaActorRef journal = eventsByTagStage$$anon$1.journal();
        LeveldbJournal.SubscribeTag subscribeTag = new LeveldbJournal.SubscribeTag(eventsByTagStage$$anon$1.$outer.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$tag);
        ActorRef stageActorRef = eventsByTagStage$$anon$1.stageActorRef();
        if (journal == null) {
            throw null;
        }
        journal.$bang(subscribeTag, stageActorRef);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public EventsByTagStage$$anon$1(EventsByTagStage eventsByTagStage) {
        super(eventsByTagStage.m68shape());
        if (eventsByTagStage == null) {
            throw null;
        }
        this.$outer = eventsByTagStage;
        org$apache$pekko$persistence$query$journal$leveldb$Buffer$_setter_$org$apache$pekko$persistence$query$journal$leveldb$Buffer$$buf_$eq(new LinkedList<>());
        Persistence apply$ = ExtensionId.apply$(Persistence$.MODULE$, eventsByTagStage.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$mat.system());
        String str = eventsByTagStage.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$writeJournalPluginId;
        if (apply$ == null) {
            throw null;
        }
        this.journal = apply$.journalFor(str, ConfigFactory.empty());
        this.currOffset = eventsByTagStage.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$fromOffset;
        this.toOffset = eventsByTagStage.org$apache$pekko$persistence$query$journal$leveldb$EventsByTagStage$$initialTooOffset;
        this.stageActorRef = null;
        this.replayInProgress = false;
        this.outstandingReplay = false;
        setHandler(eventsByTagStage.out(), this);
        Statics.releaseFence();
    }
}
