package com.sksamuel.pulsar4s.akka.streams;

import akka.Done$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import com.sksamuel.pulsar4s.AsyncHandler$;
import com.sksamuel.pulsar4s.Producer;
import com.sksamuel.pulsar4s.ProducerMessage;
import com.sksamuel.pulsar4s.Topic;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set$;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: PulsarMultiSinkGraphStage.scala */
/* loaded from: input_file:com/sksamuel/pulsar4s/akka/streams/PulsarMultiSinkGraphStage$$anon$1.class */
/**
 * Stage logic of the multi-topic Pulsar sink (decompiled from the anonymous
 * {@code GraphStageLogic} inside {@code PulsarMultiSinkGraphStage}).
 *
 * <p>Each element grabbed from the inlet is a {@code (Topic, ProducerMessage)} pair. The
 * message is sent asynchronously with the producer registered for that topic; a producer is
 * created through the outer stage's {@code createFn} the first time a topic is seen. The next
 * element is pulled only after the previous send has succeeded, so at most one send is in
 * flight at any time.
 *
 * <p>Send results arrive on the materializer's execution context, not inside the stage.
 * Both outcomes are therefore handed back to the stage through {@link AsyncCallback}s:
 * {@code next} pulls the following element, {@code failure} fails the stage.
 *
 * <p>The promise passed to the constructor is completed with {@code Done} when upstream
 * finishes and failed when upstream fails or when the stage itself fails while sending.
 */
public final class PulsarMultiSinkGraphStage$$anon$1 extends GraphStageLogic implements InHandler {
    /** Producers known so far, keyed by topic. Assigned in {@link #preStart()}, extended by {@link #getProducer(Topic)}. */
    private Map<Topic, Producer<T>> producers;
    /** Async callback that pulls the next element after a successful send. Assigned in {@link #preStart()}. */
    private AsyncCallback<Tuple2<Topic, ProducerMessage<T>>> next;
    /** Async callback that fails the stage after a failed send. Assigned in {@link #preStart()}. */
    private AsyncCallback<Throwable> failure;
    /** Only touched by its accessor pair below; no other method in this class reads or writes it. */
    private Throwable error;
    private final /* synthetic */ PulsarMultiSinkGraphStage $outer;
    /** Promise supplied by the enclosing stage; completed by the upstream handlers and by {@link #failWith(Throwable)}. */
    private final Promise promise$1;

    /** Returns the execution context of the stage's materializer; used for all producer futures. */
    private ExecutionContextExecutor context() {
        return super.materializer().executionContext();
    }

    private Map<Topic, Producer<T>> producers() {
        return this.producers;
    }

    private void producers_$eq(Map<Topic, Producer<T>> map) {
        this.producers = map;
    }

    private AsyncCallback<Tuple2<Topic, ProducerMessage<T>>> next() {
        return this.next;
    }

    private void next_$eq(AsyncCallback<Tuple2<Topic, ProducerMessage<T>>> asyncCallback) {
        this.next = asyncCallback;
    }

    private Throwable error() {
        return this.error;
    }

    private void error_$eq(Throwable th) {
        this.error = th;
    }

    /**
     * Creates one producer per initial topic, registers the async callbacks used to re-enter
     * the stage from future completions, and requests the first element.
     */
    public void preStart() {
        producers_$eq(((TraversableOnce) this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$initTopics.map(topic -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic), this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$createFn.apply(topic));
        }, Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        next_$eq(getAsyncCallback(tuple2 -> {
            $anonfun$preStart$2(this, tuple2);
            return BoxedUnit.UNIT;
        }));
        // Failures reported by the send future must re-enter the stage through an async
        // callback as well: failStage may only be called from inside the stage.
        this.failure = getAsyncCallback(th -> {
            failWith(th);
            return BoxedUnit.UNIT;
        });
        pull(this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$in());
    }

    /**
     * Returns the producer registered for {@code topic}, creating and registering a new one
     * through the outer stage's {@code createFn} when the topic has not been seen before.
     *
     * @param topic the destination topic
     * @return the producer to use for {@code topic}
     */
    private Producer<T> getProducer(Topic topic) {
        Producer producer;
        Some some = producers().get(topic);
        if (some instanceof Some) {
            producer = (Producer) some.value();
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            this.$outer.logger().debug(new StringBuilder(32).append("creating new producer for topic ").append(topic).toString());
            Producer producer2 = (Producer) this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$createFn.apply(topic);
            producers_$eq(producers().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic), producer2)));
            producer = producer2;
        }
        return producer;
    }

    /**
     * Grabs the pushed {@code (topic, message)} pair and starts an asynchronous send. The
     * outcome is delivered to {@link #$anonfun$onPush$1}. Anything thrown synchronously
     * (grabbing, producer lookup/creation, starting the send) fails the stage.
     */
    public void onPush() {
        try {
            Tuple2 tuple2 = (Tuple2) grab(this.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$in());
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Topic topic = (Topic) tuple2._1();
            ProducerMessage producerMessage = (ProducerMessage) tuple2._2();
            this.$outer.logger().debug(new StringBuilder(20).append("Sending message ").append(producerMessage).append(" to ").append(topic).toString());
            ((Future) getProducer(topic).sendAsync(producerMessage, AsyncHandler$.MODULE$.handler(context()))).onComplete(r8 -> {
                $anonfun$onPush$1(this, topic, producerMessage, r8);
                return BoxedUnit.UNIT;
            }, context());
        } catch (Throwable th) {
            failWith(th);
        }
    }

    /**
     * Flushes and closes every known producer, blocking for at most 15 seconds until all of
     * those futures have completed.
     */
    public void postStop() {
        this.$outer.logger().debug("Graph stage stopping; closing producers");
        if (producers() == null) {
            // The map is only assigned in preStart; if that did not get that far there is
            // nothing registered to flush or close.
            return;
        }
        Await$.MODULE$.ready(Future$.MODULE$.sequence((Iterable) producers().flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Producer producer = (Producer) tuple2._2();
            return new $colon.colon((Future) producer.flushAsync(AsyncHandler$.MODULE$.handler(this.context())), new $colon.colon((Future) producer.closeAsync(AsyncHandler$.MODULE$.handler(this.context())), Nil$.MODULE$));
        }, Iterable$.MODULE$.canBuildFrom()), Iterable$.MODULE$.canBuildFrom(), context()), new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(15)).seconds());
    }

    /** Fails the promise with the upstream error (no-op if it is already completed). */
    public void onUpstreamFailure(Throwable th) {
        this.promise$1.tryFailure(th);
    }

    /** Completes the promise with {@code Done} (no-op if it is already completed). */
    public void onUpstreamFinish() {
        this.promise$1.trySuccess(Done$.MODULE$);
    }

    /**
     * Logs the error, fails the promise so its future does not stay pending, and fails the
     * stage. Must only be called from inside the stage (a handler or an async callback).
     *
     * @param th the cause of the failure
     */
    private void failWith(Throwable th) {
        this.$outer.logger().error("Failing pulsar sink stage", th);
        this.promise$1.tryFailure(th);
        failStage(th);
    }

    /** Body of the {@code next} async callback: requests the next element from upstream. */
    public static final /* synthetic */ void $anonfun$preStart$2(PulsarMultiSinkGraphStage$$anon$1 pulsarMultiSinkGraphStage$$anon$1, Tuple2 tuple2) {
        pulsarMultiSinkGraphStage$$anon$1.pull(pulsarMultiSinkGraphStage$$anon$1.$outer.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$in());
    }

    /**
     * Completion handler of the send future. It is registered with {@code context()} as its
     * executor, so it does not run inside the stage; both outcomes are forwarded through
     * async callbacks instead of touching the stage directly.
     */
    public static final /* synthetic */ void $anonfun$onPush$1(PulsarMultiSinkGraphStage$$anon$1 pulsarMultiSinkGraphStage$$anon$1, Topic topic, ProducerMessage producerMessage, Try r8) {
        if (r8 instanceof Success) {
            pulsarMultiSinkGraphStage$$anon$1.next().invoke(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic), producerMessage));
        } else {
            if (!(r8 instanceof Failure)) {
                throw new MatchError(r8);
            }
            Throwable exception = ((Failure) r8).exception();
            // Logging and failStage happen in failWith, once the callback runs inside the stage.
            pulsarMultiSinkGraphStage$$anon$1.failure.invoke(exception);
        }
    }

    /**
     * Binds the logic to the enclosing stage and registers this object as the handler of the
     * stage's inlet.
     *
     * @param pulsarMultiSinkGraphStage the enclosing stage (source of shape, inlet, logger, createFn)
     * @param promise promise to complete when the stream finishes or fails
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PulsarMultiSinkGraphStage$$anon$1(PulsarMultiSinkGraphStage pulsarMultiSinkGraphStage, Promise promise) {
        super(pulsarMultiSinkGraphStage.m1shape());
        if (pulsarMultiSinkGraphStage == null) {
            throw null;
        }
        this.$outer = pulsarMultiSinkGraphStage;
        this.promise$1 = promise;
        InHandler.$init$(this);
        setHandler(pulsarMultiSinkGraphStage.com$sksamuel$pulsar4s$akka$streams$PulsarMultiSinkGraphStage$$in(), this);
    }
}
