package akka.stream.alpakka.pravega.impl;

import akka.annotation.InternalApi;
import akka.event.LoggingAdapter;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.alpakka.pravega.TableWriterSettings;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.client.tables.Put;
import io.pravega.client.tables.TableKey;
import io.pravega.client.tables.Version;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.compat.java8.FutureConverters$;
import scala.compat.java8.FutureConverters$CompletionStageOps$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableWriteFlow.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u0005g\u0001\u0002\u000f\u001e\r!B\u0001b\r\u0001\u0003\u0006\u0004%\t\u0001\u000e\u0005\t\u000f\u0002\u0011\t\u0011)A\u0005k!A\u0001\n\u0001B\u0001B\u0003%\u0011\n\u0003\u0005V\u0001\t\u0005\t\u0015!\u0003W\u0011!\u0011\u0007A!b\u0001\n\u0003\u0019\u0007\u0002C8\u0001\u0005\u0003\u0005\u000b\u0011\u00023\t\u0011A\u0004!\u0011!Q\u0001\n\u0011D\u0001\"\u001d\u0001\u0003\u0002\u0003\u0006IA\u001d\u0005\u0006m\u0002!\ta\u001e\u0005\b\u0003\u0003\u0001A\u0011BA\u0002\u0011\u001d\tY\u0001\u0001C\u0005\u0003\u001bA1\"!\u0006\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002\u0018!Y\u0011\u0011\u0005\u0001A\u0002\u0003\u0007I\u0011BA\u0012\u0011-\ty\u0003\u0001a\u0001\u0002\u0003\u0006K!!\u0007\t\u0017\u0005E\u0002\u00011AA\u0002\u0013%\u00111\u0007\u0005\f\u0003w\u0001\u0001\u0019!a\u0001\n\u0013\ti\u0004C\u0006\u0002B\u0001\u0001\r\u0011!Q!\n\u0005U\u0002\"CA\"\u0001\t\u0007I\u0011BA#\u0011!\ty\u0006\u0001Q\u0001\n\u0005\u001d\u0003\"CA1\u0001\u0001\u0007I\u0011BA2\u0011%\tY\u0007\u0001a\u0001\n\u0013\ti\u0007\u0003\u0005\u0002r\u0001\u0001\u000b\u0015BA3\u0011%\tY\b\u0001b\u0001\n\u0013\ti\b\u0003\u0005\u0002\u0018\u0002\u0001\u000b\u0011BA@\u0011\u001d\tI\n\u0001C!\u00037Cq!!(\u0001\t\u0003\ty\nC\u0004\u00022\u0002!\t%a'\u0003?A\u0013\u0018M^3hCR\u000b'\r\\3Xe&$XM\u00127poN#\u0018mZ3M_\u001eL7M\u0003\u0002\u001f?\u0005!\u0011.\u001c9m\u0015\t\u0001\u0013%A\u0004qe\u00064XmZ1\u000b\u0005\t\u001a\u0013aB1ma\u0006\\7.\u0019\u0006\u0003I\u0015\naa\u001d;sK\u0006l'\"\u0001\u0014\u0002\t\u0005\\7.Y\u0002\u0001+\u0011I3\bU*\u0014\u0007\u0001Q\u0003\u0007\u0005\u0002,]5\tAF\u0003\u0002.G\u0005)1\u000f^1hK&\u0011q\u0006\f\u0002\u0010\u000fJ\f\u0007\u000f[*uC\u001e,Gj\\4jGB\u00111&M\u0005\u0003e1\u0012Ab\u0015;bO\u0016dunZ4j]\u001e\fQa\u001d5ba\u0016,\u0012!\u000e\t\u0005m]J\u0014(D\u0001$\u0013\tA4EA\u0005GY><8\u000b[1qKB\u0011!h\u000f\u0007\u0001\t\u0015a\u0004A1\u0001>\u0005\u0019Ye\u000bU1jeF\u0011a\b\u0012\t\u0003\u007f\tk\u0011\u0001
\u0011\u0006\u0002\u0003\u0006)1oY1mC&\u00111\t\u0011\u0002\b\u001d>$\b.\u001b8h!\tyT)\u0003\u0002G\u0001\n\u0019\u0011I\\=\u0002\rMD\u0017\r]3!\u0003-Yg\u000f\u001d+p)V\u0004H.\u001a\u001a\u0011\t}R\u0015\bT\u0005\u0003\u0017\u0002\u0013\u0011BR;oGRLwN\\\u0019\u0011\t}juJU\u0005\u0003\u001d\u0002\u0013a\u0001V;qY\u0016\u0014\u0004C\u0001\u001eQ\t\u0015\t\u0006A1\u0001>\u0005\u0005Y\u0005C\u0001\u001eT\t\u0015!\u0006A1\u0001>\u0005\u00051\u0016!C3yiJ\f7\r^8s!\u0011y$jT,\u0011\u0005a\u0003W\"A-\u000b\u0005i[\u0016A\u0002;bE2,7O\u0003\u0002];\u000611\r\\5f]RT!\u0001\t0\u000b\u0003}\u000b!![8\n\u0005\u0005L&\u0001\u0003+bE2,7*Z=\u0002\u000bM\u001cw\u000e]3\u0016\u0003\u0011\u0004\"!\u001a7\u000f\u0005\u0019T\u0007CA4A\u001b\u0005A'BA5(\u0003\u0019a$o\\8u}%\u00111\u000eQ\u0001\u0007!J,G-\u001a4\n\u00055t'AB*ue&twM\u0003\u0002l\u0001\u000611oY8qK\u0002\n\u0011\u0002^1cY\u0016t\u0015-\\3\u0002'Q\f'\r\\3Xe&$XM]*fiRLgnZ:\u0011\tM$xJU\u0007\u0002?%\u0011Qo\b\u0002\u0014)\u0006\u0014G.Z,sSR,'oU3ui&twm]\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u000faT8\u0010`?\u007f\u007fB)\u0011\u0010A\u001dP%6\tQ\u0004C\u00034\u0013\u0001\u0007Q\u0007C\u0003I\u0013\u0001\u0007\u0011\nC\u0003V\u0013\u0001\u0007a\u000bC\u0003c\u0013\u0001\u0007A\rC\u0003q\u0013\u0001\u0007A\rC\u0003r\u0013\u0001\u0007!/\u0001\u0002j]V\u0011\u0011Q\u0001\t\u0005m\u0005\u001d\u0011(C\u0002\u0002\n\r\u0012Q!\u00138mKR\f1a\\;u+\t\ty\u0001\u0005\u00037\u0003#I\u0014bAA\nG\t1q*\u001e;mKR\fAc[3z-\u0006dW/\u001a+bE2,g)Y2u_JLXCAA\r!\u0011\tY\"!\b\u000e\u0003mK1!a\b\\\u0005QYU-\u001f,bYV,G+\u00192mK\u001a\u000b7\r^8ss\u0006A2.Z=WC2,X\rV1cY\u00164\u0015m\u0019;pef|F%Z9\u0015\t\u0005\u0015\u00121\u0006\t\u0004\u007f\u0005\u001d\u0012bAA\u0015\u0001\n!QK\\5u\u0011%\ti#DA\u0001\u0002\u0004\tI\"A\u0002yIE\nQc[3z-\u0006dW/\u001a+bE2,g)Y2u_JL\b%A\u0003uC\ndW-\u0006\u0002\u00026A\u0019\u0001,a\u000e\n\u0007\u0005e\u0012LA\u0007LKf4\u0016\r\\;f)\u0006\u0014G.Z\u0001\ni\u0006\u0014G.Z0%KF$B!!\n\u0002@!I\u0011Q\u0006\t\u0002\u0002\u0003\u0007\u0011QG\u0001\u0007i\u0006\u0014G.\u001a\u0011\u0002\u000b=t\u0017)\u001b:\u0016\u0005\u0005\u001d\u0003\u0003BA%\u00037j!!a\u0013\u000b\t\u00055\u0013qJ\u0001\u0007CR|W.[2\u000b\t\u0005E\u00131K\u0001\u000bG>t7-\u001e:sK:$(\u0002BA+\u0003/\nA!\u001e;jY*\u0011\u0011\u0011L\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002^\u0005-#!D!u_6L7-\u00138uK\u001e,'/\u0001\u0004p]\u0006K'\u000fI\u0001\u000ekB\u001cHO]3b[\u0016sG-\u001a3\u0016\u0005\u0005\u0015\u0004cA \u0002h%\u0019\u0011\u0011\u000e!\u0003\u000f\t{w\u000e\\3b]\u0006\tR\u000f]:ue\u0016\fW.\u00128eK\u0012|F%Z9\u0015\t\u0005\u0015\u0012q\u000e\u0005\n\u0003[)\u0012\u0011!a\u0001\u0003K\na\"\u001e9tiJ,\u0017-\\#oI\u0016$\u0007\u0005K\u0002\u0017\u0003k\u00022aPA<\u0013\r\tI\b\u0011\u0002\tm>d\u0017\r^5mK\u0006i\u0011m]=oGB+8\u000f\u001b2bG.,\"!a 
\u0011\u000b-\n\t)!\"\n\u0007\u0005\rEFA\u0007Bgft7mQ1mY\n\f7m\u001b\t\u0006\u007f5\u000b9)\u000f\t\u0007\u0003\u0013\u000bi)!%\u000e\u0005\u0005-%bAA+\u0001&!\u0011qRAF\u0005\r!&/\u001f\t\u00041\u0006M\u0015bAAK3\n9a+\u001a:tS>t\u0017AD1ts:\u001c\u0007+^:iE\u0006\u001c7\u000eI\u0001\taJ,7\u000b^1siR\u0011\u0011QE\u0001\u0010Q\u0006tG\r\\3TK:$XI^3oiR1\u0011QEAQ\u0003[Cq!a)\u001b\u0001\u0004\t)+A\td_6\u0004H.\u001a;bE2,g)\u001e;ve\u0016\u0004b!a*\u0002*\u0006EUBAA(\u0013\u0011\tY+a\u0014\u0003#\r{W\u000e\u001d7fi\u0006\u0014G.\u001a$viV\u0014X\r\u0003\u0004\u00020j\u0001\r!O\u0001\u0004[N<\u0017\u0001\u00039pgR\u001cFo\u001c9)\u0007\u0001\t)\f\u0005\u0003\u00028\u0006uVBAA]\u0015\r\tY,J\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA`\u0003s\u00131\"\u00138uKJt\u0017\r\\!qS\u0002")
@InternalApi
/* loaded from: input_file:akka/stream/alpakka/pravega/impl/PravegaTableWriteFlowStageLogic.class */
/**
 * Stage logic of a flow that writes every incoming element to a Pravega key-value table and
 * emits the element downstream once the write has completed.
 *
 * <p>For each pushed element the logic converts it to a {@code (key, value)} tuple, maps the key
 * to a {@link TableKey}, serializes the value with the serializer from the
 * {@link TableWriterSettings} and calls {@link KeyValueTable#update}. The result of the write is
 * routed back into the stage through an {@link AsyncCallback}. The number of writes that have
 * been issued but not yet reported back is tracked in {@code onAir}, so that the stage is only
 * completed after upstream has finished <em>and</em> all pending writes have been reported.
 *
 * <p>This file is a Java rendering of Scala-compiled code; the long
 * {@code akka$stream$alpakka$...} member names are the compiler-mangled names and are kept
 * unchanged so existing callers keep linking.
 *
 * @param <KVPair> type of the elements flowing through the stage
 * @param <K> key type extracted from an element
 * @param <V> value type extracted from an element
 */
public final class PravegaTableWriteFlowStageLogic<KVPair, K, V> extends GraphStageLogic implements StageLogging {
    private final FlowShape<KVPair, KVPair> shape;
    /** Splits an element into its key and value. */
    public final Function1<KVPair, Tuple2<K, V>> akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$kvpToTuple2;
    /** Maps a key to the Pravega {@link TableKey} used for the write. */
    public final Function1<K, TableKey> akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$extractor;
    private final String scope;
    private final String tableName;
    /** Supplies the client configuration and the value serializer. */
    public final TableWriterSettings<K, V> akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings;
    /** Assigned in {@link #preStart()}; stays {@code null} if initialisation failed. */
    private KeyValueTableFactory keyValueTableFactory;
    /** Assigned in {@link #preStart()}; stays {@code null} if initialisation failed. */
    private KeyValueTable akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$table;
    /** Number of writes issued whose result has not yet been handled by the async callback. */
    private final AtomicInteger akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir;
    /** Set once upstream has finished. */
    private volatile boolean akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded;
    private final AsyncCallback<Tuple2<Try<Version>, KVPair>> asyncPushback;
    /** Backing field required by the {@link StageLogging} trait. */
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;

    // --- StageLogging trait plumbing -------------------------------------------------------

    public Class<?> logSource() {
        return StageLogging.logSource$(this);
    }

    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    // --- Accessors -------------------------------------------------------------------------

    /** @return the flow shape this logic was created for */
    public FlowShape<KVPair, KVPair> shape() {
        return this.shape;
    }

    /** @return the scope passed to {@link KeyValueTableFactory#withScope} in {@link #preStart()} */
    public String scope() {
        return this.scope;
    }

    public Inlet<KVPair> akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$in() {
        return shape().in();
    }

    private Outlet<KVPair> out() {
        return shape().out();
    }

    private KeyValueTableFactory keyValueTableFactory() {
        return this.keyValueTableFactory;
    }

    private void keyValueTableFactory_$eq(KeyValueTableFactory keyValueTableFactory) {
        this.keyValueTableFactory = keyValueTableFactory;
    }

    public KeyValueTable akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$table() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$table;
    }

    private void table_$eq(KeyValueTable keyValueTable) {
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$table = keyValueTable;
    }

    public AtomicInteger akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir;
    }

    private boolean upstreamEnded() {
        return this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded;
    }

    public void akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded_$eq(boolean z) {
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded = z;
    }

    private AsyncCallback<Tuple2<Try<Version>, KVPair>> asyncPushback() {
        return this.asyncPushback;
    }

    // --- Lifecycle -------------------------------------------------------------------------

    /**
     * Creates the key-value table factory for {@link #scope()} and opens the table.
     *
     * <p>A non-fatal error fails the stage; a fatal one (as classified by Scala's
     * {@code NonFatal}) is rethrown unchanged.
     */
    @Override
    public void preStart() {
        try {
            KeyValueTableClientConfiguration tableClientConfig = KeyValueTableClientConfiguration.builder().build();
            keyValueTableFactory_$eq(KeyValueTableFactory.withScope(scope(), this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings.clientConfig()));
            table_$eq(keyValueTableFactory().forKeyValueTable(this.tableName, tableClientConfig));
            log().debug("Open table {}", this.tableName);
        } catch (Throwable th) {
            Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
            if (nonFatal.isEmpty()) {
                throw th;
            }
            failStage(nonFatal.get());
        }
    }

    /**
     * Registers a completion hook on a pending write that hands the outcome, together with the
     * element that was written, to the stage's async callback.
     *
     * @param completableFuture future returned by the table update
     * @param kvpair the element the update was issued for
     */
    public void handleSentEvent(CompletableFuture<Version> completableFuture, KVPair kvpair) {
        FutureConverters$CompletionStageOps$.MODULE$
                .toScala$extension(FutureConverters$.MODULE$.CompletionStageOps(completableFuture))
                .onComplete(
                        outcome -> asyncPushback().invokeWithFeedback(new Tuple2<>(outcome, kvpair)),
                        ExecutionContext$Implicits$.MODULE$.global());
    }

    /**
     * Closes the table and then the factory.
     *
     * <p>Either may be {@code null} when {@link #preStart()} failed before assigning it, so both
     * are guarded. A non-fatal error while closing the table is logged and does not prevent the
     * factory from being closed.
     */
    @Override
    public void postStop() {
        log().debug("Closing table {}", this.tableName);
        KeyValueTable openedTable = akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$table();
        if (openedTable != null) {
            Try<BoxedUnit> closeAttempt = Try$.MODULE$.apply(() -> {
                openedTable.close();
                return BoxedUnit.UNIT;
            });
            if (closeAttempt instanceof Failure) {
                log().error(((Failure<BoxedUnit>) closeAttempt).exception(), "Error while closing table [{}]", this.tableName);
            } else {
                log().debug("Closed table [{}]", this.tableName);
            }
        }
        KeyValueTableFactory factory = keyValueTableFactory();
        if (factory != null) {
            factory.close();
        }
    }

    /**
     * Body of the async callback: handles the outcome of one table write.
     *
     * <p>A failed write is logged; any other outcome pushes the element downstream. In both
     * cases the in-flight counter is decremented, and the stage is completed when that was the
     * last pending write and upstream has already finished.
     *
     * @param pravegaTableWriteFlowStageLogic the stage the callback belongs to
     * @param tuple2 pair of (write outcome, element)
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$asyncPushback$1(PravegaTableWriteFlowStageLogic pravegaTableWriteFlowStageLogic, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Object outcome = tuple2._1();
        Object element = tuple2._2();
        if (outcome instanceof Failure) {
            // NOTE(review): a failed write is only logged; nothing is pushed or pulled on this
            // path. Confirm that is the intended handling of write errors.
            pravegaTableWriteFlowStageLogic.log().error(((Failure<?>) outcome).exception(), "Failed to send message {}", element);
        } else {
            pravegaTableWriteFlowStageLogic.push(pravegaTableWriteFlowStageLogic.out(), element);
        }
        // Complete only when this was the last pending write AND upstream is done; otherwise
        // either more results are still due or more elements may still arrive.
        boolean lastPendingWrite = pravegaTableWriteFlowStageLogic.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir().decrementAndGet() == 0;
        if (lastPendingWrite && pravegaTableWriteFlowStageLogic.upstreamEnded()) {
            pravegaTableWriteFlowStageLogic.log().debug("Stage completed after upstream finish");
            pravegaTableWriteFlowStageLogic.completeStage();
        }
    }

    /**
     * Wires up the async callback and the inlet/outlet handlers.
     *
     * @param flowShape shape of the flow this logic drives
     * @param function1 splits an element into key and value
     * @param function12 maps a key to a {@link TableKey}
     * @param str Pravega scope
     * @param str2 name of the key-value table
     * @param tableWriterSettings client configuration and value serializer
     */
    public PravegaTableWriteFlowStageLogic(FlowShape<KVPair, KVPair> flowShape, Function1<KVPair, Tuple2<K, V>> function1, Function1<K, TableKey> function12, String str, String str2, TableWriterSettings<K, V> tableWriterSettings) {
        super(flowShape);
        this.shape = flowShape;
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$kvpToTuple2 = function1;
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$extractor = function12;
        this.scope = str;
        this.tableName = str2;
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings = tableWriterSettings;
        StageLogging.$init$(this);
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir = new AtomicInteger();
        this.akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded = false;
        this.asyncPushback = getAsyncCallback(tuple2 -> {
            $anonfun$asyncPushback$1(this, tuple2);
            return BoxedUnit.UNIT;
        });

        setHandler(akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$in(), new InHandler() { // from class: akka.stream.alpakka.pravega.impl.PravegaTableWriteFlowStageLogic$$anon$1
            {
                InHandler.$init$(this);
            }

            // Forwarder to the trait's default implementation.
            @Override
            public void onUpstreamFailure(Throwable th) throws Exception {
                InHandler.onUpstreamFailure$(this, th);
            }

            /** Issues a table update for the grabbed element and tracks it as in flight. */
            @Override
            public void onPush() {
                KVPair element = grab(akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$in());
                Tuple2<K, V> keyValue = akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$kvpToTuple2.apply(element);
                if (keyValue == null) {
                    throw new MatchError(keyValue);
                }
                TableKey tableKey = akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$extractor.apply(keyValue._1());
                Put put = new Put(tableKey, akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings.valueSerializer().serialize(keyValue._2()));
                akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir().incrementAndGet();
                handleSentEvent(akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$table().update(put), element);
            }

            /**
             * Completes immediately when no write is pending; otherwise records that upstream
             * has ended so the async callback completes the stage after the last pending write.
             */
            @Override
            public void onUpstreamFinish() {
                log().debug("Upstream finished");
                if (akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir().get() == 0) {
                    log().debug("Stage completed on upstream finish");
                    completeStage();
                }
                akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded_$eq(true);
            }
        });

        setHandler(out(), new OutHandler() { // from class: akka.stream.alpakka.pravega.impl.PravegaTableWriteFlowStageLogic$$anon$2
            {
                OutHandler.$init$(this);
            }

            // Forwarders to the trait's default implementations.
            @Override
            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            @Override
            public void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            /** Downstream demand is forwarded upstream one element at a time. */
            @Override
            public void onPull() {
                pull(akka$stream$alpakka$pravega$impl$PravegaTableWriteFlowStageLogic$$in());
            }
        });
    }
}
