package akka.stream.alpakka.pravega.impl;

import akka.annotation.InternalApi;
import akka.event.LoggingAdapter;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.alpakka.pravega.TableSettings;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.client.tables.TableEntry;
import io.pravega.client.tables.TableKey;
import java.util.concurrent.CompletableFuture;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.compat.java8.FutureConverters$;
import scala.compat.java8.FutureConverters$CompletionStageOps$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableReadFlow.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005}e\u0001B\u000e\u001d\r\u001dB\u0001B\r\u0001\u0003\u0006\u0004%\ta\r\u0005\t\u0019\u0002\u0011\t\u0011)A\u0005i!AQ\n\u0001BC\u0002\u0013\u0005a\n\u0003\u0005[\u0001\t\u0005\t\u0015!\u0003P\u0011!Y\u0006A!A!\u0002\u0013y\u0005\u0002\u0003/\u0001\u0005\u0003\u0005\u000b\u0011B/\t\u000b\u0005\u0004A\u0011\u00012\t\u000b%\u0004A\u0011\u00026\t\u000b9\u0004A\u0011B8\t\u0013M\u0004\u0001\u0019!a\u0001\n\u0013!\b\"\u0003@\u0001\u0001\u0004\u0005\r\u0011\"\u0003��\u0011)\tY\u0001\u0001a\u0001\u0002\u0003\u0006K!\u001e\u0005\f\u0003\u001b\u0001\u0001\u0019!a\u0001\n\u0013\ty\u0001C\u0006\u0002\u001e\u0001\u0001\r\u00111A\u0005\n\u0005}\u0001bCA\u0012\u0001\u0001\u0007\t\u0011)Q\u0005\u0003#A\u0011\"!\n\u0001\u0001\u0004%I!a\n\t\u0013\u0005=\u0002\u00011A\u0005\n\u0005E\u0002\u0002CA\u001b\u0001\u0001\u0006K!!\u000b\t\u0013\u0005}\u0002\u00011A\u0005\n\u0005\u0005\u0003\"CA%\u0001\u0001\u0007I\u0011BA&\u0011!\ty\u0005\u0001Q!\n\u0005\r\u0003\"CA*\u0001\t\u0007I\u0011BA+\u0011!\ty\u0007\u0001Q\u0001\n\u0005]\u0003bBA9\u0001\u0011\u0005\u00131\u000f\u0005\b\u0003k\u0002A\u0011AA<\u0011\u001d\ty\t\u0001C!\u0003g\u0012a\u0004\u0015:bm\u0016<\u0017\rV1cY\u0016\u0014V-\u00193GY><8\u000b^1hK2{w-[2\u000b\u0005uq\u0012\u0001B5na2T!a\b\u0011\u0002\u000fA\u0014\u0018M^3hC*\u0011\u0011EI\u0001\bC2\u0004\u0018m[6b\u0015\t\u0019C%\u0001\u0004tiJ,\u0017-\u001c\u0006\u0002K\u0005!\u0011m[6b\u0007\u0001)2\u0001\u000b\u001eK'\r\u0001\u0011f\f\t\u0003U5j\u0011a\u000b\u0006\u0003Y\t\nQa\u001d;bO\u0016L!AL\u0016\u0003\u001f\u001d\u0013\u0018\r\u001d5Ti\u0006<W\rT8hS\u000e\u0004\"A\u000b\u0019\n\u0005EZ#\u0001D*uC\u001e,Gj\\4hS:<\u0017!B:iCB,W#\u0001\u001b\u0011\tU2\u0004HR\u0007\u0002E%\u0011qG\t\u0002\n\r2|wo\u00155ba\u0016\u0004\"!\u000f\u001e\r\u0001\u0011)1\b\u0001b\u0001y\t\t1*\u0005\u0002>\u0007B\u0011a(Q\u0007\u0002\u007f)\t\u0001)A\u0003tG\u0006d\u0017-\u0003\u0002C\u007f\t9aj\u001c;iS:<\u0007C\u0001 E\u0013\t)uHA\u0002B]f\u00042AP$J\u0013\tAuH\u0001\u0004PaRLwN\u001c\t\u0003s)#Qa\u0013\u0001C\u0002q\u0012\u0011AV\u0001\u0007g\"\f\u0007/\u001a\u0011\u0002\u000bM\u001cw\u000e]3\u0016\u0003=\u0003\"\u0001U,\u000f\u0005E+\u0006C\u0001*@\u001b\u0005\u0019&B\u0001+'\u0003\u0019a$o\\8u}%\u0011akP\u0001\u0007!J,G-\u001a4\n\u0005aK&AB*ue&twM\u0003\u0002W\u007f\u000511oY8qK\u0002\n\u0011\u0002^1cY\u0016t\u0015-\\3\u0002\u001bQ\f'\r\\3TKR$\u0018N\\4t!\u0011qv\fO%\u000e\u0003yI!\u0001\u0019\u0010\u0003\u001bQ\u000b'\r\\3TKR$\u0018N\\4t\u0003\u0019a\u0014N\\5u}Q)1-\u001a4hQB!A\r\u0001\u001dJ\u001b\u0005a\u0002\"\u0002\u001a\b\u0001\u0004!\u0004\"B'\b\u0001\u0004y\u0005\"B.\b\u0001\u0004y\u0005\"\u0002/\b\u0001\u0004i\u0016AA5o+\u0005Y\u0007cA\u001bmq%\u0011QN\t\u0002\u0006\u0013:dW\r^\u0001\u0004_V$X#\u00019\u0011\u0007U\nh)\u0003\u0002sE\t1q*\u001e;mKR\fAc[3z-\u0006dW/\u001a+bE2,g)Y2u_JLX#A;\u0011\u0005YdX\"A<\u000b\u0005aL\u0018AB2mS\u0016tGO\u0003\u0002 u*\t10\u0001\u0002j_&\u0011Qp\u001e\u0002\u0015\u0017\u0016Lh+\u00197vKR\u000b'\r\\3GC\u000e$xN]=\u00021-,\u0017PV1mk\u0016$\u0016M\u00197f\r\u0006\u001cGo\u001c:z?\u0012*\u0017\u000f\u0006\u0003\u0002\u0002\u0005\u001d\u0001c\u0001 \u0002\u0004%\u0019\u0011QA \u0003\tUs\u0017\u000e\u001e\u0005\t\u0003\u0013Y\u0011\u0011!a\u0001k\u0006\u0019\u0001\u0010J\u0019\u0002+-,\u0017PV1mk\u0016$\u0016M\u00197f\r\u0006\u001cGo\u001c:zA\u0005)A/\u00192mKV\u0011\u0011\u0011\u0003\t\u0005\u0003'\tI\"\u0004\u0002\u0002\u0016)\u0019\u0011qC<\u0002\rQ\f'\r\\3t\u0013\u0011\tY\"!\u0006\u0003\u001b-+\u0017PV1mk\u0016$\u0016M\u00197f\u0003%!\u0018M\u00197f?\u0012*\u0017\u000f\u0006\u0003\u0002\u0002\u0005\u0005\u0002\"CA\u0005\u001d\u0005\u0005\t\u0019AA\t\u0003\u0019!\u0018M\u00197fA\u0005A\u0011N\u001c$mS\u001eDG/\u0006\u0002\u0002*A\u0019a(a\u000b\n\u0007\u00055rHA\u0002J]R\fA\"\u001b8GY&<\u0007\u000e^0%KF$B!!\u0001\u00024!I\u0011\u0011B\t\u0002\u0002\u0003\u0007\u0011\u0011F\u0001\nS:4E.[4ii\u0002B3AEA\u001d!\rq\u00141H\u0005\u0004\u0003{y$\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002\u001bU\u00048\u000f\u001e:fC6,e\u000eZ3e+\t\t\u0019\u0005E\u0002?\u0003\u000bJ1!a\u0012@\u0005\u001d\u0011un\u001c7fC:\f\u0011#\u001e9tiJ,\u0017-\\#oI\u0016$w\fJ3r)\u0011\t\t!!\u0014\t\u0013\u0005%A#!AA\u0002\u0005\r\u0013AD;qgR\u0014X-Y7F]\u0012,G\r\t\u0015\u0004+\u0005e\u0012\u0001G1ts:\u001cW*Z:tC\u001e,7+\u001a8e\u0007\u0006dGNY1dWV\u0011\u0011q\u000b\t\u0006U\u0005e\u0013QL\u0005\u0004\u00037Z#!D!ts:\u001c7)\u00197mE\u0006\u001c7\u000e\u0005\u0004\u0002`\u0005\u0015\u0014\u0011N\u0007\u0003\u0003CR1!a\u0019@\u0003\u0011)H/\u001b7\n\t\u0005\u001d\u0014\u0011\r\u0002\u0004)JL\b\u0003BA\n\u0003WJA!!\u001c\u0002\u0016\tQA+\u00192mK\u0016sGO]=\u00023\u0005\u001c\u0018P\\2NKN\u001c\u0018mZ3TK:$7)\u00197mE\u0006\u001c7\u000eI\u0001\taJ,7\u000b^1siR\u0011\u0011\u0011A\u0001\u0010Q\u0006tG\r\\3TK:$XI^3oiR!\u0011\u0011AA=\u0011\u001d\tY(\u0007a\u0001\u0003{\n\u0011cY8na2,G/\u00192mK\u001a+H/\u001e:f!\u0019\ty(a#\u0002j5\u0011\u0011\u0011\u0011\u0006\u0005\u0003\u0007\u000b))\u0001\u0006d_:\u001cWO\u001d:f]RTA!a\u0019\u0002\b*\u0011\u0011\u0011R\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002\u000e\u0006\u0005%!E\"p[BdW\r^1cY\u00164U\u000f^;sK\u0006A\u0001o\\:u'R|\u0007\u000fK\u0002\u0001\u0003'\u0003B!!&\u0002\u001c6\u0011\u0011q\u0013\u0006\u0004\u00033#\u0013AC1o]>$\u0018\r^5p]&!\u0011QTAL\u0005-Ie\u000e^3s]\u0006d\u0017\t]5")
@InternalApi
/* loaded from: input_file:akka/stream/alpakka/pravega/impl/PravegaTableReadFlowStageLogic.class */
public final class PravegaTableReadFlowStageLogic<K, V> extends GraphStageLogic implements StageLogging {
    private final FlowShape<K, Option<V>> shape;
    private final String scope;
    private final String tableName;
    public final TableSettings<K, V> akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings;
    private KeyValueTableFactory keyValueTableFactory;
    private KeyValueTable akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$table;
    private volatile int akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight;
    private volatile boolean akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded;
    private final AsyncCallback<Try<TableEntry>> asyncMessageSendCallback;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;

    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public FlowShape<K, Option<V>> shape() {
        return this.shape;
    }

    public String scope() {
        return this.scope;
    }

    public Inlet<K> akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$in() {
        return shape().in();
    }

    private Outlet<Option<V>> out() {
        return shape().out();
    }

    private KeyValueTableFactory keyValueTableFactory() {
        return this.keyValueTableFactory;
    }

    private void keyValueTableFactory_$eq(KeyValueTableFactory keyValueTableFactory) {
        this.keyValueTableFactory = keyValueTableFactory;
    }

    public KeyValueTable akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$table() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$table;
    }

    private void table_$eq(KeyValueTable keyValueTable) {
        this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$table = keyValueTable;
    }

    public int akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight;
    }

    public void akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight_$eq(int i) {
        this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight = i;
    }

    private boolean upstreamEnded() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded;
    }

    public void akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded_$eq(boolean z) {
        this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded = z;
    }

    private AsyncCallback<Try<TableEntry>> asyncMessageSendCallback() {
        return this.asyncMessageSendCallback;
    }

    public void preStart() {
        try {
            KeyValueTableClientConfiguration build = KeyValueTableClientConfiguration.builder().build();
            keyValueTableFactory_$eq(KeyValueTableFactory.withScope(scope(), this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings.clientConfig()));
            table_$eq(keyValueTableFactory().forKeyValueTable(this.tableName, build));
            log().debug("Open table {}", this.tableName);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    failStage((Throwable) unapply.get());
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    return;
                }
            }
            throw th;
        }
    }

    public void handleSentEvent(CompletableFuture<TableEntry> completableFuture) {
        FutureConverters$CompletionStageOps$.MODULE$.toScala$extension(FutureConverters$.MODULE$.CompletionStageOps(completableFuture)).onComplete(r4 -> {
            return this.asyncMessageSendCallback().invokeWithFeedback(r4);
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    public void postStop() {
        log().debug("Closing table {}", this.tableName);
        akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$table().close();
        keyValueTableFactory().close();
    }

    public static final /* synthetic */ void $anonfun$asyncMessageSendCallback$1(PravegaTableReadFlowStageLogic pravegaTableReadFlowStageLogic, Try r8) {
        if (r8 instanceof Failure) {
            pravegaTableReadFlowStageLogic.log().error(((Failure) r8).exception(), "Failed to send message {}");
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r8 instanceof Success)) {
                throw new MatchError(r8);
            }
            TableEntry tableEntry = (TableEntry) ((Success) r8).value();
            if (tableEntry != null) {
                pravegaTableReadFlowStageLogic.push(pravegaTableReadFlowStageLogic.out(), new Some(pravegaTableReadFlowStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings.valueSerializer().deserialize(tableEntry.getValue())));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } else {
                pravegaTableReadFlowStageLogic.push(pravegaTableReadFlowStageLogic.out(), None$.MODULE$);
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            }
        }
        pravegaTableReadFlowStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight_$eq(pravegaTableReadFlowStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight() - 1);
        if (pravegaTableReadFlowStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight() == 0 && pravegaTableReadFlowStageLogic.upstreamEnded()) {
            pravegaTableReadFlowStageLogic.log().info("Stage completed after upstream finish");
            pravegaTableReadFlowStageLogic.completeStage();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PravegaTableReadFlowStageLogic(FlowShape<K, Option<V>> flowShape, String str, String str2, TableSettings<K, V> tableSettings) {
        super(flowShape);
        this.shape = flowShape;
        this.scope = str;
        this.tableName = str2;
        this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings = tableSettings;
        StageLogging.$init$(this);
        this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight = 0;
        this.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded = false;
        this.asyncMessageSendCallback = getAsyncCallback(r4 -> {
            $anonfun$asyncMessageSendCallback$1(this, r4);
            return BoxedUnit.UNIT;
        });
        setHandler(akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$in(), new InHandler(this) { // from class: akka.stream.alpakka.pravega.impl.PravegaTableReadFlowStageLogic$$anon$1
            private final /* synthetic */ PravegaTableReadFlowStageLogic $outer;

            public void onUpstreamFailure(Throwable th) throws Exception {
                InHandler.onUpstreamFailure$(this, th);
            }

            public void onPush() {
                Object grab = this.$outer.grab(this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$in());
                this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight_$eq(this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight() + 1);
                this.$outer.handleSentEvent(this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$table().get((TableKey) this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings.tableKey().apply(grab)));
            }

            public void onUpstreamFinish() {
                this.$outer.log().debug("Upstream finished");
                if (this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight() == 0) {
                    this.$outer.completeStage();
                }
                this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded_$eq(true);
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                InHandler.$init$(this);
            }
        });
        setHandler(out(), new OutHandler(this) { // from class: akka.stream.alpakka.pravega.impl.PravegaTableReadFlowStageLogic$$anon$2
            private final /* synthetic */ PravegaTableReadFlowStageLogic $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                this.$outer.pull(this.$outer.akka$stream$alpakka$pravega$impl$PravegaTableReadFlowStageLogic$$in());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
