package akka.persistence.query.journal.redis;

import akka.persistence.PersistentRepr;
import akka.persistence.PersistentRepr$;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Sequence;
import akka.persistence.redis.RedisKeys$;
import akka.persistence.redis.package$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogicWithLogging;
import akka.stream.stage.OutHandler;
import akka.util.ByteString;
import redis.ByteStringDeserializer$;
import redis.RedisPubSub;
import redis.RedisPubSub$;
import redis.api.Limit;
import redis.api.Limit$;
import redis.api.pubsub.Message;
import redis.api.pubsub.PMessage;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.ExecutionContextExecutor;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: EventsByPersistenceIdSource.scala */
/* loaded from: input_file:BOOT-INF/lib/akka-persistence-redis_2.12-0.4.0.jar:akka/persistence/query/journal/redis/EventsByPersistenceIdSource$$anon$1.class */
public final class EventsByPersistenceIdSource$$anon$1 extends GraphStageLogicWithLogging {
    private int akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state;
    private Queue<EventEnvelope> buffer;
    private RedisPubSub subscription;
    private final int max;
    private long currentSequenceNr;
    private long to;
    private AsyncCallback<Seq<PersistentRepr>> callback;
    private final ClassTag<Seq<String>> StringSeq;
    private final /* synthetic */ EventsByPersistenceIdSource $outer;

    public int akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state() {
        return this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state;
    }

    public void akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(int i) {
        this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state = i;
    }

    private Queue<EventEnvelope> buffer() {
        return this.buffer;
    }

    private void buffer_$eq(Queue<EventEnvelope> queue) {
        this.buffer = queue;
    }

    private RedisPubSub subscription() {
        return this.subscription;
    }

    private void subscription_$eq(RedisPubSub redisPubSub) {
        this.subscription = redisPubSub;
    }

    private int max() {
        return this.max;
    }

    private long currentSequenceNr() {
        return this.currentSequenceNr;
    }

    private void currentSequenceNr_$eq(long j) {
        this.currentSequenceNr = j;
    }

    private long to() {
        return this.to;
    }

    private void to_$eq(long j) {
        this.to = j;
    }

    private AsyncCallback<Seq<PersistentRepr>> callback() {
        return this.callback;
    }

    private void callback_$eq(AsyncCallback<Seq<PersistentRepr>> asyncCallback) {
        this.callback = asyncCallback;
    }

    private ExecutionContextExecutor ec() {
        return materializer().executionContext();
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        callback_$eq(getAsyncCallback(seq -> {
            $anonfun$preStart$1(this, seq);
            return BoxedUnit.UNIT;
        }));
        if (!this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$live) {
            akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(this.$outer.Initializing());
            AsyncCallback asyncCallback = getAsyncCallback(j -> {
                if (this.to() > j) {
                    this.to_$eq(j);
                }
                int akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state = this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state();
                if (this.$outer.QueryWhenInitializing() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
                    this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(this.$outer.Idle());
                    this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$query();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    if (this.$outer.Initializing() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
                        this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(this.$outer.Idle());
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                        return;
                    }
                    this.log().error(new StringOps("Unexpected source state when initializing: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state())})));
                    this.failStage(new IllegalStateException(new StringOps("Unexpected source state when initializing: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state())}))));
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                }
            });
            this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$redis.get(RedisKeys$.MODULE$.highestSequenceNrKey(this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$persistenceId), this.$outer.longFormatter()).onComplete(r6 -> {
                $anonfun$preStart$6(this, asyncCallback, r6);
                return BoxedUnit.UNIT;
            }, ec());
            return;
        }
        AsyncCallback asyncCallback2 = getAsyncCallback(message -> {
            $anonfun$preStart$3(this, message);
            return BoxedUnit.UNIT;
        });
        String host = this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$redis.host();
        int port = this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$redis.port();
        Seq<String> seq2 = (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{this.$outer.Channel()}));
        Nil$ nil$ = Nil$.MODULE$;
        Option<String> password = this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$redis.password();
        Function1<Message, BoxedUnit> function1 = message2 -> {
            asyncCallback2.invoke(message2);
            return BoxedUnit.UNIT;
        };
        Function1<PMessage, BoxedUnit> apply$default$6 = RedisPubSub$.MODULE$.apply$default$6();
        String apply$default$8 = RedisPubSub$.MODULE$.apply$default$8();
        subscription_$eq(new RedisPubSub(host, port, seq2, nil$, function1, apply$default$6, password, apply$default$8, this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$system, RedisPubSub$.MODULE$.apply$default$10(host, port, seq2, nil$, function1, apply$default$6, password, apply$default$8)));
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void postStop() {
        if (subscription() != null) {
            subscription().stop();
        }
    }

    private ClassTag<Seq<String>> StringSeq() {
        return this.StringSeq;
    }

    public void akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$query() {
        BoxedUnit boxedUnit;
        if (this.$outer.Idle() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state()) {
            if (buffer().isEmpty()) {
                akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(this.$outer.Querying());
                this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$redis.zrangebyscore(RedisKeys$.MODULE$.journalKey(this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$persistenceId), new Limit(currentSequenceNr(), Limit$.MODULE$.apply$default$2()), new Limit(to(), Limit$.MODULE$.apply$default$2()), new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(0L)), BoxesRunTime.boxToLong(max()))), ByteStringDeserializer$.MODULE$.ByteArray()).onComplete(r4 -> {
                    $anonfun$query$1(this, r4);
                    return BoxedUnit.UNIT;
                }, ec());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                deliver();
                boxedUnit = BoxedUnit.UNIT;
            }
            return;
        }
        log().error(new StringOps("Unexpected source state when querying: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state())})));
        failStage(new IllegalStateException(new StringOps("Unexpected source state when querying: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state())}))));
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    private void deliver() {
        akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(this.$outer.Idle());
        push(this.$outer.out(), buffer().dequeue());
        if (!buffer().isEmpty() || currentSequenceNr() <= to()) {
            return;
        }
        completeStage();
    }

    public /* synthetic */ EventsByPersistenceIdSource akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ void $anonfun$preStart$1(EventsByPersistenceIdSource$$anon$1 eventsByPersistenceIdSource$$anon$1, Seq seq) {
        if (!seq.isEmpty()) {
            Tuple2 tuple2 = (Tuple2) seq.foldLeft(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(Seq$.MODULE$.empty()), BoxesRunTime.boxToLong(eventsByPersistenceIdSource$$anon$1.currentSequenceNr())), (tuple22, persistentRepr) -> {
                Tuple2 tuple22;
                Tuple2 tuple23 = new Tuple2(tuple22, persistentRepr);
                if (tuple23 != null) {
                    Tuple2 tuple24 = (Tuple2) tuple23.mo12002_1();
                    PersistentRepr persistentRepr = (PersistentRepr) tuple23.mo1245_2();
                    if (tuple24 != null) {
                        Seq seq2 = (Seq) tuple24.mo12002_1();
                        Option<Tuple2<Object, Object>> unapply = PersistentRepr$.MODULE$.unapply(persistentRepr);
                        if (!unapply.isEmpty()) {
                            Object mo12002_1 = unapply.get().mo12002_1();
                            long _2$mcJ$sp = unapply.get()._2$mcJ$sp();
                            if (!persistentRepr.deleted() && _2$mcJ$sp >= eventsByPersistenceIdSource$$anon$1.currentSequenceNr() && _2$mcJ$sp <= eventsByPersistenceIdSource$$anon$1.to()) {
                                tuple22 = new Tuple2(seq2.$colon$plus(new EventEnvelope(new Sequence(_2$mcJ$sp), eventsByPersistenceIdSource$$anon$1.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$persistenceId, _2$mcJ$sp, mo12002_1), Seq$.MODULE$.canBuildFrom()), BoxesRunTime.boxToLong(_2$mcJ$sp + 1));
                                return tuple22;
                            }
                        }
                    }
                }
                if (tuple23 != null) {
                    Tuple2 tuple25 = (Tuple2) tuple23.mo12002_1();
                    PersistentRepr persistentRepr2 = (PersistentRepr) tuple23.mo1245_2();
                    if (tuple25 != null) {
                        Seq seq3 = (Seq) tuple25.mo12002_1();
                        Option<Tuple2<Object, Object>> unapply2 = PersistentRepr$.MODULE$.unapply(persistentRepr2);
                        if (!unapply2.isEmpty()) {
                            tuple22 = new Tuple2(seq3, BoxesRunTime.boxToLong(unapply2.get()._2$mcJ$sp() + 1));
                            return tuple22;
                        }
                    }
                }
                throw new MatchError(tuple23);
            });
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2 tuple23 = new Tuple2((Seq) tuple2.mo12002_1(), BoxesRunTime.boxToLong(tuple2._2$mcJ$sp()));
            Seq<EventEnvelope> seq2 = (Seq) tuple23.mo12002_1();
            long _2$mcJ$sp = tuple23._2$mcJ$sp();
            eventsByPersistenceIdSource$$anon$1.currentSequenceNr_$eq(_2$mcJ$sp);
            eventsByPersistenceIdSource$$anon$1.log().debug(new StringOps("Max sequence number is now %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(_2$mcJ$sp)})));
            if (seq2.nonEmpty()) {
                eventsByPersistenceIdSource$$anon$1.buffer().enqueue(seq2);
                eventsByPersistenceIdSource$$anon$1.deliver();
                return;
            } else {
                eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(eventsByPersistenceIdSource$$anon$1.$outer.Idle());
                eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$query();
                return;
            }
        }
        if (eventsByPersistenceIdSource$$anon$1.currentSequenceNr() > eventsByPersistenceIdSource$$anon$1.to()) {
            eventsByPersistenceIdSource$$anon$1.completeStage();
            return;
        }
        boolean z = false;
        int akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state = eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state();
        if (eventsByPersistenceIdSource$$anon$1.$outer.NotifiedWhenQuerying() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
            eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(eventsByPersistenceIdSource$$anon$1.$outer.Idle());
            eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$query();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (eventsByPersistenceIdSource$$anon$1.$outer.Querying() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
            z = true;
            if (eventsByPersistenceIdSource$$anon$1.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$live) {
                eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(eventsByPersistenceIdSource$$anon$1.$outer.WaitingForNotification());
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (z) {
            eventsByPersistenceIdSource$$anon$1.completeStage();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        eventsByPersistenceIdSource$$anon$1.log().error(new StringOps("Unexpected source state: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state())})));
        eventsByPersistenceIdSource$$anon$1.failStage(new IllegalStateException(new StringOps("Unexpected source state: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state())}))));
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$preStart$3(EventsByPersistenceIdSource$$anon$1 eventsByPersistenceIdSource$$anon$1, Message message) {
        BoxedUnit boxedUnit;
        if (message != null) {
            String channel = message.channel();
            ByteString data = message.data();
            String Channel = eventsByPersistenceIdSource$$anon$1.$outer.Channel();
            if (Channel != null ? Channel.equals(channel) : channel == null) {
                if (!eventsByPersistenceIdSource$$anon$1.$outer.Long().unapply(data).isEmpty()) {
                    eventsByPersistenceIdSource$$anon$1.log().debug("Message received");
                    int akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state = eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state();
                    if (eventsByPersistenceIdSource$$anon$1.$outer.Idle() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    } else if (eventsByPersistenceIdSource$$anon$1.$outer.Querying() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
                        eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(eventsByPersistenceIdSource$$anon$1.$outer.NotifiedWhenQuerying());
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    } else if (eventsByPersistenceIdSource$$anon$1.$outer.NotifiedWhenQuerying() == akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
                        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    } else {
                        if (eventsByPersistenceIdSource$$anon$1.$outer.WaitingForNotification() != akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state) {
                            throw new MatchError(BoxesRunTime.boxToInteger(akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state));
                        }
                        eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(eventsByPersistenceIdSource$$anon$1.$outer.Idle());
                        eventsByPersistenceIdSource$$anon$1.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$query();
                        BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                    }
                    BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (message == null) {
            throw new MatchError(message);
        }
        String channel2 = message.channel();
        if (eventsByPersistenceIdSource$$anon$1.log().isDebugEnabled()) {
            eventsByPersistenceIdSource$$anon$1.log().debug(new StringOps("Message from unexpected channel: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{channel2})));
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$preStart$6(EventsByPersistenceIdSource$$anon$1 eventsByPersistenceIdSource$$anon$1, AsyncCallback asyncCallback, Try r6) {
        boolean z = false;
        Success success = null;
        if (r6 instanceof Success) {
            z = true;
            success = (Success) r6;
            Option option = (Option) success.value();
            if (option instanceof Some) {
                asyncCallback.invoke(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(((Some) option).value())));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (z) {
            if (None$.MODULE$.equals((Option) success.value())) {
                eventsByPersistenceIdSource$$anon$1.getAsyncCallback(boxedUnit2 -> {
                    eventsByPersistenceIdSource$$anon$1.completeStage();
                    return BoxedUnit.UNIT;
                }).invoke(BoxedUnit.UNIT);
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                return;
            }
        }
        if (!(r6 instanceof Failure)) {
            throw new MatchError(r6);
        }
        Throwable exception = ((Failure) r6).exception();
        eventsByPersistenceIdSource$$anon$1.log().error(exception, "Error while initializing current events by persistent id");
        eventsByPersistenceIdSource$$anon$1.getAsyncCallback(boxedUnit4 -> {
            eventsByPersistenceIdSource$$anon$1.failStage(exception);
            return BoxedUnit.UNIT;
        }).invoke(BoxedUnit.UNIT);
        BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static final /* synthetic */ void $anonfun$query$1(EventsByPersistenceIdSource$$anon$1 eventsByPersistenceIdSource$$anon$1, Try r6) {
        if (r6 instanceof Success) {
            eventsByPersistenceIdSource$$anon$1.callback().invoke(((Seq) ((Success) r6).value()).map(bArr -> {
                return package$.MODULE$.persistentFromBytes(bArr, eventsByPersistenceIdSource$$anon$1.$outer.serialization());
            }, Seq$.MODULE$.canBuildFrom()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r6 instanceof Failure)) {
                throw new MatchError(r6);
            }
            eventsByPersistenceIdSource$$anon$1.log().error(((Failure) r6).exception(), "Error while querying events by persistence identifier");
            eventsByPersistenceIdSource$$anon$1.getAsyncCallback(th -> {
                eventsByPersistenceIdSource$$anon$1.failStage(th);
                return BoxedUnit.UNIT;
            });
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /* JADX WARN: Multi-variable type inference failed */
    public EventsByPersistenceIdSource$$anon$1(EventsByPersistenceIdSource eventsByPersistenceIdSource) {
        super(eventsByPersistenceIdSource.shape());
        if (eventsByPersistenceIdSource == null) {
            throw null;
        }
        this.$outer = eventsByPersistenceIdSource;
        this.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state = eventsByPersistenceIdSource.Idle();
        this.buffer = (Queue) Queue$.MODULE$.empty();
        this.subscription = null;
        this.max = eventsByPersistenceIdSource.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$conf.getInt("max");
        this.currentSequenceNr = eventsByPersistenceIdSource.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$from;
        this.to = eventsByPersistenceIdSource.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$to;
        this.callback = null;
        this.StringSeq = scala.reflect.package$.MODULE$.classTag(ClassTag$.MODULE$.apply(Seq.class));
        setHandler(eventsByPersistenceIdSource.out(), new OutHandler(this) { // from class: akka.persistence.query.journal.redis.EventsByPersistenceIdSource$$anon$1$$anon$2
            private final /* synthetic */ EventsByPersistenceIdSource$$anon$1 $outer;

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() throws Exception {
                onDownstreamFinish();
            }

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                if (this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$$outer().Initializing() == this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state()) {
                    this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$state_$eq(this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$$outer().QueryWhenInitializing());
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    this.$outer.akka$persistence$query$journal$redis$EventsByPersistenceIdSource$$anon$$query();
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
