package org.apache.pekko.stream.connectors.geode.impl.stage;

import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.util.CqListenerAdapter;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.StageLogging;
import scala.None$;
import scala.Option;
import scala.Some$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: GeodeCQueryGraphLogic.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/geode/impl/stage/GeodeCQueryGraphLogic.class */
public abstract class GeodeCQueryGraphLogic<V> extends GeodeSourceStageLogic<V> implements StageLogging {
    // Backing field for the StageLogging logger; accessed only through the
    // org$apache$pekko$stream$stage$StageLogging$$_log accessor pair below.
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Constructor arguments, exposed through shape(), clientCache(), queryName() and sql().
    private final SourceShape shape;
    private final ClientCache clientCache;
    private final String queryName;
    private final String sql;
    // Buffer of elements; created empty in the constructor and accessed via
    // enqueue(), dequeue() and incomingQueueIsEmpty().
    private final Queue<V> incomingQueue;
    // Created with 10 permits; onGeodeElement() acquires one permit per element and
    // pushElement() releases one per pushed element.
    private final Semaphore semaphore;
    // The continuous query; assigned when executeQuery$$anonfun$1() registers it.
    private CqQuery query;
    // Set to true by the inFinish callback; read by handleTerminaison().
    private volatile boolean upstreamTerminated;
    // Async callback built in the constructor; invoked by the CQ listener's close().
    private final AsyncCallback inFinish;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public GeodeCQueryGraphLogic(SourceShape<V> sourceShape, ClientCache clientCache, String str, String str2) {
        super(sourceShape, clientCache);
        this.shape = sourceShape;
        this.clientCache = clientCache;
        this.queryName = str;
        this.sql = str2;
        StageLogging.$init$(this);
        this.incomingQueue = (Queue) Queue$.MODULE$.apply(ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]));
        this.semaphore = new Semaphore(10);
        this.upstreamTerminated = false;
        this.inFinish = getAsyncCallback(boxedUnit -> {
            this.upstreamTerminated = true;
            handleTerminaison();
        });
    }

    /** Returns the current value of the StageLogging logger backing field. */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        final LoggingAdapter current = this.org$apache$pekko$stream$stage$StageLogging$$_log;
        return current;
    }

    /**
     * Stores the given adapter in the StageLogging logger backing field.
     *
     * @param loggingAdapter the adapter to store
     */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Forwards to the {@code StageLogging.logSource$} implementation for this instance. */
    public /* bridge */ /* synthetic */ Class logSource() {
        final Class source = StageLogging.logSource$(this);
        return source;
    }

    /** Forwards to the {@code StageLogging.log$} implementation for this instance. */
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        final LoggingAdapter adapter = StageLogging.log$(this);
        return adapter;
    }

    /** Returns the source shape passed to the constructor. */
    public SourceShape<V> shape() {
        return shape;
    }

    /** Returns the Geode client cache passed to the constructor. */
    public ClientCache clientCache() {
        return clientCache;
    }

    /** Returns the name under which the continuous query is created. */
    public String queryName() {
        return queryName;
    }

    /** Returns the query text used when the continuous query is created. */
    public String sql() {
        return sql;
    }

    /**
     * Supplies the async callback that {@link #onGeodeElement(Object)} invokes with each
     * element after a semaphore permit has been acquired. Implemented by subclasses.
     */
    public abstract AsyncCallback<V> onElement();

    /**
     * Registers the continuous query and returns the iterator over its initial results,
     * with the whole operation wrapped in a {@code scala.util.Try}.
     */
    @Override // org.apache.pekko.stream.connectors.geode.impl.stage.GeodeSourceStageLogic
    public Try<Iterator<V>> executeQuery() {
        return Try$.MODULE$.apply(() -> executeQuery$$anonfun$1());
    }

    /**
     * Executes the continuous query with initial results and adapts them to an iterator of
     * element values.
     *
     * <p>Each initial result is cast to a {@link Struct} and the entry at index 1 of its field
     * values is returned as the element (presumably the value half of a key/value pair —
     * confirm against the Geode {@code executeWithInitialResults} documentation).
     *
     * <p>{@code remove()} and {@code forEachRemaining()} are intentionally not overridden: the
     * {@link Iterator} defaults apply, which is what the decompiled bridge methods delegated to.
     *
     * @param cqQuery the continuous query to execute
     * @return an iterator over the values of the query's initial results
     */
    private Iterator<V> buildInitialResulsIterator(CqQuery cqQuery) {
        final Iterator<?> initialResults = cqQuery.executeWithInitialResults().iterator();
        return new Iterator<V>() {
            @Override
            public boolean hasNext() {
                return initialResults.hasNext();
            }

            @Override
            @SuppressWarnings("unchecked") // element type is fixed by the query, not checkable here
            public V next() {
                return (V) ((Struct) initialResults.next()).getFieldValues()[1];
            }
        };
    }

    /**
     * Hands an element received from Geode to the stage through {@link #onElement()}.
     *
     * <p>Blocks until a semaphore permit is available before invoking the callback; the permit
     * is given back by {@link #pushElement}.
     *
     * @param v the element to deliver
     * @throws IllegalStateException if the calling thread is interrupted while waiting for a
     *     permit; the thread's interrupt flag is restored before throwing and the element is
     *     not delivered
     */
    public void onGeodeElement(V v) {
        try {
            this.semaphore.acquire();
        } catch (InterruptedException e) {
            // acquire() throws a checked exception this method does not declare: keep the
            // interruption visible to the caller's thread and fail without delivering.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a permit to deliver a Geode element", e);
        }
        onElement().invoke(v);
    }

    /** Returns {@code true} when no elements are buffered in the incoming queue. */
    public boolean incomingQueueIsEmpty() {
        final boolean empty = incomingQueue.isEmpty();
        return empty;
    }

    /**
     * Appends an element to the incoming queue.
     *
     * @param v the element to buffer
     */
    public void enqueue(V v) {
        incomingQueue.enqueue(v);
    }

    /**
     * Removes and returns the next buffered element, if any.
     *
     * @return {@code Some(element)} when the incoming queue is non-empty, otherwise the empty
     *     option
     */
    public Option<V> dequeue() {
        if (this.incomingQueue.isEmpty()) {
            // Option.empty() yields the same None singleton, but typed as Option<V>.
            return Option.empty();
        }
        return Some$.MODULE$.apply(this.incomingQueue.dequeue());
    }

    /**
     * Pushes an element to the given outlet and then releases one semaphore permit.
     *
     * @param outlet the outlet to push to
     * @param v the element to push
     */
    public void pushElement(Outlet<V> outlet, V v) {
        push(outlet, v);
        // One permit was acquired per element in onGeodeElement(); return it after the push.
        semaphore.release();
    }

    /** Closes the continuous queries on stop, unless the client cache is already closed. */
    public void postStop() {
        if (!clientCache().isClosed()) {
            qs().closeCqs();
        }
    }

    /**
     * Returns the async callback created in the constructor; invoking it marks the upstream
     * as terminated and runs {@link #handleTerminaison()}.
     */
    public AsyncCallback<BoxedUnit> inFinish() {
        return inFinish;
    }

    /**
     * Completes the stage once the upstream has been marked terminated and the incoming
     * queue holds no more elements; otherwise does nothing.
     */
    public void handleTerminaison() {
        if (!upstreamTerminated || !incomingQueue.isEmpty()) {
            return;
        }
        completeStage();
    }

    /**
     * Body of {@link #executeQuery()}: registers the continuous query with a listener bound
     * to this stage logic and returns the iterator over the query's initial results.
     *
     * <p>The listener forwards each event's new value to {@link #onGeodeElement}, logs errors
     * through the stage logger, and invokes {@link #inFinish()} when it is closed.
     *
     * @return iterator over the initial results of the newly created continuous query
     */
    private final Iterator<V> executeQuery$$anonfun$1() {
        CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
        // The listener must call back into the enclosing stage logic, so every callback goes
        // through GeodeCQueryGraphLogic.this rather than the listener's own `this`.
        cqAttributesFactory.addCqListener(new CqListenerAdapter() {
            @Override
            @SuppressWarnings("unchecked") // event values are untyped; V is fixed by the query
            public void onEvent(CqEvent cqEvent) {
                GeodeCQueryGraphLogic.this.onGeodeElement((V) cqEvent.getNewValue());
            }

            @Override
            public void onError(CqEvent cqEvent) {
                GeodeCQueryGraphLogic.this.log().error(cqEvent.getThrowable(), String.valueOf(cqEvent));
            }

            @Override
            public void close() {
                GeodeCQueryGraphLogic.this.log().debug("closes");
                GeodeCQueryGraphLogic.this.inFinish().invoke(BoxedUnit.UNIT);
            }
        });
        this.query = qs().newCq(queryName(), sql(), cqAttributesFactory.create());
        return buildInitialResulsIterator(this.query);
    }
}
