package org.apache.pekko.stream.connectors.pravega.impl;

import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.IteratorItem;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.common.util.AsyncIterator;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.connectors.pravega.TableEntry;
import org.apache.pekko.stream.connectors.pravega.TableReaderSettings;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.Option;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableSource.scala */
@InternalApi
@ScalaSignature(bytes = "\u0006\u0005\u00055h\u0001B\u0011#\rEB\u0001\u0002\u0010\u0001\u0003\u0002\u0003\u0006I!\u0010\u0005\t'\u0002\u0011)\u0019!C\u0001)\"A\u0001\r\u0001B\u0001B\u0003%Q\u000b\u0003\u0005b\u0001\t\u0005\t\u0015!\u0003V\u0011!\u0011\u0007A!A!\u0002\u0013\u0019\u0007\u0002C5\u0001\u0005\u0003\u0005\u000b\u0011\u00026\t\u000bQ\u0004A\u0011A;\t\u000bu\u0004A\u0011\u000b@\t\u000f\u0005=\u0001\u0001\"\u0003\u0002\u0012!Y\u0011\u0011\u0004\u0001A\u0002\u0003\u0007I\u0011BA\u000e\u0011-\ty\u0003\u0001a\u0001\u0002\u0004%I!!\r\t\u0017\u0005u\u0002\u00011A\u0001B\u0003&\u0011Q\u0004\u0005\f\u0003\u007f\u0001\u0001\u0019!a\u0001\n\u0013\t\t\u0005C\u0006\u0002P\u0001\u0001\r\u00111A\u0005\n\u0005E\u0003bCA+\u0001\u0001\u0007\t\u0011)Q\u0005\u0003\u0007B\u0011\"a\u0016\u0001\u0005\u0004%I!!\u0017\t\u0011\u0005-\u0004\u0001)A\u0005\u00037B\u0011\"!\u001c\u0001\u0005\u0004%I!a\u001c\t\u0011\u0005}\u0004\u0001)A\u0005\u0003cB\u0011\"!!\u0001\u0001\u0004%I!a!\t\u0013\u0005-\u0005\u00011A\u0005\n\u00055\u0005\u0002CAI\u0001\u0001\u0006K!!\"\t\u0013\u0005M\u0005A1A\u0005\u0002\u0005U\u0005\u0002CAO\u0001\u0001\u0006I!a&\t\u000f\u0005}\u0005\u0001\"\u0003\u0002\"\"I\u0011\u0011\u0016\u0001C\u0002\u0013\u0005\u00111\u0016\u0005\t\u0003_\u0003\u0001\u0015!\u0003\u0002.\"I\u0011\u0011\u0017\u0001C\u0002\u0013\u0005\u00111\u0017\u0005\t\u0003o\u0003\u0001\u0015!\u0003\u00026\"9\u0011\u0011\u0018\u0001\u0005\u0002\u0005m\u0006bBAm\u0001\u0011\u0005\u00131\u001c\u0005\b\u0003;\u0004A\u0011IAn\u0005q\u0001&/\u0019<fO\u0006$\u0016M\u00197f'>,(oY3Ti\u0006<W\rT8hS\u000eT!a\t\u0013\u0002\t%l\u0007\u000f\u001c\u0006\u0003K\u0019\nq\u0001\u001d:bm\u0016<\u0017M\u0003\u0002(Q\u0005Q1m\u001c8oK\u000e$xN]:\u000b\u0005%R\u0013AB:ue\u0016\fWN\u0003\u0002,Y\u0005)\u0001/Z6l_*\u0011QFL\u0001\u0007CB\f7\r[3\u000b\u0003=\n1a\u001c:h\u0007\u0001)2AM4H'\r\u00011'\u000f\t\u0003i]j\u0011!\u000e\u0006\u0003m!\nQa\u001d;bO\u0016L!\u0001O\u001b\u0003\u001f\u001d\u0013\u0018\r\u001d5Ti\u0006<W\rT8hS\u000e\u0004\"\u0001\u000e\u001e\n\u0005m*$\u0001D*uC\u001e,Gj\\4hS:<\u0017!B:iCB,\u0007c\u0001 @\u00036\t\u0001&\u0003\u0002AQ\tY1k\\;sG\u0016\u001c\u0006.\u00199f!\r\u00115)R\u0007\u0002I%\u0011A\t\n\u0002\u000b)\u0006\u0014G.Z#oiJL\bC\u0001$H\u0019\u0001!Q\u0001\u0013\u0001C\u0002%\u0013\u0011AV\t\u0003\u0015B\u0003\"a\u0013(\u000e\u00031S\u0011!T\u0001\u0006g\u000e\fG.Y\u0005\u0003\u001f2\u0013qAT8uQ&tw\r\u0005\u0002L#&\u0011!\u000b\u0014\u0002\u0004\u0003:L\u0018!B:d_B,W#A+\u0011\u0005YkfBA,\\!\tAF*D\u0001Z\u0015\tQ\u0006'\u0001\u0004=e>|GOP\u0005\u000392\u000ba\u0001\u0015:fI\u00164\u0017B\u00010`\u0005\u0019\u0019FO]5oO*\u0011A\fT\u0001\u0007g\u000e|\u0007/\u001a\u0011\u0002\u0013Q\f'\r\\3OC6,\u0017a\u0005;bE2,'+Z1eKJ\u001cV\r\u001e;j]\u001e\u001c\b\u0003\u0002\"eM\u0016K!!\u001a\u0013\u0003'Q\u000b'\r\\3SK\u0006$WM]*fiRLgnZ:\u0011\u0005\u0019;G!\u00025\u0001\u0005\u0004I%!A&\u0002\u001dM$\u0018M\u001d;vaB\u0013x.\\5tKB\u00191N\u001c9\u000e\u00031T!!\u001c'\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002pY\n9\u0001K]8nSN,\u0007CA9s\u001b\u0005Q\u0013BA:+\u0005\u0011!uN\\3\u0002\rqJg.\u001b;?)\u00191\b0\u001f>|yB!q\u000f\u00014F\u001b\u0005\u0011\u0003\"\u0002\u001f\b\u0001\u0004i\u0004\"B*\b\u0001\u0004)\u0006\"B1\b\u0001\u0004)\u0006\"\u00022\b\u0001\u0004\u0019\u0007\"B5\b\u0001\u0004Q\u0017!\u00037pON{WO]2f+\u0005y\b#BA\u0001\u0003\u00171XBAA\u0002\u0015\u0011\t)!a\u0002\u0002\t1\fgn\u001a\u0006\u0003\u0003\u0013\tAA[1wC&!\u0011QBA\u0002\u0005\u0015\u0019E.Y:t\u0003\ryW\u000f^\u000b\u0003\u0003'\u0001BAPA\u000b\u0003&\u0019\u0011q\u0003\u0015\u0003\r=+H\u000f\\3u\u0003QYW-\u001f,bYV,G+\u00192mK\u001a\u000b7\r^8ssV\u0011\u0011Q\u0004\t\u0005\u0003?\tY#\u0004\u0002\u0002\")!\u00111EA\u0013\u0003\u0019\u0019G.[3oi*\u0019Q%a\n\u000b\u0005\u0005%\u0012AA5p\u0013\u0011\ti#!\t\u0003)-+\u0017PV1mk\u0016$\u0016M\u00197f\r\u0006\u001cGo\u001c:z\u0003aYW-\u001f,bYV,G+\u00192mK\u001a\u000b7\r^8ss~#S-\u001d\u000b\u0005\u0003g\tI\u0004E\u0002L\u0003kI1!a\u000eM\u0005\u0011)f.\u001b;\t\u0013\u0005m2\"!AA\u0002\u0005u\u0011a\u0001=%c\u0005)2.Z=WC2,X\rV1cY\u00164\u0015m\u0019;pef\u0004\u0013!\u0002;bE2,WCAA\"!\u0011\t)%a\u0013\u000e\u0005\u0005\u001d#\u0002BA%\u0003C\ta\u0001^1cY\u0016\u001c\u0018\u0002BA'\u0003\u000f\u0012QbS3z-\u0006dW/\u001a+bE2,\u0017!\u0003;bE2,w\fJ3r)\u0011\t\u0019$a\u0015\t\u0013\u0005mb\"!AA\u0002\u0005\r\u0013A\u0002;bE2,\u0007%A\u0003rk\u0016,X-\u0006\u0002\u0002\\A)\u0011QLA4\u00036\u0011\u0011q\f\u0006\u0005\u0003C\n\u0019'A\u0004nkR\f'\r\\3\u000b\u0007\u0005\u0015D*\u0001\u0006d_2dWm\u0019;j_:LA!!\u001b\u0002`\t)\u0011+^3vK\u00061\u0011/^3vK\u0002\n\u0011b]3nCBDwN]3\u0016\u0005\u0005E\u0004\u0003BA:\u0003wj!!!\u001e\u000b\u00075\f9H\u0003\u0003\u0002z\u0005\u001d\u0011\u0001B;uS2LA!! \u0002v\tI1+Z7ba\"|'/Z\u0001\u000bg\u0016l\u0017\r\u001d5pe\u0016\u0004\u0013aB2m_NLgnZ\u000b\u0003\u0003\u000b\u00032aSAD\u0013\r\tI\t\u0014\u0002\b\u0005>|G.Z1o\u0003-\u0019Gn\\:j]\u001e|F%Z9\u0015\t\u0005M\u0012q\u0012\u0005\n\u0003w)\u0012\u0011!a\u0001\u0003\u000b\u000b\u0001b\u00197pg&tw\rI\u0001\bY><G\u000b[1u+\t\t9\n\u0005\u00035\u00033+\u0016bAANk\ti\u0011i]=oG\u000e\u000bG\u000e\u001c2bG.\f\u0001\u0002\\8h)\"\fG\u000fI\u0001\faV\u001c\b.\u00127f[\u0016tG\u000f\u0006\u0004\u00024\u0005\r\u0016Q\u0015\u0005\b\u0003\u001fI\u0002\u0019AA\n\u0011\u0019\t9+\u0007a\u0001\u0003\u00069Q\r\\3nK:$\u0018!C8o\u000b2,W.\u001a8u+\t\ti\u000b\u0005\u00035\u00033\u000b\u0015AC8o\u000b2,W.\u001a8uA\u0005AqN\u001c$j]&\u001c\b.\u0006\u0002\u00026B)A'!'\u00024\u0005IqN\u001c$j]&\u001c\b\u000eI\u0001\u000e]\u0016DH/\u0013;fe\u0006$\u0018n\u001c8\u0015\t\u0005M\u0012Q\u0018\u0005\b\u0003\u007fs\u0002\u0019AAa\u0003!IG/\u001a:bi>\u0014\bCBAb\u0003\u0017\fy-\u0004\u0002\u0002F*!\u0011\u0011PAd\u0015\u0011\tI-!\n\u0002\r\r|W.\\8o\u0013\u0011\ti-!2\u0003\u001b\u0005\u001b\u0018P\\2Ji\u0016\u0014\u0018\r^8s!\u0019\t)%!5\u0002V&!\u00111[A$\u00051IE/\u001a:bi>\u0014\u0018\n^3n!\u0011\t)%a6\n\u0007\u0011\u000b9%\u0001\u0005qe\u0016\u001cF/\u0019:u)\t\t\u0019$\u0001\u0005q_N$8\u000b^8qQ\r\u0001\u0011\u0011\u001d\t\u0005\u0003G\fI/\u0004\u0002\u0002f*\u0019\u0011q\u001d\u0016\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002l\u0006\u0015(aC%oi\u0016\u0014h.\u00197Ba&\u0004")
/* loaded from: input_file:org/apache/pekko/stream/connectors/pravega/impl/PravegaTableSourceStageLogic.class */
public final class PravegaTableSourceStageLogic<K, V> extends GraphStageLogic implements StageLogging {
    private final SourceShape<TableEntry<V>> shape;
    private final String scope;
    private final String tableName;
    public final TableReaderSettings<K, V> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings;
    private final Promise<Done> startupPromise;
    private KeyValueTableFactory keyValueTableFactory;
    private KeyValueTable table;
    private final Queue<TableEntry<V>> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue;
    private final Semaphore org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore;
    private boolean org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing;
    private final AsyncCallback<String> logThat;
    private final AsyncCallback<TableEntry<V>> onElement;
    private final AsyncCallback<BoxedUnit> onFinish;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public String scope() {
        return this.scope;
    }

    public Class<PravegaTableSourceStageLogic<K, V>> logSource() {
        return PravegaTableSourceStageLogic.class;
    }

    public Outlet<TableEntry<V>> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out() {
        return this.shape.out();
    }

    private KeyValueTableFactory keyValueTableFactory() {
        return this.keyValueTableFactory;
    }

    private void keyValueTableFactory_$eq(KeyValueTableFactory keyValueTableFactory) {
        this.keyValueTableFactory = keyValueTableFactory;
    }

    private KeyValueTable table() {
        return this.table;
    }

    private void table_$eq(KeyValueTable keyValueTable) {
        this.table = keyValueTable;
    }

    public Queue<TableEntry<V>> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue() {
        return this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue;
    }

    public Semaphore org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore() {
        return this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore;
    }

    public boolean org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing() {
        return this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing;
    }

    private void closing_$eq(boolean z) {
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing = z;
    }

    public AsyncCallback<String> logThat() {
        return this.logThat;
    }

    public void org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$pushElement(Outlet<TableEntry<V>> outlet, TableEntry<V> tableEntry) {
        push(outlet, tableEntry);
        org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore().release();
    }

    public AsyncCallback<TableEntry<V>> onElement() {
        return this.onElement;
    }

    public AsyncCallback<BoxedUnit> onFinish() {
        return this.onFinish;
    }

    public void nextIteration(final AsyncIterator<IteratorItem<io.pravega.client.tables.TableEntry>> asyncIterator) {
        asyncIterator.getNext().thenAccept((Consumer) new Consumer<IteratorItem<io.pravega.client.tables.TableEntry>>(this, asyncIterator) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableSourceStageLogic$$anon$2
            private final /* synthetic */ PravegaTableSourceStageLogic $outer;
            private final AsyncIterator iterator$1;

            @Override // java.util.function.Consumer
            public Consumer<IteratorItem<io.pravega.client.tables.TableEntry>> andThen(Consumer<? super IteratorItem<io.pravega.client.tables.TableEntry>> consumer) {
                return super.andThen(consumer);
            }

            @Override // java.util.function.Consumer
            public void accept(IteratorItem<io.pravega.client.tables.TableEntry> iteratorItem) {
                if (iteratorItem == null) {
                    this.$outer.onFinish().invoke(BoxedUnit.UNIT);
                } else {
                    iteratorItem.getItems().stream().forEach(tableEntry -> {
                        this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore().acquire();
                        this.$outer.onElement().invoke(new TableEntry(tableEntry.getKey(), tableEntry.getVersion(), this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.valueSerializer().deserialize(tableEntry.getValue())));
                    });
                    this.$outer.nextIteration(this.iterator$1);
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.iterator$1 = asyncIterator;
            }
        });
    }

    public void preStart() {
        log().debug("Start consuming {} by {} ...", this.tableName, BoxesRunTime.boxToInteger(this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.maximumInflightMessages()));
        try {
            KeyValueTableClientConfiguration build = KeyValueTableClientConfiguration.builder().build();
            keyValueTableFactory_$eq(KeyValueTableFactory.withScope(scope(), this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.clientConfig()));
            table_$eq(keyValueTableFactory().forKeyValueTable(this.tableName, build));
            nextIteration(table().iterator().maxIterationSize(this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.maxEntriesAtOnce()).all().entries());
            this.startupPromise.success(Done$.MODULE$);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    Throwable th2 = (Throwable) unapply.get();
                    log().error(th2.getMessage());
                    failStage(th2);
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
            }
            throw th;
        }
    }

    public void postStop() {
        log().debug("Stopping reader {}", this.tableName);
        table().close();
        keyValueTableFactory().close();
    }

    public static final /* synthetic */ void $anonfun$logThat$1(PravegaTableSourceStageLogic pravegaTableSourceStageLogic, String str) {
        pravegaTableSourceStageLogic.log().info(str);
    }

    public static final /* synthetic */ void $anonfun$onElement$1(PravegaTableSourceStageLogic pravegaTableSourceStageLogic, TableEntry tableEntry) {
        if (pravegaTableSourceStageLogic.isAvailable(pravegaTableSourceStageLogic.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out()) && pravegaTableSourceStageLogic.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
            pravegaTableSourceStageLogic.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$pushElement(pravegaTableSourceStageLogic.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out(), tableEntry);
        } else {
            pravegaTableSourceStageLogic.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue().enqueue(tableEntry);
        }
    }

    public static final /* synthetic */ void $anonfun$onFinish$1(PravegaTableSourceStageLogic pravegaTableSourceStageLogic, BoxedUnit boxedUnit) {
        pravegaTableSourceStageLogic.closing_$eq(true);
        if (pravegaTableSourceStageLogic.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
            pravegaTableSourceStageLogic.completeStage();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PravegaTableSourceStageLogic(SourceShape<TableEntry<V>> sourceShape, String str, String str2, TableReaderSettings<K, V> tableReaderSettings, Promise<Done> promise) {
        super(sourceShape);
        this.shape = sourceShape;
        this.scope = str;
        this.tableName = str2;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings = tableReaderSettings;
        this.startupPromise = promise;
        StageLogging.$init$(this);
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue = Queue$.MODULE$.empty();
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$semaphore = new Semaphore(tableReaderSettings.maximumInflightMessages());
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing = false;
        this.logThat = getAsyncCallback(str3 -> {
            $anonfun$logThat$1(this, str3);
            return BoxedUnit.UNIT;
        });
        this.onElement = getAsyncCallback(tableEntry -> {
            $anonfun$onElement$1(this, tableEntry);
            return BoxedUnit.UNIT;
        });
        this.onFinish = getAsyncCallback(boxedUnit -> {
            $anonfun$onFinish$1(this, boxedUnit);
            return BoxedUnit.UNIT;
        });
        setHandler(org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out(), new OutHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableSourceStageLogic$$anon$1
            private final /* synthetic */ PravegaTableSourceStageLogic $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                if (!this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
                    this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$pushElement(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$out(), (TableEntry) this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue().dequeue());
                }
                if (this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$closing() && this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
                    this.$outer.completeStage();
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
