package akka.persistence.query.journal.redis;

import akka.persistence.redis.RedisKeys$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogicWithLogging;
import akka.stream.stage.OutHandler;
import akka.util.ByteString;
import redis.ByteStringDeserializer$;
import redis.Cursor;
import redis.RedisPubSub;
import redis.RedisPubSub$;
import redis.api.pubsub.Message;
import redis.api.pubsub.PMessage;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.ExecutionContextExecutor;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.package$;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: PersistenceIdsSource.scala */
/* loaded from: input_file:BOOT-INF/lib/akka-persistence-redis_2.12-0.4.2.jar:akka/persistence/query/journal/redis/PersistenceIdsSource$$anon$1.class */
public final class PersistenceIdsSource$$anon$1 extends GraphStageLogicWithLogging {
    private boolean akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start;
    private int akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index;
    private Queue<String> akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer;
    private boolean akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting;
    private RedisPubSub subscription;
    private final ClassTag<Seq<String>> akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$StringSeq;
    private final /* synthetic */ PersistenceIdsSource $outer;

    public boolean akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start() {
        return this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start;
    }

    public void akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start_$eq(boolean z) {
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start = z;
    }

    public int akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index() {
        return this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index;
    }

    public void akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index_$eq(int i) {
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index = i;
    }

    public Queue<String> akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer() {
        return this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer;
    }

    private void akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer_$eq(Queue<String> queue) {
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer = queue;
    }

    private boolean akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting() {
        return this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting;
    }

    public void akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting_$eq(boolean z) {
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting = z;
    }

    private RedisPubSub subscription() {
        return this.subscription;
    }

    private void subscription_$eq(RedisPubSub redisPubSub) {
        this.subscription = redisPubSub;
    }

    public ExecutionContextExecutor akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$ec() {
        return materializer().executionContext();
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        AsyncCallback asyncCallback = getAsyncCallback(message -> {
            $anonfun$preStart$1(this, message);
            return BoxedUnit.UNIT;
        });
        String host = this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$redis.host();
        int port = this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$redis.port();
        Seq<String> seq = (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{RedisKeys$.MODULE$.identifiersChannel()}));
        Nil$ nil$ = Nil$.MODULE$;
        Option<String> password = this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$redis.password();
        Function1<Message, BoxedUnit> function1 = message2 -> {
            asyncCallback.invoke(message2);
            return BoxedUnit.UNIT;
        };
        Function1<PMessage, BoxedUnit> apply$default$6 = RedisPubSub$.MODULE$.apply$default$6();
        String apply$default$8 = RedisPubSub$.MODULE$.apply$default$8();
        subscription_$eq(new RedisPubSub(host, port, seq, nil$, function1, apply$default$6, password, apply$default$8, this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$system, RedisPubSub$.MODULE$.apply$default$10(host, port, seq, nil$, function1, apply$default$6, password, apply$default$8)));
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void postStop() {
        if (subscription() != null) {
            subscription().stop();
        }
    }

    public ClassTag<Seq<String>> akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$StringSeq() {
        return this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$StringSeq;
    }

    public void akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$deliver() {
        if (akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting()) {
            akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting_$eq(false);
            push(this.$outer.out(), akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer().dequeue());
        }
    }

    public /* synthetic */ PersistenceIdsSource akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ void $anonfun$preStart$1(PersistenceIdsSource$$anon$1 persistenceIdsSource$$anon$1, Message message) {
        BoxedUnit boxedUnit;
        if (message != null) {
            String channel = message.channel();
            ByteString data = message.data();
            String identifiersChannel = RedisKeys$.MODULE$.identifiersChannel();
            if (identifiersChannel != null ? identifiersChannel.equals(channel) : channel == null) {
                persistenceIdsSource$$anon$1.log().debug("Message received");
                persistenceIdsSource$$anon$1.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer().enqueue(Predef$.MODULE$.wrapRefArray(new String[]{data.utf8String()}));
                persistenceIdsSource$$anon$1.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$deliver();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (message == null) {
            throw new MatchError(message);
        }
        String channel2 = message.channel();
        if (persistenceIdsSource$$anon$1.log().isDebugEnabled()) {
            persistenceIdsSource$$anon$1.log().debug(new StringOps("Message from unexpected channel: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{channel2})));
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /* JADX WARN: Multi-variable type inference failed */
    public PersistenceIdsSource$$anon$1(PersistenceIdsSource persistenceIdsSource) {
        super(persistenceIdsSource.shape());
        if (persistenceIdsSource == null) {
            throw null;
        }
        this.$outer = persistenceIdsSource;
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start = true;
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index = 0;
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer = (Queue) Queue$.MODULE$.empty();
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting = false;
        this.subscription = null;
        this.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$StringSeq = package$.MODULE$.classTag(ClassTag$.MODULE$.apply(Seq.class));
        setHandler(persistenceIdsSource.out(), new OutHandler(this) { // from class: akka.persistence.query.journal.redis.PersistenceIdsSource$$anon$1$$anon$2
            private final /* synthetic */ PersistenceIdsSource$$anon$1 $outer;

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() throws Exception {
                onDownstreamFinish();
            }

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$downstreamWaiting_$eq(true);
                if (this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer().isEmpty() && (this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start() || this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index() > 0)) {
                    AsyncCallback asyncCallback = this.$outer.getAsyncCallback(cursor -> {
                        $anonfun$onPull$1(this, cursor);
                        return BoxedUnit.UNIT;
                    });
                    this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$$outer().akka$persistence$query$journal$redis$PersistenceIdsSource$$redis.sscan(RedisKeys$.MODULE$.identifiersKey(), this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index(), this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$$outer().akka$persistence$query$journal$redis$PersistenceIdsSource$$redis.sscan$default$3(), this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$$outer().akka$persistence$query$journal$redis$PersistenceIdsSource$$redis.sscan$default$4(), ByteStringDeserializer$.MODULE$.String()).onComplete(r6 -> {
                        $anonfun$onPull$2(this, asyncCallback, r6);
                        return BoxedUnit.UNIT;
                    }, this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$ec());
                } else {
                    if (this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer().isEmpty()) {
                        return;
                    }
                    this.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$deliver();
                }
            }

            public static final /* synthetic */ void $anonfun$onPull$1(PersistenceIdsSource$$anon$1$$anon$2 persistenceIdsSource$$anon$1$$anon$2, Cursor cursor) {
                if (cursor != null) {
                    int index = cursor.index();
                    Option<Seq<String>> unapply = persistenceIdsSource$$anon$1$$anon$2.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$StringSeq().unapply((Seq) cursor.data());
                    if (!unapply.isEmpty()) {
                        Seq<String> seq = unapply.get();
                        persistenceIdsSource$$anon$1$$anon$2.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$index_$eq(index);
                        persistenceIdsSource$$anon$1$$anon$2.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$start_$eq(false);
                        persistenceIdsSource$$anon$1$$anon$2.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$buffer().mo14065$plus$plus$eq(seq);
                        persistenceIdsSource$$anon$1$$anon$2.$outer.akka$persistence$query$journal$redis$PersistenceIdsSource$$anon$$deliver();
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        return;
                    }
                }
                throw new MatchError(cursor);
            }

            public static final /* synthetic */ void $anonfun$onPull$3(PersistenceIdsSource$$anon$1$$anon$2 persistenceIdsSource$$anon$1$$anon$2, Throwable th, BoxedUnit boxedUnit) {
                persistenceIdsSource$$anon$1$$anon$2.$outer.failStage(th);
            }

            public static final /* synthetic */ void $anonfun$onPull$2(PersistenceIdsSource$$anon$1$$anon$2 persistenceIdsSource$$anon$1$$anon$2, AsyncCallback asyncCallback, Try r6) {
                if (r6 instanceof Success) {
                    asyncCallback.invoke((Cursor) ((Success) r6).value());
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                } else {
                    if (!(r6 instanceof Failure)) {
                        throw new MatchError(r6);
                    }
                    Throwable exception = ((Failure) r6).exception();
                    persistenceIdsSource$$anon$1$$anon$2.$outer.log().error(exception, "Error while querying persistence identifiers");
                    persistenceIdsSource$$anon$1$$anon$2.$outer.getAsyncCallback(boxedUnit2 -> {
                        $anonfun$onPull$3(persistenceIdsSource$$anon$1$$anon$2, exception, boxedUnit2);
                        return BoxedUnit.UNIT;
                    }).invoke(BoxedUnit.UNIT);
                    BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
