package cloudflow.akkastream;

import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.ConsumerSettings$;
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.ProducerSettings$;
import akka.kafka.Subscriptions$;
import akka.kafka.scaladsl.Committer$;
import akka.kafka.scaladsl.Consumer$;
import akka.kafka.scaladsl.Producer$;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializer$;
import akka.stream.KillSwitches$;
import akka.stream.SharedKillSwitch;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.SourceWithContext;
import cloudflow.streamlets.CodecInlet;
import cloudflow.streamlets.CodecOutlet;
import cloudflow.streamlets.Dun;
import cloudflow.streamlets.Dun$;
import cloudflow.streamlets.SavepointPath;
import cloudflow.streamlets.StreamletContext;
import cloudflow.streamlets.StreamletContext$MountedPathUnavailableException$;
import cloudflow.streamlets.StreamletDefinition;
import cloudflow.streamlets.StreamletExecution;
import cloudflow.streamlets.StreamletPort;
import cloudflow.streamlets.VolumeMount;
import com.typesafe.config.Config;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: AkkaStreamletContextImpl.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u0005h\u0001B\u0014)\u00055B\u0011\u0002\u000f\u0001\u0003\u0006\u0004%\tEK\u001d\t\u0011\u0001\u0003!\u0011!Q\u0001\niB\u0001\"\u0011\u0001\u0003\u0002\u0003\u0006IA\u0011\u0005\u0006\u0015\u0002!\ta\u0013\u0005\b\u001f\u0002\u0011\r\u0011b\u0001Q\u0011\u0019\t\u0006\u0001)A\u0005\u0005\")!\u000b\u0001C\u0002'\")!\f\u0001C!7\"9Q\r\u0001b\u0001\n\u00131\u0007B\u00029\u0001A\u0003%q\rC\u0004r\u0001\t\u0007I\u0011\u00024\t\rI\u0004\u0001\u0015!\u0003h\u0011\u001d\u0019\bA1A\u0005\nQDa\u0001\u001f\u0001!\u0002\u0013)\bbB=\u0001\u0005\u0004%\tA\u001f\u0005\u0007}\u0002\u0001\u000b\u0011B>\t\u0011}\u0004!\u0019!C\u0001\u0003\u0003A\u0001\"!\u0003\u0001A\u0003%\u00111\u0001\u0005\n\u0003#\u0001!\u0019!C\u0005\u0003'A\u0001\"!\n\u0001A\u0003%\u0011Q\u0003\u0005\b\u0003O\u0001A\u0011AA\u0015\u0011\u001d\t\t\b\u0001C\u0001\u0003gBq!!\u001d\u0001\t\u0003\t\u0019\r\u0003\u0005\u0002R\u0002!\t\u0001KAj\u0011!\t\t\u000e\u0001C\u0001Q\u0005-\bbBA}\u0001\u0011\u0005\u00111 \u0005\n\u0005/\u0001\u0011\u0013!C\u0001\u00053AqAa\r\u0001\t\u0003\u0011)\u0004C\u0004\u0003D\u0001!\tA!\u0012\t\u000f\t]\u0003\u0001\"\u0003\u0003Z!I!\u0011\u0010\u0001C\u0002\u0013%!1\u0010\u0005\t\u0005K\u0003\u0001\u0015!\u0003\u0003~!9!q\u0015\u0001\u0005\u0002\t%\u0006b\u0002B[\u0001\u0011%!q\u0017\u0005\b\u0005s\u0003A\u0011\u0002B^\u0011\u001d\u0011I\r\u0001C\u0001\u0005\u0017DqAa5\u0001\t\u0003\u0011)\u000eC\u0004\u0003X\u0002!\tA!7\u00031\u0005[7.Y*ue\u0016\fW\u000e\\3u\u0007>tG/\u001a=u\u00136\u0004HN\u0003\u0002*U\u0005Q\u0011m[6bgR\u0014X-Y7\u000b\u0003-\n\u0011b\u00197pk\u00124Gn\\<\u0004\u0001M\u0019\u0001A\f\u001b\u0011\u0005=\u0012T\"\u0001\u0019\u000b\u0003E\nQa]2bY\u0006L!a\r\u0019\u0003\r\u0005s\u0017PU3g!\t)d'D\u0001)\u0013\t9\u0004F\u0001\u000bBW.\f7\u000b\u001e:fC6dW\r^\"p]R,\u0007\u0010^\u0001\u0014gR\u0014X-Y7mKR$UMZ5oSRLwN\\\u000b\u0002uA\u00111HP\u0007\u0002y)\u0011QHK\u0001\u000bgR\u0014X-Y7mKR\u001c\u0018BA =\u0005M\u0019FO]3b[2,G\u000fR3gS:LG/[8o\u0003Q\u0019HO]3b[2,G\u000fR3gS:LG/[8oA\u0005\u00191/_:\u0011\u0005\rCU\"\u0001#\u000b\u0005\u00153\u0015!B1di>\u0014(\"A$\u0002\t\u0005\\7.Y\u0005\u0003\u0013\u0012\u00131\"Q2u_J\u001c\u0016p\u001d;f[\u00061A(\u001b8jiz\"2\u0001T'O!\t)\u0004\u0001C\u00039\t\u0001\u0007!\bC\u0003B\t\u0001\u0007!)\u0001\u0004tsN$X-\\\u000b\u0002\u0005\u000691/_:uK6\u0004\u0013\u0001D7bi\u0016\u0014\u0018.\u00197ju\u0016\u0014X#\u0001+\u0011\u0005UCV\"\u0001,\u000b\u0005]3\u0015AB:ue\u0016\fW.\u0003\u0002Z-\n\t\u0012i\u0019;pe6\u000bG/\u001a:jC2L'0\u001a:\u0002\r\r|gNZ5h+\u0005a\u0006CA/d\u001b\u0005q&B\u0001.`\u0015\t\u0001\u0017-\u0001\u0005usB,7/\u00194f\u0015\u0005\u0011\u0017aA2p[&\u0011AM\u0018\u0002\u0007\u0007>tg-[4\u0002\u0019I,\u0017\rZ=Qe>l\u0017n]3\u0016\u0003\u001d\u00042\u0001[6n\u001b\u0005I'B\u000161\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0003Y&\u0014q\u0001\u0015:p[&\u001cX\r\u0005\u0002<]&\u0011q\u000e\u0010\u0002\u0004\tVt\u0017!\u0004:fC\u0012L\bK]8nSN,\u0007%A\td_6\u0004H.\u001a;j_:\u0004&o\\7jg\u0016\f!cY8na2,G/[8o!J|W.[:fA\u0005\u00012m\\7qY\u0016$\u0018n\u001c8GkR,(/Z\u000b\u0002kB\u0019\u0001N^7\n\u0005]L'A\u0002$viV\u0014X-A\td_6\u0004H.\u001a;j_:4U\u000f^;sK\u0002\n!b[5mYN;\u0018\u000e^2i+\u0005Y\bCA+}\u0013\tihK\u0001\tTQ\u0006\u0014X\rZ&jY2\u001cv/\u001b;dQ\u0006Y1.\u001b7m'^LGo\u00195!\u0003I\u0019HO]3b[2,G/\u0012=fGV$\u0018n\u001c8\u0016\u0005\u0005\r!#BA\u0003]\u0005-aABA\u0004%\u0001\t\u0019A\u0001\u0007=e\u00164\u0017N\\3nK:$h(A\ntiJ,\u0017-\u001c7fi\u0016CXmY;uS>t\u0007\u0005E\u0002<\u0003\u001bI1!a\u0004=\u0005I\u0019FO]3b[2,G/\u0012=fGV$\u0018n\u001c8\u0002!\t|w\u000e^:ue\u0006\u00048+\u001a:wKJ\u001cXCAA\u000b!\u0011\t9\"!\t\u000e\u0005\u0005e!\u0002BA\u000e\u0003;\tA\u0001\\1oO*\u0011\u0011qD\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002$\u0005e!AB*ue&tw-A\tc_>$8\u000f\u001e:baN+'O^3sg\u0002\nqc]8ve\u000e,w+\u001b;i\u001f\u001a47/\u001a;D_:$X\r\u001f;\u0016\t\u0005-\u0012Q\u000b\u000b\u0005\u0003[\t9\u0007\u0005\u0004\u00020\u0005-\u0013\u0011\u000b\b\u0005\u0003c\t)E\u0004\u0003\u00024\u0005\u0005c\u0002BA\u001b\u0003\u007fqA!a\u000e\u0002>5\u0011\u0011\u0011\b\u0006\u0004\u0003wa\u0013A\u0002\u001fs_>$h(C\u0001,\u0013\tI#&C\u0002\u0002D!\n\u0001b]2bY\u0006$7\u000f\\\u0005\u0005\u0003\u000f\nI%A\u0004qC\u000e\\\u0017mZ3\u000b\u0007\u0005\r\u0003&\u0003\u0003\u0002N\u0005=#aF*pkJ\u001cWmV5uQ>3gm]3u\u0007>tG/\u001a=u\u0015\u0011\t9%!\u0013\u0011\t\u0005M\u0013Q\u000b\u0007\u0001\t\u001d\t9&\u0006b\u0001\u00033\u0012\u0011\u0001V\t\u0005\u00037\n\t\u0007E\u00020\u0003;J1!a\u00181\u0005\u001dqu\u000e\u001e5j]\u001e\u00042aLA2\u0013\r\t)\u0007\r\u0002\u0004\u0003:L\bbBA5+\u0001\u0007\u00111N\u0001\u0006S:dW\r\u001e\t\u0006w\u00055\u0014\u0011K\u0005\u0004\u0003_b$AC\"pI\u0016\u001c\u0017J\u001c7fi\u0006y1m\\7nSR$\u0018M\u00197f'&t7.\u0006\u0003\u0002v\u0005%ECBA<\u0003[\u000b9\f\u0005\u0005\u0002z\u0005u\u0014\u0011QAS\u001b\t\tYHC\u0002\u0002DYKA!a \u0002|\t!1+\u001b8l!\u001dy\u00131QAD\u0003\u0017K1!!\"1\u0005\u0019!V\u000f\u001d7feA!\u00111KAE\t\u001d\t9F\u0006b\u0001\u00033\u0002B!!$\u0002 :!\u0011qRAM\u001d\u0011\t\t*!&\u000f\t\u0005]\u00121S\u0005\u0002\u000f&\u0019\u0011q\u0013$\u0002\u000b-\fgm[1\n\t\u0005m\u0015QT\u0001\u0010\u0007>t7/^7fe6+7o]1hK*\u0019\u0011q\u0013$\n\t\u0005\u0005\u00161\u0015\u0002\f\u0007>lW.\u001b;uC\ndWM\u0003\u0003\u0002\u001c\u0006u\u0005\u0003BAT\u0003Sk\u0011AR\u0005\u0004\u0003W3%a\u0002(piV\u001bX\r\u001a\u0005\b\u0003_3\u0002\u0019AAY\u0003\u0019yW\u000f\u001e7fiB)1(a-\u0002\b&\u0019\u0011Q\u0017\u001f\u0003\u0017\r{G-Z2PkRdW\r\u001e\u0005\b\u0003s3\u0002\u0019AA^\u0003E\u0019w.\\7jiR,'oU3ui&twm\u001d\t\u0005\u0003{\u000by,\u0004\u0002\u0002\u001e&!\u0011\u0011YAO\u0005E\u0019u.\\7jiR,'oU3ui&twm]\u000b\u0005\u0003\u000b\fi\r\u0006\u0003\u0002H\u0006=\u0007\u0003CA=\u0003{\nI-!*\u0011\u000f=\n\u0019)a3\u0002\fB!\u00111KAg\t\u001d\t9f\u0006b\u0001\u00033Bq!!/\u0018\u0001\u0004\tY,A\u000btS:\\w+\u001b;i\u001f\u001a47/\u001a;D_:$X\r\u001f;\u0016\t\u0005U\u0017Q\u001c\u000b\u0007\u0003/\f)/!;\u0011\u0011\u0005e\u0014QPAm\u0003K\u0003raLAB\u00037\fy\u000e\u0005\u0003\u0002T\u0005uGaBA,1\t\u0007\u0011\u0011\f\t\u0005\u0003\u001b\u000b\t/\u0003\u0003\u0002d\u0006\r&!E\"p[6LG\u000f^1cY\u0016|eMZ:fi\"9\u0011q\u0016\rA\u0002\u0005\u001d\b#B\u001e\u00024\u0006m\u0007bBA]1\u0001\u0007\u00111X\u000b\u0005\u0003[\f)\u0010\u0006\u0003\u0002p\u0006]\b\u0003CA=\u0003{\n\t0!*\u0011\u000f=\n\u0019)a=\u0002`B!\u00111KA{\t\u001d\t9&\u0007b\u0001\u00033Bq!!/\u001a\u0001\u0004\tY,A\u0006qY\u0006LgnU8ve\u000e,W\u0003BA\u007f\u0005\u000f!b!a@\u0003\n\t5\u0001\u0003CA=\u0005\u0003\u0011)!!*\n\t\t\r\u00111\u0010\u0002\u0007'>,(oY3\u0011\t\u0005M#q\u0001\u0003\b\u0003/R\"\u0019AA-\u0011\u001d\tIG\u0007a\u0001\u0005\u0017\u0001RaOA7\u0005\u000bA\u0011Ba\u0004\u001b!\u0003\u0005\rA!\u0005\u0002\u001bI,7/\u001a;Q_NLG/[8o!\r)$1C\u0005\u0004\u0005+A#!\u0004*fg\u0016$\bk\\:ji&|g.A\u000bqY\u0006LgnU8ve\u000e,G\u0005Z3gCVdG\u000f\n\u001a\u0016\t\tm!\u0011G\u000b\u0003\u0005;QCA!\u0005\u0003 -\u0012!\u0011\u0005\t\u0005\u0005G\u0011i#\u0004\u0002\u0003&)!!q\u0005B\u0015\u0003%)hn\u00195fG.,GMC\u0002\u0003,A\n!\"\u00198o_R\fG/[8o\u0013\u0011\u0011yC!\n\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\rB\u0004\u0002Xm\u0011\r!!\u0017\u0002\u0013Ad\u0017-\u001b8TS:\\W\u0003\u0002B\u001c\u0005{!BA!\u000f\u0003@AA\u0011\u0011PA?\u0005w\t)\u000b\u0005\u0003\u0002T\tuBaBA,9\t\u0007\u0011\u0011\f\u0005\b\u0003_c\u0002\u0019\u0001B!!\u0015Y\u00141\u0017B\u001e\u0003\u001d\u0019\u0018N\\6SK\u001a,BAa\u0012\u0003RQ!!\u0011\nB*!\u0015)$1\nB(\u0013\r\u0011i\u0005\u000b\u0002\u0010/JLG/\u00192mKNKgn\u001b*fMB!\u00111\u000bB)\t\u001d\t9&\bb\u0001\u00033Bq!a,\u001e\u0001\u0004\u0011)\u0006E\u0003<\u0003g\u0013y%\u0001\u0005lKf\u0014\u0015\u0010^3t)\u0011\u0011YFa\u001a\u0011\u000b=\u0012iF!\u0019\n\u0007\t}\u0003GA\u0003BeJ\f\u0017\u0010E\u00020\u0005GJ1A!\u001a1\u0005\u0011\u0011\u0015\u0010^3\t\u000f\t%d\u00041\u0001\u0003l\u0005\u00191.Z=\u0011\t\t5$Q\u000f\b\u0005\u0005_\u0012\t\bE\u0002\u00028AJ1Aa\u001d1\u0003\u0019\u0001&/\u001a3fM&!\u00111\u0005B<\u0015\r\u0011\u0019\bM\u0001\tgR|\u0007\u000f]3sgV\u0011!Q\u0010\t\u0007\u0005\u007f\u0012YIa$\u000e\u0005\t\u0005%\u0002\u0002BB\u0005\u000b\u000ba!\u0019;p[&\u001c'b\u00016\u0003\b*!!\u0011RA\u000f\u0003\u0011)H/\u001b7\n\t\t5%\u0011\u0011\u0002\u0010\u0003R|W.[2SK\u001a,'/\u001a8dKB1!\u0011\u0013BN\u0005?k!Aa%\u000b\t\tU%qS\u0001\nS6lW\u000f^1cY\u0016T1A!'1\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0005;\u0013\u0019J\u0001\u0004WK\u000e$xN\u001d\t\u0005_\t\u0005V/C\u0002\u0003$B\u0012\u0011BR;oGRLwN\u001c\u0019\u0002\u0013M$x\u000e\u001d9feN\u0004\u0013AB8o'R|\u0007\u000f\u0006\u0003\u0003,\nE\u0006cA\u0018\u0003.&\u0019!q\u0016\u0019\u0003\tUs\u0017\u000e\u001e\u0005\b\u0005g\u000b\u0003\u0019\u0001BP\u0003\u00051\u0017AF:ue\u0016\fW\u000e\\3u\t\u00164\u0017N\\5uS>tWj]4\u0016\u0005\t-\u0014!\u00055b]\u0012dW\rV3s[&t\u0017\r^5p]V!!Q\u0018Bd+\t\u0011y\f\u0005\u0006\u0002z\t\u0005'Q\u0019Bc\u0003KKAAa1\u0002|\t!a\t\\8x!\u0011\t\u0019Fa2\u0005\u000f\u0005]3E1\u0001\u0002Z\u0005Y1/[4oC2\u0014V-\u00193z)\t\u0011i\rE\u00020\u0005\u001fL1A!51\u0005\u001d\u0011un\u001c7fC:\fAa\u001d;paR\tQ/\u0001\u0006nKR\u0014\u0018n\u0019+bON$\"Aa7\u0011\u0011\t5$Q\u001cB6\u0005WJAAa8\u0003x\t\u0019Q*\u00199")
/* loaded from: input_file:cloudflow/akkastream/AkkaStreamletContextImpl.class */
public final class AkkaStreamletContextImpl implements AkkaStreamletContext {
    private final StreamletDefinition streamletDefinition;
    private final ActorSystem system;
    private final Promise<Dun> cloudflow$akkastream$AkkaStreamletContextImpl$$readyPromise;
    private final Promise<Dun> completionPromise;
    private final Future<Dun> cloudflow$akkastream$AkkaStreamletContextImpl$$completionFuture;
    private final SharedKillSwitch killSwitch;
    private final StreamletExecution streamletExecution;
    private final String bootstrapServers;
    private final AtomicReference<Vector<Function0<Future<Dun>>>> stoppers;
    private volatile StreamletContext$MountedPathUnavailableException$ MountedPathUnavailableException$module;

    public String streamletRef() {
        return StreamletContext.streamletRef$(this);
    }

    public SavepointPath findSavepointPathForPort(StreamletPort streamletPort) {
        return StreamletContext.findSavepointPathForPort$(this, streamletPort);
    }

    public final Config streamletConfig() {
        return StreamletContext.streamletConfig$(this);
    }

    public Path getMountedPath(VolumeMount volumeMount) {
        return StreamletContext.getMountedPath$(this, volumeMount);
    }

    public StreamletContext$MountedPathUnavailableException$ MountedPathUnavailableException() {
        if (this.MountedPathUnavailableException$module == null) {
            MountedPathUnavailableException$lzycompute$1();
        }
        return this.MountedPathUnavailableException$module;
    }

    public StreamletDefinition streamletDefinition() {
        return this.streamletDefinition;
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public ActorSystem system() {
        return this.system;
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    /* renamed from: materializer, reason: merged with bridge method [inline-methods] */
    public ActorMaterializer mo2materializer() {
        return ActorMaterializer$.MODULE$.apply(ActorMaterializer$.MODULE$.apply$default$1(), ActorMaterializer$.MODULE$.apply$default$2(), system());
    }

    public Config config() {
        return streamletDefinition().config();
    }

    public Promise<Dun> cloudflow$akkastream$AkkaStreamletContextImpl$$readyPromise() {
        return this.cloudflow$akkastream$AkkaStreamletContextImpl$$readyPromise;
    }

    private Promise<Dun> completionPromise() {
        return this.completionPromise;
    }

    public Future<Dun> cloudflow$akkastream$AkkaStreamletContextImpl$$completionFuture() {
        return this.cloudflow$akkastream$AkkaStreamletContextImpl$$completionFuture;
    }

    public SharedKillSwitch killSwitch() {
        return this.killSwitch;
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public StreamletExecution streamletExecution() {
        return this.streamletExecution;
    }

    private String bootstrapServers() {
        return this.bootstrapServers;
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> SourceWithContext<T, ConsumerMessage.CommittableOffset, Object> sourceWithOffsetContext(CodecInlet<T> codecInlet) {
        SavepointPath findSavepointPathForPort = findSavepointPathForPort(codecInlet);
        String value = findSavepointPathForPort.value();
        String groupId = findSavepointPathForPort.groupId(streamletRef(), codecInlet);
        ConsumerSettings withProperty = ConsumerSettings$.MODULE$.apply(system(), new ByteArrayDeserializer(), new ByteArrayDeserializer()).withBootstrapServers(bootstrapServers()).withGroupId(groupId).withProperty("auto.offset.reset", "earliest");
        system().log().info(new StringBuilder(47).append("Creating committable source for group: ").append(groupId).append(" topic: ").append(value).toString());
        return Consumer$.MODULE$.sourceWithOffsetContext(withProperty, Subscriptions$.MODULE$.topics(Predef$.MODULE$.wrapRefArray(new String[]{value}))).asSource().mapMaterializedValue(control -> {
            return NotUsed$.MODULE$;
        }).via(handleTermination()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            ConsumerRecord consumerRecord = (ConsumerRecord) tuple2._1();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(codecInlet.codec().decode((byte[]) consumerRecord.value())), (ConsumerMessage.CommittableOffset) tuple2._2());
        }).asSourceWithContext(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            return (ConsumerMessage.CommittableOffset) tuple22._2();
        }).map(tuple23 -> {
            if (tuple23 == null) {
                throw new MatchError(tuple23);
            }
            return tuple23._1();
        });
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> Sink<Tuple2<T, ConsumerMessage.Committable>, NotUsed> committableSink(CodecOutlet<T> codecOutlet, CommitterSettings committerSettings) {
        ProducerSettings withBootstrapServers = ProducerSettings$.MODULE$.apply(system(), new ByteArraySerializer(), new ByteArraySerializer()).withBootstrapServers(bootstrapServers());
        String value = findSavepointPathForPort(codecOutlet).value();
        return Flow$.MODULE$.apply().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _1 = tuple2._1();
            return new ProducerMessage.Message(new ProducerRecord(value, this.keyBytes((String) codecOutlet.partitioner().apply(_1)), codecOutlet.codec().encode(_1)), (ConsumerMessage.Committable) tuple2._2());
        }).via(handleTermination()).toMat(Producer$.MODULE$.committableSink(withBootstrapServers, committerSettings), Keep$.MODULE$.left());
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> Sink<Tuple2<T, ConsumerMessage.Committable>, NotUsed> committableSink(CommitterSettings committerSettings) {
        return Flow$.MODULE$.apply().toMat(Committer$.MODULE$.sinkWithOffsetContext(committerSettings), Keep$.MODULE$.left());
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> Sink<Tuple2<T, ConsumerMessage.CommittableOffset>, NotUsed> sinkWithOffsetContext(CodecOutlet<T> codecOutlet, CommitterSettings committerSettings) {
        ProducerSettings withBootstrapServers = ProducerSettings$.MODULE$.apply(system(), new ByteArraySerializer(), new ByteArraySerializer()).withBootstrapServers(bootstrapServers());
        String value = findSavepointPathForPort(codecOutlet).value();
        return Flow$.MODULE$.apply().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Object _1 = tuple2._1();
            return new ProducerMessage.Message(new ProducerRecord(value, this.keyBytes((String) codecOutlet.partitioner().apply(_1)), codecOutlet.codec().encode(_1)), (ConsumerMessage.CommittableOffset) tuple2._2());
        }).toMat(Producer$.MODULE$.committableSink(withBootstrapServers, committerSettings), Keep$.MODULE$.left());
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> Sink<Tuple2<T, ConsumerMessage.CommittableOffset>, NotUsed> sinkWithOffsetContext(CommitterSettings committerSettings) {
        return Flow$.MODULE$.apply().toMat(Committer$.MODULE$.sinkWithOffsetContext(committerSettings), Keep$.MODULE$.left());
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> Source<T, NotUsed> plainSource(CodecInlet<T> codecInlet, ResetPosition resetPosition) {
        SavepointPath findSavepointPathForPort = findSavepointPathForPort(codecInlet);
        String value = findSavepointPathForPort.value();
        return Consumer$.MODULE$.plainSource(ConsumerSettings$.MODULE$.apply(system(), new ByteArrayDeserializer(), new ByteArrayDeserializer()).withBootstrapServers(bootstrapServers()).withGroupId(findSavepointPathForPort.groupId(streamletRef(), codecInlet)).withProperty("auto.offset.reset", resetPosition.autoOffsetReset()), Subscriptions$.MODULE$.topics(Predef$.MODULE$.wrapRefArray(new String[]{value}))).mapMaterializedValue(control -> {
            return NotUsed$.MODULE$;
        }).via(handleTermination()).map(consumerRecord -> {
            return codecInlet.codec().decode((byte[]) consumerRecord.value());
        });
    }

    public <T> ResetPosition plainSource$default$2() {
        return Latest$.MODULE$;
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> Sink<T, NotUsed> plainSink(CodecOutlet<T> codecOutlet) {
        ProducerSettings withBootstrapServers = ProducerSettings$.MODULE$.apply(system(), new ByteArraySerializer(), new ByteArraySerializer()).withBootstrapServers(bootstrapServers());
        String value = findSavepointPathForPort(codecOutlet).value();
        return Flow$.MODULE$.apply().map(obj -> {
            return new ProducerRecord(value, this.keyBytes((String) codecOutlet.partitioner().apply(obj)), codecOutlet.codec().encode(obj));
        }).via(handleTermination()).to(Producer$.MODULE$.plainSink(withBootstrapServers)).mapMaterializedValue(notUsed -> {
            return NotUsed$.MODULE$;
        });
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public <T> WritableSinkRef<T> sinkRef(CodecOutlet<T> codecOutlet) {
        return new KafkaSinkRef(system(), codecOutlet, bootstrapServers(), findSavepointPathForPort(codecOutlet).value(), killSwitch(), completionPromise());
    }

    private byte[] keyBytes(String str) {
        if (str != null) {
            return str.getBytes("UTF8");
        }
        return null;
    }

    private AtomicReference<Vector<Function0<Future<Dun>>>> stoppers() {
        return this.stoppers;
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public void onStop(Function0<Future<Dun>> function0) {
        stoppers().getAndUpdate(vector -> {
            return (Vector) vector.$colon$plus(function0, Vector$.MODULE$.canBuildFrom());
        });
    }

    private String streamletDefinitionMsg() {
        return new StringBuilder(3).append(streamletDefinition().streamletRef()).append(" (").append(streamletDefinition().streamletClass()).append(")").toString();
    }

    private <T> Flow<T, T, NotUsed> handleTermination() {
        return Flow$.MODULE$.apply().via(killSwitch().flow()).alsoTo(Sink$.MODULE$.onComplete(r4 -> {
            $anonfun$handleTermination$1(this, r4);
            return BoxedUnit.UNIT;
        }));
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public boolean signalReady() {
        return cloudflow$akkastream$AkkaStreamletContextImpl$$readyPromise().trySuccess(Dun$.MODULE$);
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public Future<Dun> stop() {
        Files.deleteIfExists(Paths.get(new StringBuilder(9).append("/tmp/").append(streamletRef()).append(".txt").toString(), new String[0]));
        killSwitch().shutdown();
        return Future$.MODULE$.sequence((TraversableOnce) stoppers().get().map(function0 -> {
            return ((Future) function0.apply()).recover(new AkkaStreamletContextImpl$$anonfun$$nestedInanonfun$stop$1$1(this), this.system().dispatcher());
        }, Vector$.MODULE$.canBuildFrom()), Vector$.MODULE$.canBuildFrom(), system().dispatcher()).flatMap(vector -> {
            this.completionPromise().trySuccess(Dun$.MODULE$);
            return this.cloudflow$akkastream$AkkaStreamletContextImpl$$completionFuture();
        }, system().dispatcher());
    }

    @Override // cloudflow.akkastream.AkkaStreamletContext
    public Map<String, String> metricTags() {
        return Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("app-id"), streamletDefinition().appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("app-version"), streamletDefinition().appVersion()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("streamlet-ref"), streamletRef())}));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [cloudflow.akkastream.AkkaStreamletContextImpl] */
    private final void MountedPathUnavailableException$lzycompute$1() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.MountedPathUnavailableException$module == null) {
                r0 = this;
                r0.MountedPathUnavailableException$module = new StreamletContext$MountedPathUnavailableException$(this);
            }
        }
    }

    public static final /* synthetic */ void $anonfun$handleTermination$1(AkkaStreamletContextImpl akkaStreamletContextImpl, Try r7) {
        if (r7 instanceof Success) {
            akkaStreamletContextImpl.system().log().error(new StringBuilder(47).append("Stream has completed. Shutting down streamlet ").append(akkaStreamletContextImpl.streamletDefinitionMsg()).append(".").toString());
            akkaStreamletContextImpl.completionPromise().trySuccess(Dun$.MODULE$);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r7 instanceof Failure)) {
                throw new MatchError(r7);
            }
            Throwable exception = ((Failure) r7).exception();
            akkaStreamletContextImpl.system().log().error(exception, new StringBuilder(44).append("Stream has failed. Shutting down streamlet ").append(akkaStreamletContextImpl.streamletDefinitionMsg()).append(".").toString());
            akkaStreamletContextImpl.completionPromise().tryFailure(exception);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public AkkaStreamletContextImpl(StreamletDefinition streamletDefinition, ActorSystem actorSystem) {
        this.streamletDefinition = streamletDefinition;
        StreamletContext.$init$(this);
        this.system = actorSystem;
        this.cloudflow$akkastream$AkkaStreamletContextImpl$$readyPromise = Promise$.MODULE$.apply();
        this.completionPromise = Promise$.MODULE$.apply();
        this.cloudflow$akkastream$AkkaStreamletContextImpl$$completionFuture = completionPromise().future();
        this.killSwitch = KillSwitches$.MODULE$.shared(streamletRef());
        this.streamletExecution = new StreamletExecution(this) { // from class: cloudflow.akkastream.AkkaStreamletContextImpl$$anon$1
            private final Future<Dun> readyFuture;
            private final /* synthetic */ AkkaStreamletContextImpl $outer;

            private Future<Dun> readyFuture() {
                return this.readyFuture;
            }

            public Future<Dun> completed() {
                return this.$outer.cloudflow$akkastream$AkkaStreamletContextImpl$$completionFuture();
            }

            public Future<Dun> ready() {
                return readyFuture();
            }

            public Future<Dun> stop() {
                return this.$outer.stop();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.readyFuture = this.cloudflow$akkastream$AkkaStreamletContextImpl$$readyPromise().future();
            }
        };
        this.bootstrapServers = system().settings().config().getString("cloudflow.kafka.bootstrap-servers");
        this.stoppers = new AtomicReference<>(package$.MODULE$.Vector().empty());
    }
}
