package org.apache.pekko.stream.connectors.pravega.impl;

import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.client.tables.Put;
import io.pravega.client.tables.TableKey;
import io.pravega.client.tables.Version;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.connectors.pravega.TableWriterSettings;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableWriteFlow.scala */
/**
 * Stage logic of a flow that writes every incoming element to a Pravega key/value table and
 * emits the element downstream once the write has completed successfully.
 *
 * <p>This is decompiled output of a Scala class (see the "compiled from" marker), which is why
 * member names are mangled and some constructs (e.g. {@code MatchError} checks, {@code $outer}
 * fields) look unusual for hand-written Java.
 *
 * <p>Flow of one element, as implemented below:
 * <ol>
 *   <li>{@code onPull} pulls the inlet.</li>
 *   <li>{@code onPush} grabs the element, converts it to a (key, value) tuple, builds a
 *       {@link Put}, increments the in-flight counter and calls {@code table.update(put)}.</li>
 *   <li>{@link #handleSentEvent} forwards the completion of the returned future to
 *       {@code asyncPushback}.</li>
 *   <li>{@code asyncPushback} pushes the element on success or logs on failure, decrements the
 *       in-flight counter and completes the stage when upstream has ended and nothing is in
 *       flight.</li>
 * </ol>
 *
 * @param <KVPair> element type flowing through the stage
 * @param <K> key type produced by the element-to-tuple function
 * @param <V> value type produced by the element-to-tuple function
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/pravega/impl/PravegaTableWriteFlowStageLogic.class */
public final class PravegaTableWriteFlowStageLogic<KVPair, K, V> extends GraphStageLogic implements StageLogging {
    // Backing field for the StageLogging mixin (accessed through the _log accessor pair below).
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private final FlowShape shape;
    // Converts an incoming element into its (key, value) tuple.
    public final Function1<KVPair, Tuple2<K, V>> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$kvpToTuple2;
    // Maps a key to the Pravega TableKey used for the Put.
    public final Function1<K, TableKey> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$extractor;
    private final String scope;
    private final String tableName;
    public final TableWriterSettings<K, V> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings;
    // Both resources are assigned in preStart and therefore may still be null in postStop
    // when preStart failed part-way through.
    private KeyValueTableFactory keyValueTableFactory;
    public KeyValueTable org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$table;
    // Number of updates that were issued but whose completion has not been processed yet.
    public final AtomicInteger org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir;
    // Set by onUpstreamFinish; read by the async callback to decide when to complete the stage.
    public volatile boolean org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded;
    // Receives (write result, original element) pairs from handleSentEvent.
    private final AsyncCallback<Tuple2<Try<Version>, KVPair>> asyncPushback;

    /**
     * Creates the stage logic and registers the async callback and the inlet/outlet handlers.
     *
     * @param flowShape shape whose inlet and outlet this logic handles
     * @param function1 converts an element into a (key, value) tuple
     * @param function12 converts a key into a {@link TableKey}
     * @param str Pravega scope passed to {@code KeyValueTableFactory.withScope} in {@link #preStart()}
     * @param str2 name of the key/value table opened in {@link #preStart()}
     * @param tableWriterSettings supplies the client configuration and the value serializer
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PravegaTableWriteFlowStageLogic(FlowShape<KVPair, KVPair> flowShape, Function1<KVPair, Tuple2<K, V>> function1, Function1<K, TableKey> function12, String str, String str2, TableWriterSettings<K, V> tableWriterSettings) {
        super(flowShape);
        this.shape = flowShape;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$kvpToTuple2 = function1;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$extractor = function12;
        this.scope = str;
        this.tableName = str2;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings = tableWriterSettings;
        StageLogging.$init$(this);
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir = new AtomicInteger();
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded = false;
        // Completion handler for a single table update: tuple2 = (result of the update, original element).
        this.asyncPushback = getAsyncCallback(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Failure failure = (Try) tuple2._1();
            Object _2 = tuple2._2();
            if (failure instanceof Failure) {
                // NOTE(review): a failed write is only logged; nothing is pushed and the inlet is not
                // pulled from here. Confirm that the stream is not expected to continue after a failure.
                log().error(failure.exception(), "Failed to send message {}", _2);
            } else {
                push(out(), _2);
            }
            // Last in-flight update finished after upstream already ended: nothing more can arrive.
            if (this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir.decrementAndGet() == 0 && this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded) {
                log().debug("Stage completed after upstream finish");
                completeStage();
            }
        });
        setHandler(org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$in(), new InHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableWriteFlowStageLogic$$anon$1
            private final /* synthetic */ PravegaTableWriteFlowStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onUpstreamFailure(Throwable th) throws Exception {
                InHandler.onUpstreamFailure$(this, th);
            }

            // Turns the grabbed element into a Put and issues the table update.
            public void onPush() {
                Object protected$grab = this.$outer.protected$grab(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$in());
                Tuple2 tuple22 = (Tuple2) this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$kvpToTuple2.apply(protected$grab);
                if (tuple22 == null) {
                    throw new MatchError(tuple22);
                }
                Tuple2 apply = Tuple2$.MODULE$.apply(tuple22._1(), tuple22._2());
                Put put = new Put((TableKey) this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$extractor.apply(apply._1()), this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings.valueSerializer().serialize(apply._2()));
                // Count the update as in flight before it is issued; asyncPushback decrements it again.
                this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir.incrementAndGet();
                this.$outer.handleSentEvent(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$table.update(put), protected$grab);
            }

            // Completes immediately when nothing is in flight; otherwise only records that upstream
            // ended so that asyncPushback completes the stage after the last pending update.
            public void onUpstreamFinish() {
                this.$outer.log().debug("Upstream finished");
                if (this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$onAir.get() == 0) {
                    this.$outer.log().debug("Stage completed on upstream finish");
                    this.$outer.completeStage();
                }
                this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$upstreamEnded = true;
            }
        });
        setHandler(out(), new OutHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableWriteFlowStageLogic$$anon$2
            private final /* synthetic */ PravegaTableWriteFlowStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            // Downstream demand is forwarded directly to the inlet.
            public void onPull() {
                this.$outer.protected$pull(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$in());
            }
        });
    }

    /** Getter for the StageLogging backing field. */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    /** Setter for the StageLogging backing field. */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /** @return the flow shape passed to the constructor */
    public FlowShape<KVPair, KVPair> shape() {
        return this.shape;
    }

    /** @return the scope passed to the constructor */
    public String scope() {
        return this.scope;
    }

    /** @return the inlet of {@link #shape()} */
    public Inlet<KVPair> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$in() {
        return shape().in();
    }

    /** @return the outlet of {@link #shape()} */
    private Outlet<KVPair> out() {
        return shape().out();
    }

    /**
     * Opens the key/value table factory for {@link #scope()} and the table named by the
     * constructor argument. A non-fatal error (as decided by {@code NonFatal}) fails the stage;
     * any other throwable is rethrown.
     */
    public void preStart() {
        try {
            KeyValueTableClientConfiguration build = KeyValueTableClientConfiguration.builder().build();
            this.keyValueTableFactory = KeyValueTableFactory.withScope(scope(), this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$tableWriterSettings.clientConfig());
            this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$table = this.keyValueTableFactory.forKeyValueTable(this.tableName, build);
            log().debug("Open table {}", this.tableName);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    failStage((Throwable) unapply.get());
                    return;
                }
            }
            throw th;
        }
    }

    /**
     * Registers a completion handler on the update future that hands the outcome, paired with
     * the original element, to {@code asyncPushback}.
     *
     * @param completableFuture future returned by {@code table.update}
     * @param kvpair element that triggered the update
     */
    public void handleSentEvent(CompletableFuture<Version> completableFuture, KVPair kvpair) {
        FutureConverters$CompletionStageOps$.MODULE$.asScala$extension(FutureConverters$.MODULE$.CompletionStageOps(completableFuture)).onComplete(r7 -> {
            return this.asyncPushback.invokeWithFeedback(Tuple2$.MODULE$.apply(r7, kvpair));
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    /**
     * Closes the table and then the table factory.
     *
     * <p>Both fields are only assigned in {@link #preStart()}. If {@code preStart} failed before
     * assigning them they are still {@code null}, so each close is skipped in that case instead of
     * raising (or logging) a {@code NullPointerException}. A failure while closing the table is
     * logged and does not prevent the factory from being closed.
     */
    public void postStop() {
        log().debug("Closing table {}", this.tableName);
        if (this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$table != null) {
            Failure apply = Try$.MODULE$.apply(() -> {
                postStop$$anonfun$1();
                return BoxedUnit.UNIT;
            });
            if (apply instanceof Failure) {
                log().error(apply.exception(), "Error while closing table [{}]", this.tableName);
            } else {
                if (!(apply instanceof Success)) {
                    throw new MatchError(apply);
                }
                log().debug("Closed table [{}]", this.tableName);
            }
        }
        if (this.keyValueTableFactory != null) {
            this.keyValueTableFactory.close();
        }
    }

    /** Public bridge to the protected {@code grab}, used by the anonymous inlet handler. */
    public <T> T protected$grab(Inlet<T> inlet) {
        return (T) grab(inlet);
    }

    /** Public bridge to the protected {@code pull}, used by the anonymous outlet handler. */
    public <T> void protected$pull(Inlet<T> inlet) {
        pull(inlet);
    }

    // Body of the closure evaluated inside Try in postStop.
    private final void postStop$$anonfun$1() {
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableWriteFlowStageLogic$$table.close();
    }
}
