package akka.stream.alpakka.pravega.impl;

import akka.Done;
import akka.Done$;
import akka.annotation.InternalApi;
import akka.event.LoggingAdapter;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.alpakka.pravega.TableEntry;
import akka.stream.alpakka.pravega.TableReaderSettings;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.IteratorItem;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.common.util.AsyncIterator;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import scala.Option;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableSource.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0015h\u0001B\u0011#\r5B\u0001\u0002\u000f\u0001\u0003\u0002\u0003\u0006I!\u000f\u0005\t\u001f\u0002\u0011)\u0019!C\u0001!\"AA\f\u0001B\u0001B\u0003%\u0011\u000b\u0003\u0005^\u0001\t\u0005\t\u0015!\u0003R\u0011!q\u0006A!A!\u0002\u0013y\u0006\u0002C3\u0001\u0005\u0003\u0005\u000b\u0011\u00024\t\u000bA\u0004A\u0011A9\t\u000be\u0004A\u0011\u000b>\t\u000f\u0005\u001d\u0001\u0001\"\u0003\u0002\n!Y\u0011\u0011\u0003\u0001A\u0002\u0003\u0007I\u0011BA\n\u0011-\t9\u0003\u0001a\u0001\u0002\u0004%I!!\u000b\t\u0017\u0005U\u0002\u00011A\u0001B\u0003&\u0011Q\u0003\u0005\f\u0003o\u0001\u0001\u0019!a\u0001\n\u0013\tI\u0004C\u0006\u0002H\u0001\u0001\r\u00111A\u0005\n\u0005%\u0003bCA'\u0001\u0001\u0007\t\u0011)Q\u0005\u0003wA\u0011\"a\u0014\u0001\u0005\u0004%I!!\u0015\t\u0011\u0005\r\u0004\u0001)A\u0005\u0003'B\u0011\"!\u001a\u0001\u0005\u0004%I!a\u001a\t\u0011\u0005]\u0004\u0001)A\u0005\u0003SB\u0011\"!\u001f\u0001\u0001\u0004%I!a\u001f\t\u0013\u0005\r\u0005\u00011A\u0005\n\u0005\u0015\u0005\u0002CAE\u0001\u0001\u0006K!! 
\t\u0013\u0005-\u0005A1A\u0005\u0002\u00055\u0005\u0002CAK\u0001\u0001\u0006I!a$\t\u000f\u0005]\u0005\u0001\"\u0003\u0002\u001a\"I\u0011\u0011\u0015\u0001C\u0002\u0013\u0005\u00111\u0015\u0005\t\u0003O\u0003\u0001\u0015!\u0003\u0002&\"I\u0011\u0011\u0016\u0001C\u0002\u0013\u0005\u00111\u0016\u0005\t\u0003_\u0003\u0001\u0015!\u0003\u0002.\"9\u0011\u0011\u0017\u0001\u0005\u0002\u0005M\u0006bBAi\u0001\u0011\u0005\u00131\u001b\u0005\b\u0003+\u0004A\u0011IAj\u0005q\u0001&/\u0019<fO\u0006$\u0016M\u00197f'>,(oY3Ti\u0006<W\rT8hS\u000eT!a\t\u0013\u0002\t%l\u0007\u000f\u001c\u0006\u0003K\u0019\nq\u0001\u001d:bm\u0016<\u0017M\u0003\u0002(Q\u00059\u0011\r\u001c9bW.\f'BA\u0015+\u0003\u0019\u0019HO]3b[*\t1&\u0001\u0003bW.\f7\u0001A\u000b\u0004]\r\u001c5c\u0001\u00010kA\u0011\u0001gM\u0007\u0002c)\u0011!\u0007K\u0001\u0006gR\fw-Z\u0005\u0003iE\u0012qb\u0012:ba\"\u001cF/Y4f\u0019><\u0017n\u0019\t\u0003aYJ!aN\u0019\u0003\u0019M#\u0018mZ3M_\u001e<\u0017N\\4\u0002\u000bMD\u0017\r]3\u0011\u0007iZT(D\u0001)\u0013\ta\u0004FA\u0006T_V\u00148-Z*iCB,\u0007c\u0001 
@\u00036\tA%\u0003\u0002AI\tQA+\u00192mK\u0016sGO]=\u0011\u0005\t\u001bE\u0002\u0001\u0003\u0006\t\u0002\u0011\r!\u0012\u0002\u0002-F\u0011a\t\u0014\t\u0003\u000f*k\u0011\u0001\u0013\u0006\u0002\u0013\u0006)1oY1mC&\u00111\n\u0013\u0002\b\u001d>$\b.\u001b8h!\t9U*\u0003\u0002O\u0011\n\u0019\u0011I\\=\u0002\u000bM\u001cw\u000e]3\u0016\u0003E\u0003\"AU-\u000f\u0005M;\u0006C\u0001+I\u001b\u0005)&B\u0001,-\u0003\u0019a$o\\8u}%\u0011\u0001\fS\u0001\u0007!J,G-\u001a4\n\u0005i[&AB*ue&twM\u0003\u0002Y\u0011\u000611oY8qK\u0002\n\u0011\u0002^1cY\u0016t\u0015-\\3\u0002'Q\f'\r\\3SK\u0006$WM]*fiRLgnZ:\u0011\ty\u0002'-Q\u0005\u0003C\u0012\u00121\u0003V1cY\u0016\u0014V-\u00193feN+G\u000f^5oON\u0004\"AQ2\u0005\u000b\u0011\u0004!\u0019A#\u0003\u0003-\u000bab\u001d;beR,\b\u000f\u0015:p[&\u001cX\rE\u0002hU2l\u0011\u0001\u001b\u0006\u0003S\"\u000b!bY8oGV\u0014(/\u001a8u\u0013\tY\u0007NA\u0004Qe>l\u0017n]3\u0011\u00055tW\"\u0001\u0016\n\u0005=T#\u0001\u0002#p]\u0016\fa\u0001P5oSRtDC\u0002:ukZ<\b\u0010\u0005\u0003t\u0001\t\fU\"\u0001\u0012\t\u000ba:\u0001\u0019A\u001d\t\u000b=;\u0001\u0019A)\t\u000bu;\u0001\u0019A)\t\u000by;\u0001\u0019A0\t\u000b\u0015<\u0001\u0019\u00014\u0002\u00131|wmU8ve\u000e,W#A>\u0011\tq\f\u0019A]\u0007\u0002{*\u0011ap`\u0001\u0005Y\u0006twM\u0003\u0002\u0002\u0002\u0005!!.\u0019<b\u0013\r\t)! 
\u0002\u0006\u00072\f7o]\u0001\u0004_V$XCAA\u0006!\u0011Q\u0014QB\u001f\n\u0007\u0005=\u0001F\u0001\u0004PkRdW\r^\u0001\u0015W\u0016Lh+\u00197vKR\u000b'\r\\3GC\u000e$xN]=\u0016\u0005\u0005U\u0001\u0003BA\f\u0003Gi!!!\u0007\u000b\t\u0005m\u0011QD\u0001\u0007G2LWM\u001c;\u000b\u0007\u0015\nyB\u0003\u0002\u0002\"\u0005\u0011\u0011n\\\u0005\u0005\u0003K\tIB\u0001\u000bLKf4\u0016\r\\;f)\u0006\u0014G.\u001a$bGR|'/_\u0001\u0019W\u0016Lh+\u00197vKR\u000b'\r\\3GC\u000e$xN]=`I\u0015\fH\u0003BA\u0016\u0003c\u00012aRA\u0017\u0013\r\ty\u0003\u0013\u0002\u0005+:LG\u000fC\u0005\u00024-\t\t\u00111\u0001\u0002\u0016\u0005\u0019\u0001\u0010J\u0019\u0002+-,\u0017PV1mk\u0016$\u0016M\u00197f\r\u0006\u001cGo\u001c:zA\u0005)A/\u00192mKV\u0011\u00111\b\t\u0005\u0003{\t\u0019%\u0004\u0002\u0002@)!\u0011\u0011IA\r\u0003\u0019!\u0018M\u00197fg&!\u0011QIA \u00055YU-\u001f,bYV,G+\u00192mK\u0006IA/\u00192mK~#S-\u001d\u000b\u0005\u0003W\tY\u0005C\u0005\u000249\t\t\u00111\u0001\u0002<\u00051A/\u00192mK\u0002\nQ!];fk\u0016,\"!a\u0015\u0011\u000b\u0005U\u0013qL\u001f\u000e\u0005\u0005]#\u0002BA-\u00037\nq!\\;uC\ndWMC\u0002\u0002^!\u000b!bY8mY\u0016\u001cG/[8o\u0013\u0011\t\t'a\u0016\u0003\u000bE+X-^3\u0002\rE,X-^3!\u0003%\u0019X-\\1qQ>\u0014X-\u0006\u0002\u0002jA!\u00111NA:\u001b\t\tiGC\u0002j\u0003_R1!!\u001d��\u0003\u0011)H/\u001b7\n\t\u0005U\u0014Q\u000e\u0002\n'\u0016l\u0017\r\u001d5pe\u0016\f!b]3nCBDwN]3!\u0003\u001d\u0019Gn\\:j]\u001e,\"!! \u0011\u0007\u001d\u000by(C\u0002\u0002\u0002\"\u0013qAQ8pY\u0016\fg.A\u0006dY>\u001c\u0018N\\4`I\u0015\fH\u0003BA\u0016\u0003\u000fC\u0011\"a\r\u0016\u0003\u0003\u0005\r!! 
\u0002\u0011\rdwn]5oO\u0002\nq\u0001\\8h)\"\fG/\u0006\u0002\u0002\u0010B!\u0001'!%R\u0013\r\t\u0019*\r\u0002\u000e\u0003NLhnY\"bY2\u0014\u0017mY6\u0002\u00111|w\r\u00165bi\u0002\n1\u0002];tQ\u0016cW-\\3oiR1\u00111FAN\u0003;Cq!a\u0002\u001a\u0001\u0004\tY\u0001\u0003\u0004\u0002 f\u0001\r!P\u0001\bK2,W.\u001a8u\u0003%yg.\u00127f[\u0016tG/\u0006\u0002\u0002&B!\u0001'!%>\u0003)yg.\u00127f[\u0016tG\u000fI\u0001\t_:4\u0015N\\5tQV\u0011\u0011Q\u0016\t\u0006a\u0005E\u00151F\u0001\n_:4\u0015N\\5tQ\u0002\nQB\\3yi&#XM]1uS>tG\u0003BA\u0016\u0003kCq!a.\u001f\u0001\u0004\tI,\u0001\u0005ji\u0016\u0014\u0018\r^8s!\u0019\tY,a1\u0002H6\u0011\u0011Q\u0018\u0006\u0005\u0003c\nyL\u0003\u0003\u0002B\u0006u\u0011AB2p[6|g.\u0003\u0003\u0002F\u0006u&!D!ts:\u001c\u0017\n^3sCR|'\u000f\u0005\u0004\u0002>\u0005%\u0017QZ\u0005\u0005\u0003\u0017\fyD\u0001\u0007Ji\u0016\u0014\u0018\r^8s\u0013R,W\u000e\u0005\u0003\u0002>\u0005=\u0017b\u0001!\u0002@\u0005A\u0001O]3Ti\u0006\u0014H\u000f\u0006\u0002\u0002,\u0005A\u0001o\\:u'R|\u0007\u000fK\u0002\u0001\u00033\u0004B!a7\u0002b6\u0011\u0011Q\u001c\u0006\u0004\u0003?T\u0013AC1o]>$\u0018\r^5p]&!\u00111]Ao\u0005-Ie\u000e^3s]\u0006d\u0017\t]5")
@InternalApi
/* loaded from: input_file:akka/stream/alpakka/pravega/impl/PravegaTableSourceStageLogic.class */
/**
 * Stage logic of a source that iterates over the entries of a Pravega key-value table and emits
 * them downstream as {@link TableEntry} elements.
 *
 * <p>Entries are fetched through an {@link AsyncIterator}; the iterator callback hands every entry
 * to the stage through the {@code onElement} async callback. A {@link Semaphore}, sized with
 * {@code maximumInflightMessages}, is acquired before each hand-over and released after each push,
 * so it bounds how many entries have been handed to the stage without having been pushed yet.
 *
 * <p>This file is decompiler output of Scala code; the long {@code akka$stream$...} member names
 * and the shape of the anonymous classes are compiler/decompiler artifacts and are kept as-is.
 *
 * @param <K> key type of the reader settings
 * @param <V> value type produced by the configured value serializer
 */
public final class PravegaTableSourceStageLogic<K, V> extends GraphStageLogic implements StageLogging {
    private final SourceShape<TableEntry<V>> shape;
    private final String scope;
    private final String tableName;
    public final TableReaderSettings<K, V> akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings;
    /** Completed with {@code Done} once startup succeeded, failed when startup throws. */
    private final Promise<Done> startupPromise;
    /** Created in {@link #preStart()}; stays {@code null} if startup fails before its creation. */
    private KeyValueTableFactory keyValueTableFactory;
    /** Created in {@link #preStart()}; stays {@code null} if startup fails before its creation. */
    private KeyValueTable table;
    /** Entries received while the outlet was not available (or while older entries were still waiting). */
    private final Queue<TableEntry<V>> akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue;
    /** Acquired once per entry handed to {@code onElement}, released once per pushed entry. */
    private final Semaphore akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$semaphore;
    /** Set once the iterator reported its end; the stage completes as soon as the queue is drained. */
    private boolean akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$closing;
    private final AsyncCallback<String> logThat;
    private final AsyncCallback<TableEntry<V>> onElement;
    private final AsyncCallback<BoxedUnit> onFinish;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;

    /** Returns the stage logger provided by {@link StageLogging}. */
    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /** Backing-field accessor required by the {@link StageLogging} trait. */
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    /** Backing-field mutator required by the {@link StageLogging} trait. */
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Returns the scope passed to the constructor. */
    public String scope() {
        return this.scope;
    }

    /** Returns the class used as log source. */
    public Class<PravegaTableSourceStageLogic<K, V>> logSource() {
        return PravegaTableSourceStageLogic.class;
    }

    /** Returns the single outlet of the source shape. */
    public Outlet<TableEntry<V>> akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$out() {
        return this.shape.out();
    }

    private KeyValueTableFactory keyValueTableFactory() {
        return this.keyValueTableFactory;
    }

    private void keyValueTableFactory_$eq(KeyValueTableFactory keyValueTableFactory) {
        this.keyValueTableFactory = keyValueTableFactory;
    }

    private KeyValueTable table() {
        return this.table;
    }

    private void table_$eq(KeyValueTable keyValueTable) {
        this.table = keyValueTable;
    }

    /** Returns the buffer of entries that could not be pushed immediately. */
    public Queue<TableEntry<V>> akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue;
    }

    /** Returns the semaphore bounding the entries handed to the stage but not yet pushed. */
    public Semaphore akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$semaphore() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$semaphore;
    }

    /** Returns whether the iterator has already reported its end. */
    public boolean akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$closing() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$closing;
    }

    private void closing_$eq(boolean z) {
        this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$closing = z;
    }

    /** Async callback that logs the given message at info level. */
    public AsyncCallback<String> logThat() {
        return this.logThat;
    }

    /**
     * Pushes one entry to the outlet and returns one permit to the semaphore.
     *
     * @param outlet outlet to push to
     * @param tableEntry entry to emit
     */
    public void akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$pushElement(Outlet<TableEntry<V>> outlet, TableEntry<V> tableEntry) {
        push(outlet, tableEntry);
        akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$semaphore().release();
    }

    /** Async callback receiving one entry: pushes it directly or buffers it in the queue. */
    public AsyncCallback<TableEntry<V>> onElement() {
        return this.onElement;
    }

    /** Async callback signalling that the iterator is exhausted. */
    public AsyncCallback<BoxedUnit> onFinish() {
        return this.onFinish;
    }

    /**
     * Requests the next item from the iterator. A {@code null} item is reported through
     * {@code onFinish}; otherwise every contained entry is deserialized, handed to
     * {@code onElement}, and the next item is requested.
     *
     * <p>NOTE(review): only {@code thenAccept} is attached to the future returned by
     * {@code getNext()}; no exceptional-completion handler is visible here, so a failed fetch (or
     * an exception thrown inside the consumer) appears to go unobserved by the stage - confirm.
     *
     * @param asyncIterator iterator over the table entries
     */
    public void nextIteration(final AsyncIterator<IteratorItem<io.pravega.client.tables.TableEntry>> asyncIterator) {
        asyncIterator.getNext().thenAccept((Consumer) new Consumer<IteratorItem<io.pravega.client.tables.TableEntry>>(this, asyncIterator) { // from class: akka.stream.alpakka.pravega.impl.PravegaTableSourceStageLogic$$anon$2
            private final /* synthetic */ PravegaTableSourceStageLogic $outer;
            private final AsyncIterator iterator$1;

            @Override // java.util.function.Consumer
            public Consumer<IteratorItem<io.pravega.client.tables.TableEntry>> andThen(Consumer<? super IteratorItem<io.pravega.client.tables.TableEntry>> consumer) {
                return super.andThen(consumer);
            }

            @Override // java.util.function.Consumer
            public void accept(IteratorItem<io.pravega.client.tables.TableEntry> iteratorItem) {
                if (iteratorItem == null) {
                    // A null item is treated as the end of the iteration.
                    this.$outer.onFinish().invoke(BoxedUnit.UNIT);
                } else {
                    iteratorItem.getItems().stream().forEach(tableEntry -> {
                        // Blocks the calling thread until a permit is available; permits come back in pushElement.
                        this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$semaphore().acquire();
                        this.$outer.onElement().invoke(new TableEntry(tableEntry.getKey(), tableEntry.getVersion(), this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.valueSerializer().deserialize(tableEntry.getValue())));
                    });
                    this.$outer.nextIteration(this.iterator$1);
                }
            }

            // Decompiler rendering of the synthetic constructor that captures the enclosing instance.
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.iterator$1 = asyncIterator;
            }
        });
    }

    /**
     * Creates the key-value table factory and table, starts iterating over all entries and
     * completes the startup promise.
     *
     * <p>On a non-fatal failure the error is logged together with its cause, the startup promise
     * is failed (so that whoever waits on it is not left hanging) and the stage is failed. Fatal
     * throwables are rethrown unchanged.
     */
    public void preStart() {
        log().debug("Start consuming {} by {} ...", this.tableName, BoxesRunTime.boxToInteger(this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.maximumInflightMessages()));
        try {
            KeyValueTableClientConfiguration build = KeyValueTableClientConfiguration.builder().build();
            keyValueTableFactory_$eq(KeyValueTableFactory.withScope(scope(), this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.clientConfig()));
            table_$eq(keyValueTableFactory().forKeyValueTable(this.tableName, build));
            nextIteration(table().iterator().maxIterationSize(this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings.maxEntriesAtOnce()).all().entries());
            this.startupPromise.success(Done$.MODULE$);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (!unapply.isEmpty()) {
                Throwable th2 = (Throwable) unapply.get();
                // Pass the throwable itself so the stack trace is not lost.
                log().error(th2, th2.getMessage());
                // tryFailure: a no-op if the promise was already completed.
                this.startupPromise.tryFailure(th2);
                failStage(th2);
                return;
            }
            throw th;
        }
    }

    /**
     * Closes the table and the factory. Either may be {@code null} when {@link #preStart()} failed
     * before creating it, and the factory is closed even if closing the table throws.
     */
    public void postStop() {
        log().debug("Stopping reader {}", this.tableName);
        try {
            if (table() != null) {
                table().close();
            }
        } finally {
            if (keyValueTableFactory() != null) {
                keyValueTableFactory().close();
            }
        }
    }

    /** Body of the {@code logThat} callback: logs the message at info level. */
    public static final /* synthetic */ void $anonfun$logThat$1(PravegaTableSourceStageLogic pravegaTableSourceStageLogic, String str) {
        pravegaTableSourceStageLogic.log().info(str);
    }

    /**
     * Body of the {@code onElement} callback: pushes the entry right away when the outlet is
     * available and nothing older is buffered, otherwise appends it to the queue to keep ordering.
     */
    public static final /* synthetic */ void $anonfun$onElement$1(PravegaTableSourceStageLogic pravegaTableSourceStageLogic, TableEntry tableEntry) {
        if (pravegaTableSourceStageLogic.isAvailable(pravegaTableSourceStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$out()) && pravegaTableSourceStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
            pravegaTableSourceStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$pushElement(pravegaTableSourceStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$out(), tableEntry);
        } else {
            pravegaTableSourceStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue().enqueue(tableEntry);
        }
    }

    /**
     * Body of the {@code onFinish} callback: marks the stage as closing and completes it
     * immediately when no buffered entries remain; otherwise completion happens in {@code onPull}.
     */
    public static final /* synthetic */ void $anonfun$onFinish$1(PravegaTableSourceStageLogic pravegaTableSourceStageLogic, BoxedUnit boxedUnit) {
        pravegaTableSourceStageLogic.closing_$eq(true);
        if (pravegaTableSourceStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
            pravegaTableSourceStageLogic.completeStage();
        }
    }

    /**
     * Creates the stage logic, its async callbacks and the outlet handler.
     *
     * @param sourceShape shape whose outlet the entries are pushed to
     * @param str scope handed to the key-value table factory
     * @param str2 name of the key-value table to read
     * @param tableReaderSettings reader settings (client config, value serializer, batch size, in-flight limit)
     * @param promise promise completed once startup succeeded, or failed when startup throws
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PravegaTableSourceStageLogic(SourceShape<TableEntry<V>> sourceShape, String str, String str2, TableReaderSettings<K, V> tableReaderSettings, Promise<Done> promise) {
        super(sourceShape);
        this.shape = sourceShape;
        this.scope = str;
        this.tableName = str2;
        this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$tableReaderSettings = tableReaderSettings;
        this.startupPromise = promise;
        StageLogging.$init$(this);
        this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue = Queue$.MODULE$.empty();
        this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$semaphore = new Semaphore(tableReaderSettings.maximumInflightMessages());
        this.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$closing = false;
        this.logThat = getAsyncCallback(str3 -> {
            $anonfun$logThat$1(this, str3);
            return BoxedUnit.UNIT;
        });
        this.onElement = getAsyncCallback(tableEntry -> {
            $anonfun$onElement$1(this, tableEntry);
            return BoxedUnit.UNIT;
        });
        this.onFinish = getAsyncCallback(boxedUnit -> {
            $anonfun$onFinish$1(this, boxedUnit);
            return BoxedUnit.UNIT;
        });
        setHandler(akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$out(), new OutHandler(this) { // from class: akka.stream.alpakka.pravega.impl.PravegaTableSourceStageLogic$$anon$1
            private final /* synthetic */ PravegaTableSourceStageLogic $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            /** Emits the oldest buffered entry, if any, and completes the stage once closing and drained. */
            public void onPull() {
                if (!this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
                    this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$pushElement(this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$out(), (TableEntry) this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue().dequeue());
                }
                if (this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$closing() && this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableSourceStageLogic$$queue().isEmpty()) {
                    this.$outer.completeStage();
                }
            }

            // Decompiler rendering of the synthetic constructor that captures the enclosing instance.
            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
