package com.sksamuel.pulsar4s.akka.streams;

import akka.Done;
import akka.Done$;
import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.Outlet$;
import akka.stream.SourceShape;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.GraphStageWithMaterializedValue;
import akka.stream.stage.OutHandler;
import com.sksamuel.exts.Logging;
import com.sksamuel.pulsar4s.AsyncHandler$;
import com.sksamuel.pulsar4s.Consumer;
import com.sksamuel.pulsar4s.ConsumerMessage;
import com.sksamuel.pulsar4s.MessageId;
import com.sksamuel.pulsar4s.akka.streams.PulsarCommittableSourceGraphStage;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: PulsarCommittableSourceGraphStage.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055b\u0001B\n\u0015\u0001}A\u0001\"\u0013\u0001\u0003\u0002\u0003\u0006IA\u0013\u0005\t#\u0002\u0011\t\u0011)A\u0005%\")\u0001\f\u0001C\u00013\"9Q\f\u0001b\u0001\n\u0013q\u0006B\u00022\u0001A\u0003%q\fC\u0003d\u0001\u0011\u0005CM\u0002\u0003f\u0001\u00111\u0007\u0002C2\b\u0005\u0003\u0005\u000b\u0011B7\t\u000ba;A\u0011A:\t\u0013Y<\u0001\u0019!a\u0001\n\u00039\b\"\u0003=\b\u0001\u0004\u0005\r\u0011\"\u0001z\u0011%yx\u00011A\u0001B\u0003&Q\nC\u0006\u0002\u0002\u001d\u0001\r\u00111A\u0005\u0002\u0005\r\u0001bCA\u0006\u000f\u0001\u0007\t\u0019!C\u0001\u0003\u001bA1\"!\u0005\b\u0001\u0004\u0005\t\u0015)\u0003\u0002\u0006!9\u00111C\u0004\u0005B\u0005U\u0001bBA\f\u000f\u0011\u0005\u0013Q\u0003\u0005\b\u00033\u0001A\u0011IA\u000e\u0005\u0005\u0002V\u000f\\:be\u000e{W.\\5ui\u0006\u0014G.Z*pkJ\u001cWm\u0012:ba\"\u001cF/Y4f\u0015\t)b#A\u0004tiJ,\u0017-\\:\u000b\u0005]A\u0012\u0001B1lW\u0006T!!\u0007\u000e\u0002\u0011A,Hn]1siMT!a\u0007\u000f\u0002\u0011M\\7/Y7vK2T\u0011!H\u0001\u0004G>l7\u0001A\u000b\u0003AQ\u001a2\u0001A\u0011D!\u0011\u0011\u0003F\u000b!\u000e\u0003\rR!\u0001J\u0013\u0002\u000bM$\u0018mZ3\u000b\u0005\u0019:\u0013AB:ue\u0016\fWNC\u0001\u0018\u0013\tI3EA\u0010He\u0006\u0004\bn\u0015;bO\u0016<\u0016\u000e\u001e5NCR,'/[1mSj,GMV1mk\u0016\u00042a\u000b\u0017/\u001b\u0005)\u0013BA\u0017&\u0005-\u0019v.\u001e:dKNC\u0017\r]3\u0011\u0007=\u0002$'D\u0001\u0015\u0013\t\tDC\u0001\nD_6l\u0017\u000e\u001e;bE2,W*Z:tC\u001e,\u0007CA\u001a5\u0019\u0001!Q!\u000e\u0001C\u0002Y\u0012\u0011\u0001V\t\u0003ou\u0002\"\u0001O\u001e\u000e\u0003eR\u0011AO\u0001\u0006g\u000e\fG.Y\u0005\u0003ye\u0012qAT8uQ&tw\r\u0005\u00029}%\u0011q(\u000f\u0002\u0004\u0003:L\bCA\u0018B\u0013\t\u0011ECA\u0004D_:$(o\u001c7\u0011\u0005\u0011;U\"A#\u000b\u0005\u0019S\u0012\u0001B3yiNL!\u0001S#\u0003\u000f1{wmZ5oO\u000611M]3bi\u0016\u00042\u0001O&N\u0013\ta\u0015HA\u0005Gk:\u001cG/[8oaA\u0019aj\u0014\u001a\u000e\u0003aI!\u0001\u0015\r\u0003\u0011\r{gn];nKJ\fAa]3fWB\u0019\u0001hU+\n\u0005QK$AB(qi&|g\u000e\u0005\u0002O-&\u0011q\u000b\u0007\u0002\n\u001b\u0016\u001c8/Y4f\u0013\u0012\fa\u0001P5oSRtDc\u0001.\\9B\u0019q\u0006\u0001\u001a\t\u000b%\u001b\u0001\u0019\u0001&\t\u000bE\u001b\u0001\u0019\u0001*\u0002\u0007=,H/F\u0001`!\rY\u0003ML\u0005\u0003C\u0016\u0012aaT;uY\u0016$\u0018\u0001B8vi\u0002\nQa\u001d5ba\u0016,\u0012A\u000b\u0002\u001d!Vd7/\u0019:D_6l\u0017\u000e\u001e;bE2,7k\\;sG\u0016dunZ5d'\r9qM\u001b\t\u0003E!L!![\u0012\u0003\u001f\u001d\u0013\u0018\r\u001d5Ti\u0006<W\rT8hS\u000e\u0004\"AI6\n\u00051\u001c#AC(vi\"\u000bg\u000e\u001a7feB\u0011an\\\u0007\u0002\u0001%\u0011\u0001/\u001d\u0002\u0006'\"\f\u0007/Z\u0005\u0003e\u0016\u0012Qa\u0012:ba\"$\"\u0001^;\u0011\u00059<\u0001\"B2\n\u0001\u0004i\u0017\u0001C2p]N,X.\u001a:\u0016\u00035\u000bAbY8ogVlWM]0%KF$\"A_?\u0011\u0005aZ\u0018B\u0001?:\u0005\u0011)f.\u001b;\t\u000fy\\\u0011\u0011!a\u0001\u001b\u0006\u0019\u0001\u0010J\u0019\u0002\u0013\r|gn];nKJ\u0004\u0013a\u0004:fG\u0016Lg/Z\"bY2\u0014\u0017mY6\u0016\u0005\u0005\u0015\u0001\u0003\u0002\u0012\u0002\b9J1!!\u0003$\u00055\t5/\u001f8d\u0007\u0006dGNY1dW\u0006\u0019\"/Z2fSZ,7)\u00197mE\u0006\u001c7n\u0018\u0013fcR\u0019!0a\u0004\t\u0011yt\u0011\u0011!a\u0001\u0003\u000b\t\u0001C]3dK&4XmQ1mY\n\f7m\u001b\u0011\u0002\u0011A\u0014Xm\u0015;beR$\u0012A_\u0001\u0007_:\u0004V\u000f\u001c7\u0002?\r\u0014X-\u0019;f\u0019><\u0017nY!oI6\u000bG/\u001a:jC2L'0\u001a3WC2,X\r\u0006\u0003\u0002\u001e\u0005\r\u0002#\u0002\u001d\u0002 \u001d\u0004\u0015bAA\u0011s\t1A+\u001e9mKJBq!!\n\u0013\u0001\u0004\t9#A\nj]\",'/\u001b;fI\u0006#HO]5ckR,7\u000fE\u0002,\u0003SI1!a\u000b&\u0005)\tE\u000f\u001e:jEV$Xm\u001d")
/* loaded from: input_file:com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage.class */
public class PulsarCommittableSourceGraphStage<T> extends GraphStageWithMaterializedValue<SourceShape<CommittableMessage<T>>, Control> implements Logging {
    public final Function0<Consumer<T>> com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$create;
    public final Option<MessageId> com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$seek;
    private final Outlet<CommittableMessage<T>> com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out;
    private final Logger logger;

    /* compiled from: PulsarCommittableSourceGraphStage.scala */
    /* loaded from: input_file:com/sksamuel/pulsar4s/akka/streams/PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic.class */
    public class PulsarCommittableSourceLogic extends GraphStageLogic implements OutHandler {
        private Consumer<T> consumer;
        private AsyncCallback<CommittableMessage<T>> receiveCallback;
        public final /* synthetic */ PulsarCommittableSourceGraphStage $outer;

        public void onDownstreamFinish() throws Exception {
            OutHandler.onDownstreamFinish$(this);
        }

        public Consumer<T> consumer() {
            return this.consumer;
        }

        public void consumer_$eq(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        public AsyncCallback<CommittableMessage<T>> receiveCallback() {
            return this.receiveCallback;
        }

        public void receiveCallback_$eq(AsyncCallback<CommittableMessage<T>> asyncCallback) {
            this.receiveCallback = asyncCallback;
        }

        public void preStart() {
            super.materializer().executionContext();
            consumer_$eq((Consumer) com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$create.apply());
            Option<MessageId> option = com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$seek;
            Consumer<T> consumer = consumer();
            option.foreach(messageId -> {
                consumer.seek(messageId);
                return BoxedUnit.UNIT;
            });
            receiveCallback_$eq(getAsyncCallback(committableMessage -> {
                $anonfun$preStart$2(this, committableMessage);
                return BoxedUnit.UNIT;
            }));
        }

        public void onPull() {
            ExecutionContextExecutor executionContext = super.materializer().executionContext();
            com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().logger().debug("Pull received; asking consumer for message");
            ((Future) consumer().receiveAsync(AsyncHandler$.MODULE$.handler(executionContext))).onComplete(r6 -> {
                $anonfun$onPull$1(this, executionContext, r6);
                return BoxedUnit.UNIT;
            }, executionContext);
        }

        public /* synthetic */ PulsarCommittableSourceGraphStage com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer() {
            return this.$outer;
        }

        public static final /* synthetic */ void $anonfun$preStart$2(PulsarCommittableSourceLogic pulsarCommittableSourceLogic, CommittableMessage committableMessage) {
            pulsarCommittableSourceLogic.push(pulsarCommittableSourceLogic.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out(), committableMessage);
        }

        public static final /* synthetic */ void $anonfun$onPull$1(final PulsarCommittableSourceLogic pulsarCommittableSourceLogic, final ExecutionContext executionContext, Try r9) {
            if (r9 instanceof Success) {
                final ConsumerMessage consumerMessage = (ConsumerMessage) ((Success) r9).value();
                pulsarCommittableSourceLogic.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().logger().debug(new StringBuilder(18).append("Message received: ").append(consumerMessage).toString());
                pulsarCommittableSourceLogic.receiveCallback().invoke(new CommittableMessage<T>(pulsarCommittableSourceLogic, consumerMessage, executionContext) { // from class: com.sksamuel.pulsar4s.akka.streams.PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$anon$1
                    private final /* synthetic */ PulsarCommittableSourceGraphStage.PulsarCommittableSourceLogic $outer;
                    private final ConsumerMessage msg$1;
                    private final ExecutionContext context$1;

                    @Override // com.sksamuel.pulsar4s.akka.streams.CommittableMessage
                    public boolean ack$default$1() {
                        boolean ack$default$1;
                        ack$default$1 = ack$default$1();
                        return ack$default$1;
                    }

                    @Override // com.sksamuel.pulsar4s.akka.streams.CommittableMessage
                    public ConsumerMessage<T> message() {
                        return this.msg$1;
                    }

                    @Override // com.sksamuel.pulsar4s.akka.streams.CommittableMessage
                    public Future<Done> ack(boolean z) {
                        this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().logger().debug(new StringBuilder(23).append("Acknowledging message: ").append(this.msg$1).toString());
                        return (z ? (Future) this.$outer.consumer().acknowledgeCumulativeAsync(this.msg$1.messageId(), AsyncHandler$.MODULE$.handler(this.context$1)) : (Future) this.$outer.consumer().acknowledgeAsync(this.msg$1.messageId(), AsyncHandler$.MODULE$.handler(this.context$1))).map(boxedUnit -> {
                            return Done$.MODULE$;
                        }, this.context$1);
                    }

                    {
                        if (pulsarCommittableSourceLogic == null) {
                            throw null;
                        }
                        this.$outer = pulsarCommittableSourceLogic;
                        this.msg$1 = consumerMessage;
                        this.context$1 = executionContext;
                    }
                });
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
            if (!(r9 instanceof Failure)) {
                throw new MatchError(r9);
            }
            Throwable exception = ((Failure) r9).exception();
            pulsarCommittableSourceLogic.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$PulsarCommittableSourceLogic$$$outer().logger().warn("Error when receiving message", exception);
            pulsarCommittableSourceLogic.failStage(exception);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public PulsarCommittableSourceLogic(PulsarCommittableSourceGraphStage<T> pulsarCommittableSourceGraphStage, SourceShape<CommittableMessage<T>> sourceShape) {
            super(sourceShape);
            if (pulsarCommittableSourceGraphStage == null) {
                throw null;
            }
            this.$outer = pulsarCommittableSourceGraphStage;
            OutHandler.$init$(this);
            setHandler(pulsarCommittableSourceGraphStage.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out(), this);
        }
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$sksamuel$exts$Logging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public Outlet<CommittableMessage<T>> com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out() {
        return this.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out;
    }

    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    public SourceShape<CommittableMessage<T>> m0shape() {
        return new SourceShape<>(com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out());
    }

    public Tuple2<GraphStageLogic, Control> createLogicAndMaterializedValue(Attributes attributes) {
        final PulsarCommittableSourceLogic pulsarCommittableSourceLogic = new PulsarCommittableSourceLogic(this, m0shape());
        final PulsarCommittableSourceGraphStage pulsarCommittableSourceGraphStage = null;
        return new Tuple2<>(pulsarCommittableSourceLogic, new Control(pulsarCommittableSourceGraphStage, pulsarCommittableSourceLogic) { // from class: com.sksamuel.pulsar4s.akka.streams.PulsarCommittableSourceGraphStage$$anon$2
            private final PulsarCommittableSourceGraphStage.PulsarCommittableSourceLogic logic$1;

            @Override // com.sksamuel.pulsar4s.akka.streams.Control, java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                this.logic$1.completeStage();
            }

            /* JADX WARN: Multi-variable type inference failed */
            {
                this.logic$1 = pulsarCommittableSourceLogic;
            }
        });
    }

    public PulsarCommittableSourceGraphStage(Function0<Consumer<T>> function0, Option<MessageId> option) {
        this.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$create = function0;
        this.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$seek = option;
        Logging.$init$(this);
        this.com$sksamuel$pulsar4s$akka$streams$PulsarCommittableSourceGraphStage$$out = Outlet$.MODULE$.apply("pulsar.out");
    }
}
