package com.sksamuel.pulsar4s.akka.streams;

import akka.Done;
import akka.Done$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import com.sksamuel.pulsar4s.AsyncHandler$;
import com.sksamuel.pulsar4s.Consumer;
import com.sksamuel.pulsar4s.ConsumerMessage;
import com.sksamuel.pulsar4s.MessageId;
import org.apache.pulsar.client.api.ConsumerStats;
import scala.MatchError;
import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: PulsarSourceGraphStage.scala */
/* loaded from: input_file:com/sksamuel/pulsar4s/akka/streams/PulsarSourceGraphStage$$anon$1.class */
/**
 * Stage logic of {@code PulsarSourceGraphStage}: the single out-handler of the stage and the
 * {@link Control} handle used to stop it.
 *
 * <p>On start-up it obtains a {@link Consumer} from the outer stage's factory and, when the
 * outer stage carries a start position, seeks to it. Every downstream pull triggers one
 * asynchronous receive; the received message is pushed downstream and then acknowledged.
 *
 * <p>The receive future completes on the materializer's execution context, not inside a stage
 * callback, so everything that touches the stage from there is routed through an
 * {@link AsyncCallback}.
 *
 * @param <T> payload type of the consumed messages
 */
public final class PulsarSourceGraphStage$$anon$1<T> extends GraphStageLogic implements OutHandler, Control {
    /** Consumer created in {@link #preStart()}; {@code null} until the stage has started. */
    private Consumer<T> consumer;
    /** Pushes a received message downstream from inside the stage. */
    private AsyncCallback<ConsumerMessage<T>> callback;
    /** Fails the stage from inside the stage when a receive fails. */
    private AsyncCallback<Throwable> failureCallback;
    private final /* synthetic */ PulsarSourceGraphStage<T> $outer;

    /**
     * Forwards to the implementation inherited from {@link Control}.
     *
     * @param future           passed through unchanged to the trait implementation
     * @param executionContext passed through unchanged to the trait implementation
     * @return whatever the trait implementation returns
     */
    @Override // com.sksamuel.pulsar4s.akka.streams.Control
    public <S> Future<S> drainAndShutdown(Future<S> future, ExecutionContext executionContext) {
        // Must target the trait's static forwarder (same form as onDownstreamFinish below);
        // an unqualified call would resolve to this very method and never terminate.
        return Control.drainAndShutdown$(this, future, executionContext);
    }

    /** Forwards to the default behaviour inherited from {@link OutHandler}. */
    @Override
    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /**
     * Creates the consumer, applies the optional seek position and registers the async
     * callbacks used by {@link #onPull()}.
     */
    @Override
    public void preStart() {
        // The factory's declared type is not visible from this class, hence the cast.
        @SuppressWarnings("unchecked")
        final Consumer<T> created =
                (Consumer<T>) this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarSourceGraphStage$$create.apply();
        this.consumer = created;

        final Option<MessageId> seekTo = this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarSourceGraphStage$$seek;
        seekTo.foreach(messageId -> {
            created.seek(messageId);
            return BoxedUnit.UNIT;
        });

        this.callback = getAsyncCallback(message -> {
            $anonfun$preStart$2(this, message);
            return BoxedUnit.UNIT;
        });
        this.failureCallback = getAsyncCallback(error -> {
            failStage(error);
            return BoxedUnit.UNIT;
        });
    }

    /** Asks the consumer for the next message; the outcome is handled asynchronously. */
    @Override
    public void onPull() {
        final ExecutionContextExecutor executionContext = super.materializer().executionContext();
        this.$outer.logger().debug("Pull received; asking consumer for message");

        // receiveAsync is erased to Object in this view; the handler passed in is the Future one.
        @SuppressWarnings("unchecked")
        final Future<ConsumerMessage<T>> received =
                (Future<ConsumerMessage<T>>) this.consumer.receiveAsync(AsyncHandler$.MODULE$.handler(executionContext));
        received.onComplete(result -> {
            $anonfun$onPull$1(this, result);
            return BoxedUnit.UNIT;
        }, executionContext);
    }

    /**
     * Completes the stage. The consumer is not closed here.
     *
     * <p>NOTE(review): this looks like it is meant to be called from outside the stage, in which
     * case {@code completeStage()} runs off the stage's own callbacks — confirm whether it
     * should go through an {@link AsyncCallback} like the receive path does.
     */
    @Override // com.sksamuel.pulsar4s.akka.streams.Control
    public void stop() {
        completeStage();
    }

    /**
     * Completes the stage and asynchronously closes the consumer.
     *
     * <p>NOTE(review): same threading question as {@link #stop()}.
     *
     * @param executionContext context used for the close handler and for mapping its result
     * @return a future that yields {@link Done} once the consumer's close future succeeds
     */
    @Override // com.sksamuel.pulsar4s.akka.streams.Control
    public Future<Done> shutdown(ExecutionContext executionContext) {
        completeStage();
        // closeAsync is erased to Object in this view; the handler passed in is the Future one.
        @SuppressWarnings("unchecked")
        final Future<BoxedUnit> closed =
                (Future<BoxedUnit>) this.consumer.closeAsync(AsyncHandler$.MODULE$.handler(executionContext));
        return closed.map(ignored -> Done$.MODULE$, executionContext);
    }

    /** @return the statistics reported by the underlying consumer */
    @Override // com.sksamuel.pulsar4s.akka.streams.Control
    public ConsumerStats stats() {
        return this.consumer.stats();
    }

    /** Body of the message callback: pushes {@code consumerMessage} to the stage's outlet. */
    public static final /* synthetic */ <M> void $anonfun$preStart$2(PulsarSourceGraphStage$$anon$1<M> stage, ConsumerMessage<M> consumerMessage) {
        stage.push(stage.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarSourceGraphStage$$out(), consumerMessage);
    }

    /**
     * Body of the receive-completion handler.
     *
     * <p>On success the message is handed to the push callback and then acknowledged; on
     * failure the error is logged and the stage is failed.
     *
     * @throws MatchError if {@code result} is neither a {@link Success} nor a {@link Failure}
     */
    public static final /* synthetic */ <M> void $anonfun$onPull$1(PulsarSourceGraphStage$$anon$1<M> stage, Try<ConsumerMessage<M>> result) {
        if (result instanceof Success) {
            final ConsumerMessage<M> message = ((Success<ConsumerMessage<M>>) result).value();
            stage.$outer.logger().debug("Msg received " + message);
            stage.callback.invoke(message);
            stage.consumer.acknowledge(message.messageId());
            return;
        }
        if (result instanceof Failure) {
            final Throwable error = ((Failure<ConsumerMessage<M>>) result).exception();
            stage.$outer.logger().warn("Error when receiving message", error);
            // This runs on the future's completion thread, so the stage is failed through its
            // async callback instead of calling failStage directly from here.
            stage.failureCallback.invoke(error);
            return;
        }
        throw new MatchError(result);
    }

    /**
     * Binds the logic to its outer stage and installs itself as handler of the stage's outlet.
     *
     * @param pulsarSourceGraphStage the enclosing stage; must not be {@code null}
     * @throws NullPointerException if {@code pulsarSourceGraphStage} is {@code null}
     */
    public PulsarSourceGraphStage$$anon$1(PulsarSourceGraphStage<T> pulsarSourceGraphStage) {
        // The null check has to live inside the super(...) argument: it is the first place the
        // outer stage is dereferenced, and Java allows no statement ahead of the super call.
        super(java.util.Objects.requireNonNull(pulsarSourceGraphStage).m2shape());
        this.$outer = pulsarSourceGraphStage;
        OutHandler.$init$(this);
        Control.$init$(this);
        setHandler(pulsarSourceGraphStage.com$sksamuel$pulsar4s$akka$streams$PulsarSourceGraphStage$$out(), this);
    }
}
