package cloudflow.akkastream;

import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.ProducerSettings$;
import akka.kafka.scaladsl.Producer$;
import akka.stream.SharedKillSwitch;
import akka.stream.SinkRef;
import akka.stream.javadsl.Sink;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Sink$;
import cloudflow.streamlets.CodecOutlet;
import cloudflow.streamlets.Dun;
import cloudflow.streamlets.Dun$;
import cloudflow.streamlets.Topic;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import scala.MatchError;
import scala.Tuple2;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: KafkaSinkRef.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u001dd\u0001\u0002\t\u0012\u0005YA\u0001\"\f\u0001\u0003\u0002\u0003\u0006IA\f\u0005\tm\u0001\u0011\t\u0011)A\u0005o!AQ\b\u0001B\u0001B\u0003%a\b\u0003\u0005J\u0001\t\u0005\t\u0015!\u0003K\u0011!i\u0005A!A!\u0002\u0013q\u0005\u0002\u0003+\u0001\u0005\u0003\u0005\u000b\u0011B+\t\u000by\u0003A\u0011A0\t\u000f\u001d\u0004!\u0019!C\u0005Q\"1Q\u000f\u0001Q\u0001\n%DqA\u001e\u0001C\u0002\u0013%q\u000fC\u0004\u0002\n\u0001\u0001\u000b\u0011\u0002=\t\u000f\u0005-\u0001\u0001\"\u0001\u0002\u000e!9\u0011q\b\u0001\u0005\n\u0005\u0005\u0003bBA)\u0001\u0011\u0005\u00111\u000b\u0005\b\u0003?\u0002A\u0011BA1\u00051Y\u0015MZ6b'&t7NU3g\u0015\t\u00112#\u0001\u0006bW.\f7\u000f\u001e:fC6T\u0011\u0001F\u0001\nG2|W\u000f\u001a4m_^\u001c\u0001!\u0006\u0002\u0018IM\u0019\u0001\u0001\u0007\u0010\u0011\u0005eaR\"\u0001\u000e\u000b\u0003m\tQa]2bY\u0006L!!\b\u000e\u0003\r\u0005s\u0017PU3g!\ry\u0002EI\u0007\u0002#%\u0011\u0011%\u0005\u0002\u0010/JLG/\u00192mKNKgn\u001b*fMB\u00111\u0005\n\u0007\u0001\t\u0015)\u0003A1\u0001'\u0005\u0005!\u0016CA\u0014+!\tI\u0002&\u0003\u0002*5\t9aj\u001c;iS:<\u0007CA\r,\u0013\ta#DA\u0002B]f\faa]=ti\u0016l\u0007CA\u00185\u001b\u0005\u0001$BA\u00193\u0003\u0015\t7\r^8s\u0015\u0005\u0019\u0014\u0001B1lW\u0006L!!\u000e\u0019\u0003\u0017\u0005\u001bGo\u001c:TsN$X-\\\u0001\u0007_V$H.\u001a;\u0011\u0007aZ$%D\u0001:\u0015\tQ4#\u0001\u0006tiJ,\u0017-\u001c7fiNL!\u0001P\u001d\u0003\u0017\r{G-Z2PkRdW\r^\u0001\u0011E>|Go\u001d;sCB\u001cVM\u001d<feN\u0004\"a\u0010$\u000f\u0005\u0001#\u0005CA!\u001b\u001b\u0005\u0011%BA\"\u0016\u0003\u0019a$o\\8u}%\u0011QIG\u0001\u0007!J,G-\u001a4\n\u0005\u001dC%AB*ue&twM\u0003\u0002F5\u0005)Ao\u001c9jGB\u0011\u0001hS\u0005\u0003\u0019f\u0012Q\u0001V8qS\u000e\f!b[5mYN;\u0018\u000e^2i!\ty%+D\u0001Q\u0015\t\t&'\u0001\u0004tiJ,\u0017-\\\u0005\u0003'B\u0013\u0001c\u00155be\u0016$7*\u001b7m'^LGo\u00195\u0002#\r|W\u000e\u001d7fi&|g\u000e\u0015:p[&\u001cX\rE\u0002W3nk\u0011a\u0016\u0006\u00031j\t!bY8oGV\u00
14(/\u001a8u\u0013\tQvKA\u0004Qe>l\u0017n]3\u0011\u0005ab\u0016BA/:\u0005\r!UO\\\u0001\u0007y%t\u0017\u000e\u001e \u0015\u000f\u0001\f'm\u00193fMB\u0019q\u0004\u0001\u0012\t\u000b5:\u0001\u0019\u0001\u0018\t\u000bY:\u0001\u0019A\u001c\t\u000bu:\u0001\u0019\u0001 \t\u000b%;\u0001\u0019\u0001&\t\u000b5;\u0001\u0019\u0001(\t\u000bQ;\u0001\u0019A+\u0002!A\u0014x\u000eZ;dKJ\u001cV\r\u001e;j]\u001e\u001cX#A5\u0011\t)lwn\\\u0007\u0002W*\u0011ANM\u0001\u0006W\u000647.Y\u0005\u0003].\u0014\u0001\u0003\u0015:pIV\u001cWM]*fiRLgnZ:\u0011\u0007e\u0001(/\u0003\u0002r5\t)\u0011I\u001d:bsB\u0011\u0011d]\u0005\u0003ij\u0011AAQ=uK\u0006\t\u0002O]8ek\u000e,'oU3ui&twm\u001d\u0011\u0002\u0011A\u0014x\u000eZ;dKJ,\u0012\u0001\u001f\t\u0006s\u0006\u0015qn\\\u0007\u0002u*\u0011ao\u001f\u0006\u0003yv\fqa\u00197jK:$8O\u0003\u0002m}*\u0019q0!\u0001\u0002\r\u0005\u0004\u0018m\u00195f\u0015\t\t\u0019!A\u0002pe\u001eL1!a\u0002{\u0005!\u0001&o\u001c3vG\u0016\u0014\u0018!\u00039s_\u0012,8-\u001a:!\u0003\u0011\u0019\u0018N\\6\u0015\u0005\u0005=\u0001\u0003CA\t\u0003/\tY\"a\u000e\u000e\u0005\u0005M!bAA\u000b!\u0006A1oY1mC\u0012\u001cH.\u0003\u0003\u0002\u001a\u0005M!\u0001B*j].\u0004b!GA\u000fE\u0005\u0005\u0012bAA\u00105\t1A+\u001e9mKJ\u0002B!a\t\u000229!\u0011QEA\u0017\u001d\u0011\t9#a\u000b\u000f\u0007\u0005\u000bI#C\u00014\u0013\ta''C\u0002\u00020-\fqbQ8ogVlWM]'fgN\fw-Z\u0005\u0005\u0003g\t)DA\u0006D_6l\u0017\u000e\u001e;bE2,'bAA\u0018WB!\u0011\u0011HA\u001e\u001b\u0005\u0011\u0014bAA\u001fe\t9aj\u001c;Vg\u0016$\u0017!\u00055b]\u0012dW\rV3s[&t\u0017\r^5p]V!\u00111IA'+\t\t)\u0005\u0005\u0006\u0002\u0012\u0005\u001d\u00131JA&\u0003oIA!!\u0013\u0002\u0014\t!a\t\\8x!\r\u0019\u0013Q\n\u0003\u0007\u0003\u001fj!\u0019\u0001\u0014\u0003\u0003%\u000bQa\u001e:ji\u0016$B!!\u0016\u0002\\A!a+a\u0016#\u0013\r\tIf\u0016\u0002\u0007\rV$XO]3\t\r\u0005uc\u00021\u0001#\u0003\u00151\u0018\r\\;f\u0003!YW-\u001f\"zi\u0016\u001cHcA8\u0002d!1\u0011QM\bA\u0002y\n1a[3z\u0001")
/* loaded from: input_file:cloudflow/akkastream/KafkaSinkRef.class */
public final class KafkaSinkRef<T> implements WritableSinkRef<T> {
    private final ActorSystem system;
    private final CodecOutlet<T> outlet;
    private final Topic topic;
    private final SharedKillSwitch killSwitch;
    private final Promise<Dun> completionPromise;
    private final ProducerSettings<byte[], byte[]> producerSettings;
    private final Producer<byte[], byte[]> producer;

    @Override // cloudflow.akkastream.WritableSinkRef
    public CompletionStage<T> writeJava(T t) {
        CompletionStage<T> writeJava;
        writeJava = writeJava(t);
        return writeJava;
    }

    public final Sink<Tuple2<T, ConsumerMessage.Committable>, NotUsed> getSink() {
        return SinkRef.getSink$(this);
    }

    private ProducerSettings<byte[], byte[]> producerSettings() {
        return this.producerSettings;
    }

    private Producer<byte[], byte[]> producer() {
        return this.producer;
    }

    public akka.stream.scaladsl.Sink<Tuple2<T, ConsumerMessage.Committable>, NotUsed> sink() {
        this.system.log().info(new StringBuilder(25).append("Creating sink for topic: ").append(this.topic).toString());
        return Flow$.MODULE$.apply().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _1 = tuple2._1();
            ConsumerMessage.Committable committable = (ConsumerMessage.Committable) tuple2._2();
            String str = (String) this.outlet.partitioner().apply(_1);
            return new ProducerMessage.Message(new ProducerRecord(this.topic.name(), str.getBytes("UTF8"), this.outlet.codec().encode(_1)), committable);
        }).via(Producer$.MODULE$.flexiFlow(producerSettings().withProducer(producer()))).via(handleTermination()).to(Sink$.MODULE$.ignore()).mapMaterializedValue(notUsed -> {
            return NotUsed$.MODULE$;
        });
    }

    private <I> Flow<I, I, NotUsed> handleTermination() {
        return Flow$.MODULE$.apply().via(this.killSwitch.flow()).alsoTo(Sink$.MODULE$.onComplete(r4 -> {
            $anonfun$handleTermination$1(this, r4);
            return BoxedUnit.UNIT;
        }));
    }

    @Override // cloudflow.akkastream.WritableSinkRef
    public Future<T> write(final T t) {
        ProducerRecord producerRecord = new ProducerRecord(this.topic.name(), keyBytes((String) this.outlet.partitioner().apply(t)), this.outlet.codec().encode(t));
        final Promise apply = Promise$.MODULE$.apply();
        final KafkaSinkRef kafkaSinkRef = null;
        producer().send(producerRecord, new Callback(kafkaSinkRef, apply, t) { // from class: cloudflow.akkastream.KafkaSinkRef$$anon$1
            private final Promise promise$1;
            private final Object value$1;

            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    this.promise$1.success(this.value$1);
                } else {
                    this.promise$1.failure(exc);
                }
            }

            {
                this.promise$1 = apply;
                this.value$1 = t;
            }
        });
        return apply.future();
    }

    private byte[] keyBytes(String str) {
        if (str != null) {
            return str.getBytes("UTF8");
        }
        return null;
    }

    public static final /* synthetic */ void $anonfun$handleTermination$1(KafkaSinkRef kafkaSinkRef, Try r5) {
        if (r5 instanceof Success) {
            kafkaSinkRef.system.log().error("Stream has completed. Shutting down streamlet...");
            kafkaSinkRef.completionPromise.success(Dun$.MODULE$);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r5 instanceof Failure)) {
                throw new MatchError(r5);
            }
            Throwable exception = ((Failure) r5).exception();
            kafkaSinkRef.system.log().error(exception, "Stream has failed. Shutting down streamlet...");
            kafkaSinkRef.completionPromise.failure(exception);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public KafkaSinkRef(ActorSystem actorSystem, CodecOutlet<T> codecOutlet, String str, Topic topic, SharedKillSwitch sharedKillSwitch, Promise<Dun> promise) {
        this.system = actorSystem;
        this.outlet = codecOutlet;
        this.topic = topic;
        this.killSwitch = sharedKillSwitch;
        this.completionPromise = promise;
        SinkRef.$init$(this);
        WritableSinkRef.$init$(this);
        this.producerSettings = ProducerSettings$.MODULE$.apply(actorSystem, new ByteArraySerializer(), new ByteArraySerializer()).withBootstrapServers(str).withProperties(topic.kafkaProducerProperties());
        this.producer = producerSettings().createKafkaProducer();
    }
}
