package org.apache.pekko.stream.connectors.pravega.impl;

import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.IteratorItem;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.common.util.AsyncIterator;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.connectors.pravega.TableEntry;
import org.apache.pekko.stream.connectors.pravega.TableReaderSettings;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.Option;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableSource.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/pravega/impl/PravegaTableSourceStageLogic.class */
public final class PravegaTableSourceStageLogic<K, V> extends GraphStageLogic implements StageLogging {
    // Backing field for the logger exposed through the StageLogging accessor methods of this class.
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Shape handed to the constructor; its outlet is the port entries are pushed to.
    private final SourceShape<TableEntry<V>> shape;
    // Pravega scope passed to KeyValueTableFactory.withScope(...) in preStart().
    private final String scope;
    // Name of the key-value table opened in preStart().
    private final String tableName;
    // Reader settings: client config, value serializer, max entries per iteration, in-flight limit.
    public final TableReaderSettings<K, V> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings;
    // Completed with Done at the end of a successful preStart().
    private final Promise<Done> startupPromise;
    // Assigned in preStart() and closed in postStop(); not initialised by the constructor.
    private KeyValueTableFactory keyValueTableFactory;
    // Assigned in preStart() and closed in postStop(); not initialised by the constructor.
    private KeyValueTable table;
    // Entries received through onElement while the outlet could not accept a push.
    public final Queue<TableEntry<V>> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue;
    // Created with maximumInflightMessages permits; one permit is acquired per entry in
    // nextIteration and released after the entry has been pushed.
    public final Semaphore org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore;
    // Set to true by the onFinish callback; the stage completes once this is set and the queue is empty.
    public boolean org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing;
    // Async callback that logs the received string at info level.
    private final AsyncCallback logThat;
    // Async callback that pushes or buffers one table entry.
    private final AsyncCallback onElement;
    // Async callback that marks the source as closing and completes it when nothing is buffered.
    private final AsyncCallback onFinish;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Builds the stage logic: stores the constructor arguments, creates the buffer queue and the
     * in-flight semaphore, registers the three async callbacks and installs the outlet handler.
     *
     * @param sourceShape shape whose outlet entries are emitted on
     * @param str Pravega scope (stored in {@code scope})
     * @param str2 key-value table name (stored in {@code tableName})
     * @param tableReaderSettings reader settings; {@code maximumInflightMessages()} sizes the semaphore
     * @param promise promise completed by {@code preStart()} once the table iteration has been started
     */
    public PravegaTableSourceStageLogic(SourceShape<TableEntry<V>> sourceShape, String str, String str2, TableReaderSettings<K, V> tableReaderSettings, Promise<Done> promise) {
        super(sourceShape);
        this.shape = sourceShape;
        this.scope = str;
        this.tableName = str2;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings = tableReaderSettings;
        this.startupPromise = promise;
        StageLogging.$init$(this);
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue = Queue$.MODULE$.empty();
        // One permit per entry that may be handed to the stage without having been pushed yet.
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore = new Semaphore(tableReaderSettings.maximumInflightMessages());
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing = false;
        // Logs the given message at info level.
        this.logThat = getAsyncCallback(str3 -> {
            log().info(str3);
        });
        // Push straight away only when the outlet is available and nothing is buffered, so that
        // buffered entries are always emitted before newer ones; otherwise buffer the entry.
        this.onElement = getAsyncCallback(tableEntry -> {
            if (isAvailable(org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out()) && this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue.isEmpty()) {
                org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$pushElement(org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out(), tableEntry);
            } else {
                this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue.enqueue(tableEntry);
            }
        });
        // Marks the source as closing; completes immediately when nothing is buffered, otherwise
        // completion is left to onPull once the queue has been drained.
        this.onFinish = getAsyncCallback(boxedUnit -> {
            this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing = true;
            if (this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue.isEmpty()) {
                completeStage();
            }
        });
        setHandler(org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out(), new OutHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableSourceStageLogic$$anon$1
            private final /* synthetic */ PravegaTableSourceStageLogic $outer;

            {
                // NOTE(review): decompiler rendering; presumably the compiled class null-checks and
                // stores the enclosing stage logic here. As written, `this` is the handler itself.
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            // Both overloads delegate to the default implementations provided by OutHandler.
            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            // Emits the oldest buffered entry, if any. When the queue is empty nothing is pushed here;
            // the onElement callback pushes directly once it sees the outlet is available.
            public void onPull() {
                if (!this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue.isEmpty()) {
                    this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$pushElement(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out(), (TableEntry) this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue.dequeue());
                }
                // Finish once the iterator reported its end (closing) and the buffer has been drained.
                if (this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing && this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue.isEmpty()) {
                    this.$outer.completeStage();
                }
            }
        });
    }

    /**
     * Getter for the logger backing field used by the {@code StageLogging} accessors.
     *
     * @return the current value of the backing field (whatever the matching setter last stored)
     */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        final LoggingAdapter current = this.org$apache$pekko$stream$stage$StageLogging$$_log;
        return current;
    }

    /**
     * Setter for the logger backing field used by the {@code StageLogging} accessors.
     *
     * @param loggingAdapter value to store in the backing field
     */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /**
     * Returns the logger for this stage by delegating to the static {@code StageLogging.log$} helper.
     *
     * @return the logging adapter produced by {@code StageLogging.log$(this)}
     */
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        final LoggingAdapter adapter = StageLogging.log$(this);
        return adapter;
    }

    /**
     * @return the Pravega scope name this stage was constructed with
     */
    public String scope() {
        return scope;
    }

    /**
     * @return this class, used as the log source
     */
    public Class<?> logSource() {
        final Class<?> source = PravegaTableSourceStageLogic.class;
        return source;
    }

    /**
     * @return the outlet of the shape this stage was constructed with
     */
    public Outlet<TableEntry<V>> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out() {
        final SourceShape<TableEntry<V>> sourceShape = this.shape;
        return sourceShape.out();
    }

    /**
     * @return the async callback, created in the constructor, that logs its argument at info level
     */
    public AsyncCallback<String> logThat() {
        return logThat;
    }

    /**
     * Pushes one entry downstream and then returns one permit to the in-flight semaphore.
     *
     * @param outlet port to push to
     * @param tableEntry entry to emit
     */
    public void org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$pushElement(Outlet<TableEntry<V>> outlet, TableEntry<V> tableEntry) {
        // Emit first; the permit is only handed back once the entry has left this stage.
        push(outlet, tableEntry);
        final Semaphore inflight = this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore;
        inflight.release();
    }

    /**
     * @return the async callback, created in the constructor, that pushes or buffers one entry
     */
    public AsyncCallback<TableEntry<V>> onElement() {
        return onElement;
    }

    /**
     * @return the async callback, created in the constructor, that marks the source as closing
     */
    public AsyncCallback<BoxedUnit> onFinish() {
        return onFinish;
    }

    /**
     * Requests the next batch from the table iterator and, when it arrives, forwards every entry
     * to the stage through {@code onElement}, then requests the following batch. A {@code null}
     * batch is treated as the end of the iteration and triggers {@code onFinish}.
     *
     * NOTE(review): only {@code thenAccept} is attached to the future returned by
     * {@code getNext()}; no exceptional-completion handler is visible here, so a failed future
     * does not appear to be reported to the stage. Confirm whether that is handled elsewhere.
     *
     * @param asyncIterator iterator over batches of Pravega table entries
     */
    public void nextIteration(final AsyncIterator<IteratorItem<io.pravega.client.tables.TableEntry>> asyncIterator) {
        asyncIterator.getNext().thenAccept((Consumer) new Consumer<IteratorItem<io.pravega.client.tables.TableEntry>>(asyncIterator, this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableSourceStageLogic$$anon$2
            private final AsyncIterator iterator$1;
            private final /* synthetic */ PravegaTableSourceStageLogic $outer;

            {
                this.iterator$1 = asyncIterator;
                // NOTE(review): decompiler rendering; presumably the compiled class null-checks and
                // stores the enclosing stage logic here. As written, `this` is the consumer itself.
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            @Override // java.util.function.Consumer
            public /* bridge */ /* synthetic */ Consumer<IteratorItem<io.pravega.client.tables.TableEntry>> andThen(Consumer<? super IteratorItem<io.pravega.client.tables.TableEntry>> consumer) {
                return super.andThen(consumer);
            }

            /* renamed from: accept, reason: avoid collision after fix types in other method */
            public void accept2(IteratorItem iteratorItem) {
                if (iteratorItem == null) {
                    // No more batches: let the stage finish once its buffer is drained.
                    this.$outer.onFinish().invoke(BoxedUnit.UNIT);
                } else {
                    iteratorItem.getItems().stream().forEach(tableEntry -> {
                        // Blocks the calling thread until a permit is available; permits are
                        // returned by pushElement after an entry has been pushed downstream.
                        this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore.acquire();
                        // Wrap key, version and the deserialized value, then hand over to the stage.
                        this.$outer.onElement().invoke(new TableEntry(tableEntry.getKey(), tableEntry.getVersion(), this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.valueSerializer().deserialize(tableEntry.getValue())));
                    });
                    // Batch fully handed over: ask for the next one with the same iterator.
                    this.$outer.nextIteration(this.iterator$1);
                }
            }

            @Override // java.util.function.Consumer
            public /* bridge */ /* synthetic */ void accept(IteratorItem<io.pravega.client.tables.TableEntry> iteratorItem) {
                accept2((IteratorItem) iteratorItem);
            }
        });
    }

    /**
     * Opens the key-value table and starts iterating over all of its entries.
     *
     * <p>On success the startup promise is completed with {@code Done}. On a non-fatal failure the
     * error is logged together with its cause, the startup promise is failed with the same
     * throwable (so the future handed out at materialization does not stay pending forever) and
     * the stage is failed. Fatal throwables, as classified by {@code NonFatal}, are rethrown
     * untouched.
     */
    public void preStart() {
        final TableReaderSettings<K, V> settings = this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings;
        log().debug("Start consuming {} by {} ...", this.tableName, BoxesRunTime.boxToInteger(settings.maximumInflightMessages()));
        try {
            KeyValueTableClientConfiguration build = KeyValueTableClientConfiguration.builder().build();
            this.keyValueTableFactory = KeyValueTableFactory.withScope(scope(), settings.clientConfig());
            this.table = this.keyValueTableFactory.forKeyValueTable(this.tableName, build);
            nextIteration(this.table.iterator().maxIterationSize(settings.maxEntriesAtOnce()).all().entries());
            this.startupPromise.success(Done$.MODULE$);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                // Fatal error: do not try to recover, propagate as-is.
                throw th;
            }
            Throwable failure = (Throwable) unapply.get();
            // Pass the throwable itself so the stack trace is not lost.
            log().error(failure, failure.getMessage());
            // tryFailure is a no-op if the promise has already been completed.
            this.startupPromise.tryFailure(failure);
            failStage(failure);
        }
    }

    /**
     * Releases the Pravega resources opened in {@code preStart()}.
     *
     * <p>Both fields are only assigned inside {@code preStart()}, so either may still be
     * {@code null} if start-up failed part-way; each is therefore closed only when present. The
     * factory is closed in a {@code finally} block so that it is released even when closing the
     * table throws.
     */
    public void postStop() {
        log().debug("Stopping reader {}", this.tableName);
        try {
            if (this.table != null) {
                this.table.close();
            }
        } finally {
            if (this.keyValueTableFactory != null) {
                this.keyValueTableFactory.close();
            }
        }
    }
}
