/*
 * Decompiled with CFR 0.152.
 */
package cloudflow.operator;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.datap.crd.App;
import akka.datap.crd.App$;
import akka.event.LoggingAdapter;
import akka.kube.actions.Action;
import akka.kube.actions.ActionExecutor;
import akka.kube.actions.Fabric8ActionExecutor;
import akka.stream.ActorAttributes$;
import akka.stream.Attributes;
import akka.stream.Attributes$;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.Materializer$;
import akka.stream.OverflowStrategy$;
import akka.stream.Supervision;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.RunnableGraph;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import cloudflow.operator.action.CloudflowLabels$;
import cloudflow.operator.action.TopicActions$KafkaAdmins$;
import cloudflow.operator.action.runner.Runner;
import cloudflow.operator.event.AppChangeEvent;
import cloudflow.operator.event.StatusChangeEvent;
import cloudflow.operator.event.StatusChangeEvent$;
import cloudflow.operator.event.StreamletChangeEventFlow$;
import cloudflow.operator.event.WatchEvent;
import cloudflow.operator.flow.AppEventFlow$;
import cloudflow.operator.flow.StatusChangeEventFlow$;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.fabric8.kubernetes.api.model.SecretFluent;
import io.fabric8.kubernetes.api.model.SecretFluentImpl;
import io.fabric8.kubernetes.api.model.SecretList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.Gettable;
import io.fabric8.kubernetes.client.dsl.Nameable;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.EventType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import java.io.Serializable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.LazyRef;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

public final class Operator$ {
    public static final Operator$ MODULE$ = new Operator$();
    private static Logger log;
    private static ExecutionContextExecutorService fabric8ExecutionContext;
    private static final String AppIdLabel;
    private static final String ConfigFormatLabel;
    private static final String StreamletNameLabel;
    private static final String ConfigUpdateLabel;
    private static final scala.collection.immutable.Map<String, String> DefaultWatchOptions;
    private static final Function1<Throwable, Supervision.Directive> decider;
    private static final Attributes StreamAttributes;
    private static final AtomicReference<SharedIndexInformer<App.Cr>> crInformer;
    private static final AtomicReference<SharedIndexInformer<Secret>> secretInformer;
    private static final AtomicReference<SharedIndexInformer<Pod>> podInformer;
    private static volatile byte bitmap$0;

    static {
        AppIdLabel = "com.lightbend.cloudflow/app-id";
        ConfigFormatLabel = "com.lightbend.cloudflow/config-format";
        StreamletNameLabel = "com.lightbend.cloudflow/streamlet-name";
        ConfigUpdateLabel = "com.lightbend.cloudflow/config-update";
        DefaultWatchOptions = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)CloudflowLabels$.MODULE$.ManagedBy()), (Object)CloudflowLabels$.MODULE$.ManagedByCloudflow())}));
        decider = (Function1 & Serializable)x0$1 -> {
            Throwable throwable = x0$1;
            Supervision.Stop$ stop$ = Supervision.Stop$.MODULE$;
            return stop$;
        };
        StreamAttributes = ActorAttributes$.MODULE$.supervisionStrategy(MODULE$.decider());
        crInformer = new AtomicReference();
        secretInformer = new AtomicReference();
        podInformer = new AtomicReference();
    }

    private Logger log$lzycompute() {
        Operator$ operator$ = this;
        synchronized (operator$) {
            if ((byte)(bitmap$0 & 1) == 0) {
                log = LoggerFactory.getLogger((String)"Operator");
                bitmap$0 = (byte)(bitmap$0 | 1);
            }
        }
        return log;
    }

    public Logger log() {
        return (byte)(bitmap$0 & 1) == 0 ? this.log$lzycompute() : log;
    }

    public String AppIdLabel() {
        return AppIdLabel;
    }

    public String ConfigFormatLabel() {
        return ConfigFormatLabel;
    }

    public String StreamletNameLabel() {
        return StreamletNameLabel;
    }

    public String ConfigUpdateLabel() {
        return ConfigUpdateLabel;
    }

    public scala.collection.immutable.Map<String, String> DefaultWatchOptions() {
        return DefaultWatchOptions;
    }

    public Function1<Throwable, Supervision.Directive> decider() {
        return decider;
    }

    public Attributes StreamAttributes() {
        return StreamAttributes;
    }

    private ExecutionContextExecutorService fabric8ExecutionContext$lzycompute() {
        Operator$ operator$ = this;
        synchronized (operator$) {
            if ((byte)(bitmap$0 & 2) == 0) {
                fabric8ExecutionContext = ExecutionContext$.MODULE$.fromExecutorService(Executors.newSingleThreadExecutor());
                bitmap$0 = (byte)(bitmap$0 | 2);
            }
        }
        return fabric8ExecutionContext;
    }

    private ExecutionContextExecutorService fabric8ExecutionContext() {
        return (byte)(bitmap$0 & 2) == 0 ? this.fabric8ExecutionContext$lzycompute() : fabric8ExecutionContext;
    }

    public Secret ProtocolVersionSecret(List<OwnerReference> ownerReferences) {
        return ((SecretBuilder)((SecretFluentImpl)((SecretFluent.MetadataNested)new SecretBuilder().withNewMetadata().withName("cloudflow-protocol-version").withLabels(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"cloudflow-protocol-version"), (Object)"cloudflow-protocol-version")}))).asJava()).withOwnerReferences((OwnerReference[])ownerReferences.toArray(ClassTag$.MODULE$.apply(OwnerReference.class)))).endMetadata()).withStringData(CollectionConverters$.MODULE$.MapHasAsJava((Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"protocol-version"), (Object)"7")}))).asJava())).build();
    }

    public void handleEvents(KubernetesClient client, scala.collection.immutable.Map<String, Runner<?>> runners, String podName, String podNamespace, ActorSystem system, Materializer mat, ExecutionContext ec) {
        Attributes logAttributes = Attributes$.MODULE$.logLevels(Attributes.LogLevels$.MODULE$.Info(), Attributes$.MODULE$.logLevels$default$2(), Attributes$.MODULE$.logLevels$default$3());
        Fabric8ActionExecutor actionExecutor = new Fabric8ActionExecutor(client, (ExecutionContext)this.fabric8ExecutionContext());
        SharedInformerFactory sharedInformerFactory = client.informers();
        this.runStream(this.watchCr(sharedInformerFactory, this.DefaultWatchOptions(), system).via(AppEventFlow$.MODULE$.fromWatchEvent(logAttributes)).via(AppEventFlow$.MODULE$.toAction(runners, podName, podNamespace)).via(this.executeActions((ActionExecutor)actionExecutor, logAttributes)).toMat((Graph)Sink$.MODULE$.ignore(), Keep$.MODULE$.right()).mapMaterializedValue((Function1 & Serializable)x$1 -> x$1.flatMap((Function1 & Serializable)value -> TopicActions$KafkaAdmins$.MODULE$.close((Duration)new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds(), ec).map((Function1 & Serializable)x$2 -> value, ec), ec)), "The actions stream completed unexpectedly, terminating.", "The actions stream failed, terminating.", system, mat, ec);
        scala.collection.immutable.Map watchOptions = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)CloudflowLabels$.MODULE$.ManagedBy()), (Object)CloudflowLabels$.MODULE$.ManagedByCloudflow()), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)CloudflowLabels$.MODULE$.ConfigFormat()), (Object)CloudflowLabels$.MODULE$.StreamletDeploymentConfigFormat())}));
        this.runStream(this.watchSecret(sharedInformerFactory, (scala.collection.immutable.Map<String, String>)watchOptions, system).via(StreamletChangeEventFlow$.MODULE$.fromWatchEvent()).via(this.mapToAppInSameNamespace(client, ec)).via(StreamletChangeEventFlow$.MODULE$.toConfigUpdateAction(runners, podName)).via(this.executeActions((ActionExecutor)actionExecutor, logAttributes)).toMat((Graph)Sink$.MODULE$.ignore(), Keep$.MODULE$.right()), "The config updates stream completed unexpectedly, terminating.", "The config updates stream failed, terminating.", system, mat, ec);
        Source qual$1 = this.watchPod(sharedInformerFactory, this.DefaultWatchOptions(), system).via(StatusChangeEventFlow$.MODULE$.fromWatchEvent());
        String x$12 = "status-change-event";
        Function1 & Serializable x$2 = (Function1 & Serializable)event -> StatusChangeEvent$.MODULE$.detected((StatusChangeEvent)event);
        LoggingAdapter x$3 = qual$1.log$default$3("status-change-event", (Function1)x$2);
        this.runStream(((Source)qual$1.log("status-change-event", (Function1)x$2, x$3)).via(this.mapToAppInSameNamespace(client, ec)).via(StatusChangeEventFlow$.MODULE$.toStatusUpdateAction(runners)).via(this.executeActions((ActionExecutor)actionExecutor, logAttributes)).toMat((Graph)Sink$.MODULE$.ignore(), Keep$.MODULE$.right()), "The status changes stream completed unexpectedly, terminating.", "The status changes stream failed, terminating.", system, mat, ec);
        sharedInformerFactory.startAllRegisteredInformers();
    }

    private Flow<Action, Action, NotUsed> executeActions(ActionExecutor actionExecutor, Attributes logAttributes) {
        return ((Flow)Flow$.MODULE$.apply().mapAsync(1, (Function1 & Serializable)action -> actionExecutor.execute(action))).withAttributes(logAttributes);
    }

    public <E extends AppChangeEvent<?>> Flow<E, Tuple2<Option<App.Cr>, E>, NotUsed> mapToAppInSameNamespace(KubernetesClient client, ExecutionContext ec) {
        return (Flow)Flow$.MODULE$.apply().mapAsync(1, (Function1 & Serializable)changeEvent -> {
            String ns = changeEvent.namespace();
            return Future$.MODULE$.apply((Function0 & Serializable)() -> Option$.MODULE$.apply(((Gettable)((Nameable)client.customResources(App$.MODULE$.customResourceDefinitionContext(), App.Cr.class, App.List.class).inNamespace(ns)).withName(changeEvent.appId())).get()), ec).map((Function1 & Serializable)cr -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(cr), changeEvent), ec);
        });
    }

    private <T extends HasMetadata> ResourceEventHandler<T> getEventHandler(Function1<WatchEvent<T>, BoxedUnit> fn) {
        return new ResourceEventHandler<T>(fn){
            private final Function1 fn$1;

            public void onAdd(T elem) {
                this.fn$1.apply(new WatchEvent<T>(elem, EventType.ADDITION));
            }

            public void onUpdate(T oldElem, T newElem) {
                this.fn$1.apply(new WatchEvent<T>(newElem, EventType.UPDATION));
            }

            public void onDelete(T elem, boolean deletedFinalStateUnknown) {
                this.fn$1.apply(new WatchEvent<T>(elem, EventType.DELETION));
            }
            {
                this.fn$1 = fn$1;
            }
        };
    }

    private AtomicReference<SharedIndexInformer<App.Cr>> crInformer() {
        return crInformer;
    }

    private <T> T setOnceAndGet(AtomicReference<T> ar, Function0<T> elem) {
        LazyRef newValue$lzy = new LazyRef();
        return (T)(ar.compareAndSet(null, Operator$.newValue$1(newValue$lzy, elem)) ? Operator$.newValue$1(newValue$lzy, elem) : ar.get());
    }

    private <T extends HasMetadata> void enqueueTask(String msgType, SourceQueueWithComplete<WatchEvent<T>> sourceMat, WatchEvent<T> event) {
        this.log().info("Enqueue {} with type [{}]", (Object)msgType, (Object)event.eventType());
        sourceMat.offer(event);
    }

    private Source<WatchEvent<App.Cr>, NotUsed> watchCr(SharedInformerFactory sharedInformerFactory, scala.collection.immutable.Map<String, String> options, ActorSystem system) {
        SharedIndexInformer<App.Cr> informer = this.setOnceAndGet(this.crInformer(), (Function0 & Serializable)() -> sharedInformerFactory.sharedIndexInformerForCustomResource(App$.MODULE$.customResourceDefinitionContext(), App.Cr.class, App.List.class, new OperationContext().withLabels(CollectionConverters$.MODULE$.MapHasAsJava((Map)options).asJava()), 600000L));
        Tuple2 tuple2 = Source$.MODULE$.queue(1000, OverflowStrategy$.MODULE$.dropHead()).preMaterialize(Materializer$.MODULE$.matFromSystem((ClassicActorSystemProvider)system));
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        SourceQueueWithComplete sourceMat = (SourceQueueWithComplete)tuple2._1();
        Source source = (Source)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourceMat, (Object)source);
        Tuple2 tuple23 = tuple22;
        SourceQueueWithComplete sourceMat2 = (SourceQueueWithComplete)tuple23._1();
        Source source2 = (Source)tuple23._2();
        informer.addEventHandler(this.getEventHandler((Function1 & Serializable)event -> {
            Operator$.MODULE$.enqueueTask("App.Cr Watch Event", sourceMat2, event);
            return BoxedUnit.UNIT;
        }));
        return source2;
    }

    private AtomicReference<SharedIndexInformer<Secret>> secretInformer() {
        return secretInformer;
    }

    private Source<WatchEvent<Secret>, NotUsed> watchSecret(SharedInformerFactory sharedInformerFactory, scala.collection.immutable.Map<String, String> options, ActorSystem system) {
        SharedIndexInformer<Secret> informer = this.setOnceAndGet(this.secretInformer(), (Function0 & Serializable)() -> sharedInformerFactory.sharedIndexInformerFor(Secret.class, SecretList.class, new OperationContext().withLabels(CollectionConverters$.MODULE$.MapHasAsJava((Map)options).asJava()), 600000L));
        Tuple2 tuple2 = Source$.MODULE$.queue(1000, OverflowStrategy$.MODULE$.dropHead()).preMaterialize(Materializer$.MODULE$.matFromSystem((ClassicActorSystemProvider)system));
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        SourceQueueWithComplete sourceMat = (SourceQueueWithComplete)tuple2._1();
        Source source = (Source)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourceMat, (Object)source);
        Tuple2 tuple23 = tuple22;
        SourceQueueWithComplete sourceMat2 = (SourceQueueWithComplete)tuple23._1();
        Source source2 = (Source)tuple23._2();
        informer.addEventHandler(this.getEventHandler((Function1 & Serializable)event -> {
            Operator$.MODULE$.enqueueTask("Secret Watch Event", sourceMat2, event);
            return BoxedUnit.UNIT;
        }));
        return source2;
    }

    private AtomicReference<SharedIndexInformer<Pod>> podInformer() {
        return podInformer;
    }

    private Source<WatchEvent<Pod>, NotUsed> watchPod(SharedInformerFactory sharedInformerFactory, scala.collection.immutable.Map<String, String> options, ActorSystem system) {
        SharedIndexInformer<Pod> informer = this.setOnceAndGet(this.podInformer(), (Function0 & Serializable)() -> sharedInformerFactory.sharedIndexInformerFor(Pod.class, PodList.class, new OperationContext().withLabels(CollectionConverters$.MODULE$.MapHasAsJava((Map)options).asJava()), 600000L));
        Tuple2 tuple2 = Source$.MODULE$.queue(1000, OverflowStrategy$.MODULE$.dropHead()).preMaterialize(Materializer$.MODULE$.matFromSystem((ClassicActorSystemProvider)system));
        if (tuple2 == null) {
            throw new MatchError((Object)tuple2);
        }
        SourceQueueWithComplete sourceMat = (SourceQueueWithComplete)tuple2._1();
        Source source = (Source)tuple2._2();
        Tuple2 tuple22 = new Tuple2((Object)sourceMat, (Object)source);
        Tuple2 tuple23 = tuple22;
        SourceQueueWithComplete sourceMat2 = (SourceQueueWithComplete)tuple23._1();
        Source source2 = (Source)tuple23._2();
        informer.addEventHandler(this.getEventHandler((Function1 & Serializable)event -> {
            Operator$.MODULE$.enqueueTask("Pod Watch Event", sourceMat2, event);
            return BoxedUnit.UNIT;
        }));
        return source2;
    }

    private void runStream(RunnableGraph<Future<?>> graph, String unexpectedCompletionMsg, String errorMsg, ActorSystem system, Materializer mat, ExecutionContext ec) {
        ((Future)graph.withAttributes(this.StreamAttributes()).run(mat)).onComplete((Function1 & Serializable)x0$1 -> {
            Future future;
            Try try_ = x0$1;
            if (try_ instanceof Success) {
                MODULE$.log().warn(unexpectedCompletionMsg);
                system.registerOnTermination((Function0)(JFunction0.mcV.sp & Serializable)() -> MODULE$.exitWithFailure());
                future = system.terminate();
            } else if (try_ instanceof Failure) {
                Failure failure = (Failure)try_;
                Throwable t = failure.exception();
                MODULE$.log().error(errorMsg, t);
                system.registerOnTermination((Function0)(JFunction0.mcV.sp & Serializable)() -> MODULE$.exitWithFailure());
                future = system.terminate();
            } else {
                throw new MatchError((Object)try_);
            }
            return future;
        }, ec);
    }

    private void exitWithFailure() {
        System.exit(-1);
    }

    private static final /* synthetic */ Object newValue$lzycompute$1(LazyRef newValue$lzy$1, Function0 elem$1) {
        Object object;
        LazyRef lazyRef = newValue$lzy$1;
        synchronized (lazyRef) {
            object = newValue$lzy$1.initialized() ? newValue$lzy$1.value() : newValue$lzy$1.initialize(elem$1.apply());
        }
        return object;
    }

    private static final Object newValue$1(LazyRef newValue$lzy$1, Function0 elem$1) {
        return newValue$lzy$1.initialized() ? newValue$lzy$1.value() : Operator$.newValue$lzycompute$1(newValue$lzy$1, elem$1);
    }

    private Operator$() {
    }
}

