package org.apache.pekko.stream.connectors.pravega.impl;

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderGroup;
import java.util.UUID;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.connectors.pravega.PravegaEvent;
import org.apache.pekko.stream.connectors.pravega.ReaderSettings;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.MatchError;
import scala.Option;
import scala.concurrent.Promise;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;
import scala.util.control.NonFatal$;

/* compiled from: PravegaSource.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/pravega/impl/PravegaSourcesStageLogic.class */
public final class PravegaSourcesStageLogic<A> extends GraphStageLogic implements PravegaCapabilities, StageLogging {
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(PravegaSourcesStageLogic.class.getDeclaredField("eventStreamClientFactory$lzy1"));
    private volatile Object eventStreamClientFactory$lzy1;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private final SourceShape<PravegaEvent<A>> shape;
    private final ReaderGroup readerGroup;
    private final ReaderSettings readerSettings;
    private final Promise<Done> startupPromise;
    private final String scope;
    public EventStreamReader<A> org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$reader;
    private final ClientConfig clientConfig;
    public final AsyncCallback<OutHandler> org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$asyncOnPull;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PravegaSourcesStageLogic(SourceShape<PravegaEvent<A>> sourceShape, ReaderGroup readerGroup, ReaderSettings<A> readerSettings, Promise<Done> promise) {
        super(sourceShape);
        this.shape = sourceShape;
        this.readerGroup = readerGroup;
        this.readerSettings = readerSettings;
        this.startupPromise = promise;
        PravegaCapabilities.$init$(this);
        StageLogging.$init$(this);
        this.scope = readerGroup.getScope();
        this.clientConfig = readerSettings.clientConfig();
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$asyncOnPull = getAsyncCallback(outHandler -> {
            outHandler.onPull();
        });
        setHandler(org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$out(), new OutHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaSourcesStageLogic$$anon$1
            private final /* synthetic */ PravegaSourcesStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                EventRead readNextEvent;
                while (true) {
                    readNextEvent = this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$reader.readNextEvent(this.$outer.readerSettings().timeout());
                    if (!readNextEvent.isCheckpoint()) {
                        break;
                    } else {
                        this.$outer.log().debug("Checkpoint: {}", readNextEvent.getCheckpointName());
                    }
                }
                Object event = readNextEvent.getEvent();
                if (event != null) {
                    this.$outer.protected$push(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$out(), new PravegaEvent(event, readNextEvent.getPosition(), readNextEvent.getEventPointer()));
                } else {
                    this.$outer.log().debug("a timeout occurred while waiting for new messages");
                    this.$outer.protected$materializer().scheduleOnce(new package.DurationLong(package$.MODULE$.DurationLong(this.$outer.readerSettings().timeout())).millis(), () -> {
                        this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$asyncOnPull.invoke(this);
                    });
                }
            }
        });
    }

    @Override // org.apache.pekko.stream.connectors.pravega.impl.PravegaCapabilities
    public EventStreamClientFactory eventStreamClientFactory() {
        Object obj = this.eventStreamClientFactory$lzy1;
        if (obj instanceof EventStreamClientFactory) {
            return (EventStreamClientFactory) obj;
        }
        if (obj == LazyVals$NullValue$.MODULE$) {
            return null;
        }
        return (EventStreamClientFactory) eventStreamClientFactory$lzyINIT1();
    }

    private Object eventStreamClientFactory$lzyINIT1() {
        LazyVals$NullValue$ eventStreamClientFactory;
        while (true) {
            Object obj = this.eventStreamClientFactory$lzy1;
            if (obj == null) {
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    LazyVals$NullValue$ lazyVals$NullValue$ = null;
                    try {
                        eventStreamClientFactory = eventStreamClientFactory();
                        if (eventStreamClientFactory == null) {
                            lazyVals$NullValue$ = LazyVals$NullValue$.MODULE$;
                        } else {
                            lazyVals$NullValue$ = eventStreamClientFactory;
                        }
                        return eventStreamClientFactory;
                    } finally {
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, lazyVals$NullValue$)) {
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.eventStreamClientFactory$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, lazyVals$NullValue$);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    @Override // org.apache.pekko.stream.connectors.pravega.impl.PravegaCapabilities
    public /* bridge */ /* synthetic */ void close() {
        close();
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public ReaderSettings<A> readerSettings() {
        return this.readerSettings;
    }

    @Override // org.apache.pekko.stream.connectors.pravega.impl.PravegaCapabilities
    public String scope() {
        return this.scope;
    }

    public Class<?> logSource() {
        return PravegaSourcesStageLogic.class;
    }

    public Outlet<PravegaEvent<A>> org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$out() {
        return this.shape.out();
    }

    @Override // org.apache.pekko.stream.connectors.pravega.impl.PravegaCapabilities
    public ClientConfig clientConfig() {
        return this.clientConfig;
    }

    public void preStart() {
        log().debug("Start consuming {}...", this.readerGroup.toString());
        try {
            this.org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$reader = createReader(readerSettings(), this.readerGroup);
            this.startupPromise.success(Done$.MODULE$);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    Throwable th2 = (Throwable) unapply.get();
                    log().error(th2.getMessage());
                    failStage(th2);
                    return;
                }
            }
            throw th;
        }
    }

    private EventStreamReader<A> createReader(ReaderSettings<A> readerSettings, ReaderGroup readerGroup) {
        return eventStreamClientFactory().createReader((String) readerSettings.readerId().getOrElse(PravegaSourcesStageLogic::createReader$$anonfun$1), readerGroup.getGroupName(), readerSettings.serializer(), readerSettings.readerConfig());
    }

    public void postStop() {
        log().debug("Stopping reader");
        Failure apply = Try$.MODULE$.apply(() -> {
            postStop$$anonfun$1();
            return BoxedUnit.UNIT;
        });
        if (apply instanceof Failure) {
            log().error(apply.exception(), "Error while closing [{}/{}]", scope(), this.readerGroup.toString());
        } else {
            if (!(apply instanceof Success)) {
                throw new MatchError(apply);
            }
            log().debug("Closed reader [{}/{}]", scope(), this.readerGroup.getGroupName());
        }
        Failure apply2 = Try$.MODULE$.apply(() -> {
            postStop$$anonfun$2();
            return BoxedUnit.UNIT;
        });
        if (apply2 instanceof Failure) {
            log().error(apply2.exception(), "Error while closing reader group [{}/{}]", scope(), this.readerGroup.getGroupName());
        } else {
            if (!(apply2 instanceof Success)) {
                throw new MatchError(apply2);
            }
            log().debug("Closed reader group [{}/{}]", scope(), this.readerGroup.getGroupName());
        }
        close();
    }

    public Materializer protected$materializer() {
        return materializer();
    }

    public <T> void protected$push(Outlet<T> outlet, T t) {
        push(outlet, t);
    }

    private static final String createReader$$anonfun$1() {
        return UUID.randomUUID().toString();
    }

    private final void postStop$$anonfun$1() {
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaSourcesStageLogic$$reader.close();
    }

    private final void postStop$$anonfun$2() {
        this.readerGroup.close();
    }
}
