package akka.persistence.query.journal.redis;

import akka.persistence.PersistentRepr;
import akka.persistence.PersistentRepr$;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.EventEnvelope$;
import akka.persistence.query.Sequence;
import akka.persistence.redis.RedisKeys$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogicWithLogging;
import akka.stream.stage.OutHandler;
import akka.util.ByteString;
import redis.ByteStringDeserializer$;
import redis.RedisPubSub;
import redis.RedisPubSub$;
import redis.api.Limit;
import redis.api.Limit$;
import redis.api.pubsub.Message;
import redis.api.pubsub.PMessage;
import redis.commands.TransactionBuilder;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.math.package$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: EventsByTagSource.scala */
/* loaded from: input_file:BOOT-INF/lib/akka-persistence-redis_2.12-0.4.2.jar:akka/persistence/query/journal/redis/EventsByTagSource$$anon$1.class */
public final class EventsByTagSource$$anon$1 extends GraphStageLogicWithLogging {
    private int akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state;
    private Queue<EventEnvelope> buffer;
    private RedisPubSub subscription;
    private final int max;
    private long currentOffset;
    private long maxOffset;
    private AsyncCallback<Tuple2<Object, Seq<Tuple2<String, Option<PersistentRepr>>>>> callback;
    private final ClassTag<Seq<String>> StringSeq;
    private final /* synthetic */ EventsByTagSource $outer;

    public int akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state() {
        return this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state;
    }

    public void akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(int i) {
        this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state = i;
    }

    private Queue<EventEnvelope> buffer() {
        return this.buffer;
    }

    private void buffer_$eq(Queue<EventEnvelope> queue) {
        this.buffer = queue;
    }

    private RedisPubSub subscription() {
        return this.subscription;
    }

    private void subscription_$eq(RedisPubSub redisPubSub) {
        this.subscription = redisPubSub;
    }

    private int max() {
        return this.max;
    }

    private long currentOffset() {
        return this.currentOffset;
    }

    private void currentOffset_$eq(long j) {
        this.currentOffset = j;
    }

    private long maxOffset() {
        return this.maxOffset;
    }

    private void maxOffset_$eq(long j) {
        this.maxOffset = j;
    }

    private AsyncCallback<Tuple2<Object, Seq<Tuple2<String, Option<PersistentRepr>>>>> callback() {
        return this.callback;
    }

    private void callback_$eq(AsyncCallback<Tuple2<Object, Seq<Tuple2<String, Option<PersistentRepr>>>>> asyncCallback) {
        this.callback = asyncCallback;
    }

    private ExecutionContextExecutor ec() {
        return materializer().executionContext();
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        callback_$eq(getAsyncCallback(tuple2 -> {
            $anonfun$preStart$1(this, tuple2);
            return BoxedUnit.UNIT;
        }));
        if (!this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$live) {
            akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(this.$outer.Initializing());
            AsyncCallback asyncCallback = getAsyncCallback(j -> {
                this.maxOffset_$eq(j);
                int akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state = this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state();
                if (this.$outer.QueryWhenInitializing() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                    this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(this.$outer.Idle());
                    this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$query();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    if (this.$outer.Initializing() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                        this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(this.$outer.Idle());
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                        return;
                    }
                    this.log().error(new StringOps("Unexpected source state when initializing: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state())})));
                    this.failStage(new IllegalStateException(new StringOps("Unexpected source state when initializing: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state())}))));
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                }
            });
            this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$redis.llen(RedisKeys$.MODULE$.tagKey(this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$tag)).onComplete(r6 -> {
                $anonfun$preStart$6(this, asyncCallback, r6);
                return BoxedUnit.UNIT;
            }, ec());
            return;
        }
        AsyncCallback asyncCallback2 = getAsyncCallback(message -> {
            $anonfun$preStart$3(this, message);
            return BoxedUnit.UNIT;
        });
        String host = this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$redis.host();
        int port = this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$redis.port();
        Seq<String> seq = (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{RedisKeys$.MODULE$.tagsChannel()}));
        Nil$ nil$ = Nil$.MODULE$;
        Option<String> password = this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$redis.password();
        Function1<Message, BoxedUnit> function1 = message2 -> {
            asyncCallback2.invoke(message2);
            return BoxedUnit.UNIT;
        };
        Function1<PMessage, BoxedUnit> apply$default$6 = RedisPubSub$.MODULE$.apply$default$6();
        String apply$default$8 = RedisPubSub$.MODULE$.apply$default$8();
        subscription_$eq(new RedisPubSub(host, port, seq, nil$, function1, apply$default$6, password, apply$default$8, this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$system, RedisPubSub$.MODULE$.apply$default$10(host, port, seq, nil$, function1, apply$default$6, password, apply$default$8)));
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void postStop() {
        if (subscription() != null) {
            subscription().stop();
        }
    }

    private ClassTag<Seq<String>> StringSeq() {
        return this.StringSeq;
    }

    public void akka$persistence$query$journal$redis$EventsByTagSource$$anon$$query() {
        BoxedUnit boxedUnit;
        if (this.$outer.Idle() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state()) {
            if (buffer().isEmpty()) {
                akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(this.$outer.Querying());
                this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$redis.lrange(RedisKeys$.MODULE$.tagKey(this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$tag), currentOffset(), package$.MODULE$.min(maxOffset(), (currentOffset() + max()) - 1), this.$outer.eventRefDeserializer()).map(seq -> {
                    TransactionBuilder transaction = this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$redis.transaction();
                    return new Tuple3(seq, transaction, Future$.MODULE$.sequence((TraversableOnce) seq.map(eventRef -> {
                        if (eventRef == null) {
                            throw new MatchError(eventRef);
                        }
                        long sequenceNr = eventRef.sequenceNr();
                        String persistenceId = eventRef.persistenceId();
                        return transaction.zrangebyscore(RedisKeys$.MODULE$.journalKey(persistenceId), new Limit(sequenceNr, Limit$.MODULE$.apply$default$2()), new Limit(sequenceNr, Limit$.MODULE$.apply$default$2()), transaction.zrangebyscore$default$4(), ByteStringDeserializer$.MODULE$.ByteArray()).map(seq -> {
                            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(persistenceId), seq);
                        }, this.ec());
                    }, Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom(), this.ec()));
                }, ec()).flatMap(tuple3 -> {
                    if (tuple3 == null) {
                        throw new MatchError(tuple3);
                    }
                    Seq seq2 = (Seq) tuple3._1();
                    TransactionBuilder transactionBuilder = (TransactionBuilder) tuple3._2();
                    Future future = (Future) tuple3._3();
                    return transactionBuilder.exec().flatMap(multiBulk -> {
                        return future.map(seq3 -> {
                            return new Tuple2(BoxesRunTime.boxToInteger(seq2.size()), seq3.map(tuple2 -> {
                                if (tuple2 != null) {
                                    return new Tuple2((String) tuple2.mo13858_1(), ((Seq) tuple2.mo1304_2()).headOption().map(bArr -> {
                                        return akka.persistence.redis.package$.MODULE$.persistentFromBytes(bArr, this.$outer.serialization());
                                    }));
                                }
                                throw new MatchError(tuple2);
                            }, Seq$.MODULE$.canBuildFrom()));
                        }, this.ec());
                    }, this.ec());
                }, ec()).onComplete(r4 -> {
                    $anonfun$query$9(this, r4);
                    return BoxedUnit.UNIT;
                }, ec());
                boxedUnit = BoxedUnit.UNIT;
            } else {
                deliver();
                boxedUnit = BoxedUnit.UNIT;
            }
            return;
        }
        log().error(new StringOps("Unexpected source state when querying: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state())})));
        failStage(new IllegalStateException(new StringOps("Unexpected source state when querying: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state())}))));
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    private void deliver() {
        akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(this.$outer.Idle());
        push(this.$outer.out(), buffer().dequeue());
        if (!buffer().isEmpty() || currentOffset() < maxOffset()) {
            return;
        }
        completeStage();
    }

    public /* synthetic */ EventsByTagSource akka$persistence$query$journal$redis$EventsByTagSource$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ void $anonfun$preStart$1(EventsByTagSource$$anon$1 eventsByTagSource$$anon$1, Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        int _1$mcI$sp = tuple2._1$mcI$sp();
        Seq seq = (Seq) tuple2.mo1304_2();
        if (!seq.isEmpty()) {
            Seq seq2 = (Seq) ((TraversableLike) seq.zipWithIndex(Seq$.MODULE$.canBuildFrom())).flatMap(tuple22 -> {
                Iterable option2Iterable;
                if (tuple22 != null) {
                    Tuple2 tuple22 = (Tuple2) tuple22.mo13858_1();
                    int _2$mcI$sp = tuple22._2$mcI$sp();
                    if (tuple22 != null) {
                        String str = (String) tuple22.mo13858_1();
                        Option option = (Option) tuple22.mo1304_2();
                        if (option instanceof Some) {
                            PersistentRepr persistentRepr = (PersistentRepr) ((Some) option).value();
                            Option<Tuple2<Object, Object>> unapply = PersistentRepr$.MODULE$.unapply(persistentRepr);
                            if (!unapply.isEmpty()) {
                                Object mo13858_1 = unapply.get().mo13858_1();
                                long _2$mcJ$sp = unapply.get()._2$mcJ$sp();
                                if (!persistentRepr.deleted()) {
                                    option2Iterable = Option$.MODULE$.option2Iterable(new Some(EventEnvelope$.MODULE$.apply(new Sequence(eventsByTagSource$$anon$1.currentOffset() + _2$mcI$sp), str, _2$mcJ$sp, mo13858_1)));
                                    return option2Iterable;
                                }
                            }
                        }
                    }
                }
                if (tuple22 == null || ((Tuple2) tuple22.mo13858_1()) == null) {
                    throw new MatchError(tuple22);
                }
                option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
                return option2Iterable;
            }, Seq$.MODULE$.canBuildFrom());
            eventsByTagSource$$anon$1.currentOffset_$eq(eventsByTagSource$$anon$1.currentOffset() + _1$mcI$sp);
            if (seq2.nonEmpty()) {
                eventsByTagSource$$anon$1.buffer().mo14065$plus$plus$eq(seq2);
                eventsByTagSource$$anon$1.deliver();
                boxedUnit = BoxedUnit.UNIT;
            } else {
                eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(eventsByTagSource$$anon$1.$outer.Idle());
                eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$query();
                boxedUnit = BoxedUnit.UNIT;
            }
        } else if (eventsByTagSource$$anon$1.currentOffset() >= eventsByTagSource$$anon$1.maxOffset()) {
            eventsByTagSource$$anon$1.completeStage();
            boxedUnit = BoxedUnit.UNIT;
        } else {
            int akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state = eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state();
            if (eventsByTagSource$$anon$1.$outer.NotifiedWhenQuerying() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(eventsByTagSource$$anon$1.$outer.Idle());
                eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$query();
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else if (eventsByTagSource$$anon$1.$outer.Querying() != akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                eventsByTagSource$$anon$1.log().error(new StringOps("Unexpected source state: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state())})));
                eventsByTagSource$$anon$1.failStage(new IllegalStateException(new StringOps("Unexpected source state: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state())}))));
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            } else if (eventsByTagSource$$anon$1.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$live) {
                eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(eventsByTagSource$$anon$1.$outer.WaitingForNotification());
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                eventsByTagSource$$anon$1.completeStage();
                boxedUnit2 = BoxedUnit.UNIT;
            }
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$preStart$3(EventsByTagSource$$anon$1 eventsByTagSource$$anon$1, Message message) {
        BoxedUnit boxedUnit;
        if (message != null) {
            String channel = message.channel();
            ByteString data = message.data();
            String tagsChannel = RedisKeys$.MODULE$.tagsChannel();
            if (tagsChannel != null ? tagsChannel.equals(channel) : channel == null) {
                ByteString Tag = eventsByTagSource$$anon$1.$outer.Tag();
                if (Tag != null ? Tag.equals(data) : data == null) {
                    eventsByTagSource$$anon$1.log().debug("Message received");
                    int akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state = eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state();
                    if (eventsByTagSource$$anon$1.$outer.Idle() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    } else if (eventsByTagSource$$anon$1.$outer.Querying() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                        eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(eventsByTagSource$$anon$1.$outer.NotifiedWhenQuerying());
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                    } else if (eventsByTagSource$$anon$1.$outer.NotifiedWhenQuerying() == akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                    } else {
                        if (eventsByTagSource$$anon$1.$outer.WaitingForNotification() != akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state) {
                            throw new MatchError(BoxesRunTime.boxToInteger(akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state));
                        }
                        eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(eventsByTagSource$$anon$1.$outer.Idle());
                        eventsByTagSource$$anon$1.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$query();
                        BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                    }
                    BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
                    return;
                }
            }
        }
        if (message != null) {
            String channel2 = message.channel();
            String tagsChannel2 = RedisKeys$.MODULE$.tagsChannel();
            if (tagsChannel2 != null ? tagsChannel2.equals(channel2) : channel2 == null) {
                BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
                return;
            }
        }
        if (message == null) {
            throw new MatchError(message);
        }
        String channel3 = message.channel();
        if (eventsByTagSource$$anon$1.log().isDebugEnabled()) {
            eventsByTagSource$$anon$1.log().debug(new StringOps("Message from unexpected channel: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{channel3})));
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$preStart$6(EventsByTagSource$$anon$1 eventsByTagSource$$anon$1, AsyncCallback asyncCallback, Try r8) {
        if (r8 instanceof Success) {
            asyncCallback.invoke(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(((Success) r8).value()) - 1));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r8 instanceof Failure)) {
                throw new MatchError(r8);
            }
            Throwable exception = ((Failure) r8).exception();
            eventsByTagSource$$anon$1.log().error(exception, "Error while initializing current events by tag");
            eventsByTagSource$$anon$1.getAsyncCallback(boxedUnit2 -> {
                eventsByTagSource$$anon$1.failStage(exception);
                return BoxedUnit.UNIT;
            }).invoke(BoxedUnit.UNIT);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$query$9(EventsByTagSource$$anon$1 eventsByTagSource$$anon$1, Try r7) {
        Tuple2 tuple2;
        if ((r7 instanceof Success) && (tuple2 = (Tuple2) ((Success) r7).value()) != null) {
            eventsByTagSource$$anon$1.callback().invoke(new Tuple2<>(BoxesRunTime.boxToInteger(tuple2._1$mcI$sp()), (Seq) tuple2.mo1304_2()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r7 instanceof Failure)) {
                throw new MatchError(r7);
            }
            Throwable exception = ((Failure) r7).exception();
            eventsByTagSource$$anon$1.log().error(exception, "Error while querying events by persistence identifier");
            eventsByTagSource$$anon$1.getAsyncCallback(boxedUnit2 -> {
                eventsByTagSource$$anon$1.failStage(exception);
                return BoxedUnit.UNIT;
            }).invoke(BoxedUnit.UNIT);
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /* JADX WARN: Multi-variable type inference failed */
    public EventsByTagSource$$anon$1(EventsByTagSource eventsByTagSource) {
        super(eventsByTagSource.shape());
        if (eventsByTagSource == null) {
            throw null;
        }
        this.$outer = eventsByTagSource;
        this.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state = eventsByTagSource.Idle();
        this.buffer = (Queue) Queue$.MODULE$.empty();
        this.subscription = null;
        this.max = eventsByTagSource.akka$persistence$query$journal$redis$EventsByTagSource$$conf.getInt("max");
        this.currentOffset = eventsByTagSource.akka$persistence$query$journal$redis$EventsByTagSource$$offset;
        this.maxOffset = Long.MAX_VALUE;
        this.callback = null;
        this.StringSeq = scala.reflect.package$.MODULE$.classTag(ClassTag$.MODULE$.apply(Seq.class));
        setHandler(eventsByTagSource.out(), new OutHandler(this) { // from class: akka.persistence.query.journal.redis.EventsByTagSource$$anon$1$$anon$2
            private final /* synthetic */ EventsByTagSource$$anon$1 $outer;

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() throws Exception {
                onDownstreamFinish();
            }

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                if (this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$$outer().Initializing() == this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state()) {
                    this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$state_$eq(this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$$outer().QueryWhenInitializing());
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    this.$outer.akka$persistence$query$journal$redis$EventsByTagSource$$anon$$query();
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
