package cloudflow.operator;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.datap.crd.App;
import akka.datap.crd.App$;
import akka.kube.actions.Action;
import akka.kube.actions.ActionExecutor;
import akka.kube.actions.Fabric8ActionExecutor;
import akka.stream.ActorAttributes$;
import akka.stream.Attributes;
import akka.stream.Attributes$;
import akka.stream.Attributes$LogLevels$;
import akka.stream.Materializer;
import akka.stream.Materializer$;
import akka.stream.OverflowStrategy$;
import akka.stream.Supervision;
import akka.stream.Supervision$Stop$;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.RunnableGraph;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import cloudflow.operator.action.CloudflowLabels$;
import cloudflow.operator.action.TopicActions$KafkaAdmins$;
import cloudflow.operator.action.runner.Runner;
import cloudflow.operator.event.AppChangeEvent;
import cloudflow.operator.event.StatusChangeEvent$;
import cloudflow.operator.event.StreamletChangeEventFlow$;
import cloudflow.operator.event.WatchEvent;
import cloudflow.operator.flow.AppEventFlow$;
import cloudflow.operator.flow.StatusChangeEventFlow$;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ConfigMapFluentImpl;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecretList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.Gettable;
import io.fabric8.kubernetes.client.dsl.Nameable;
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.EventType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.LazyRef;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: Operator.scala */
/* loaded from: input_file:cloudflow/operator/Operator$.class */
public final class Operator$ {
    // Lazily created logger; assigned by log$lzycompute() the first time log() is called.
    private static Logger log;
    // Lazily created execution context; assigned by fabric8ExecutionContext$lzycompute() on first use.
    private static ExecutionContextExecutorService fabric8ExecutionContext;
    // Initialization flags for the two lazy fields above:
    // bit value 1 is set once `log` is assigned, bit value 2 once `fabric8ExecutionContext` is assigned.
    private static volatile byte bitmap$0;
    // Singleton instance. It is declared before the constants below because the
    // StreamAttributes initializer calls MODULE$.decider() during static initialization.
    public static final Operator$ MODULE$ = new Operator$();
    // Value stored under ProtocolVersionKey in the config map built by ProtocolVersionConfigMap.
    private static final String ProtocolVersion = "6";
    // Data key under which ProtocolVersion is stored in that config map.
    private static final String ProtocolVersionKey = "protocol-version";
    // Name of the protocol-version config map; also used as both key and value of its single label.
    private static final String ProtocolVersionConfigMapName = "cloudflow-protocol-version";
    // The four label strings below are only exposed through their accessor methods in this class.
    private static final String AppIdLabel = "com.lightbend.cloudflow/app-id";
    private static final String ConfigFormatLabel = "com.lightbend.cloudflow/config-format";
    private static final String StreamletNameLabel = "com.lightbend.cloudflow/streamlet-name";
    private static final String ConfigUpdateLabel = "com.lightbend.cloudflow/config-update";
    // Single-entry map (ManagedBy -> ManagedByCloudflow) passed as the label map to the custom-resource and pod watches.
    private static final Map<String, String> DefaultWatchOptions = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(CloudflowLabels$.MODULE$.ManagedBy()), CloudflowLabels$.MODULE$.ManagedByCloudflow())}));
    // Supervision decider that answers Stop for every throwable.
    private static final Function1<Throwable, Supervision.Directive> decider = th -> {
        return Supervision$Stop$.MODULE$;
    };
    // Stream attributes carrying the supervision strategy built from `decider`; applied to every graph in runStream.
    private static final Attributes StreamAttributes = ActorAttributes$.MODULE$.supervisionStrategy(MODULE$.decider());
    // Holders for the informers created in watchCr / watchSecret / watchPod; populated through setOnceAndGet.
    private static final AtomicReference<SharedIndexInformer<App.Cr>> crInformer = new AtomicReference<>();
    private static final AtomicReference<SharedIndexInformer<Secret>> secretInformer = new AtomicReference<>();
    private static final AtomicReference<SharedIndexInformer<Pod>> podInformer = new AtomicReference<>();

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v11, types: [byte] */
    private Logger log$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (bitmap$0 & 1)) == 0) {
                log = LoggerFactory.getLogger("Operator");
                r0 = (byte) (bitmap$0 | 1);
                bitmap$0 = r0;
            }
        }
        return log;
    }

    /**
     * Returns the operator logger, creating it on first use.
     *
     * @return the cached logger when the initialization flag is already set, otherwise the result of
     *     {@code log$lzycompute()}
     */
    public Logger log() {
        if ((bitmap$0 & 1) != 0) {
            return log;
        }
        return log$lzycompute();
    }

    /**
     * @return the protocol version constant ({@code "6"})
     */
    public String ProtocolVersion() {
        return Operator$.ProtocolVersion;
    }

    /**
     * @return the data key ({@code "protocol-version"}) under which the protocol version is stored
     */
    public String ProtocolVersionKey() {
        return Operator$.ProtocolVersionKey;
    }

    /**
     * @return the name of the protocol-version config map ({@code "cloudflow-protocol-version"})
     */
    public String ProtocolVersionConfigMapName() {
        return Operator$.ProtocolVersionConfigMapName;
    }

    /**
     * Builds the config map that records the protocol version.
     *
     * <p>The config map is named {@code ProtocolVersionConfigMapName()}, carries one label whose key
     * and value are both that name, is owned by the given references, and holds a single data entry
     * {@code ProtocolVersionKey() -> ProtocolVersion()}.
     *
     * @param list owner references to attach to the config map's metadata
     * @return the built config map
     */
    public ConfigMap ProtocolVersionConfigMap(List<OwnerReference> list) {
        // Single-entry label map: name -> name.
        java.util.Map labels = CollectionConverters$.MODULE$.MapHasAsJava((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ProtocolVersionConfigMapName()), ProtocolVersionConfigMapName())}))).asJava();
        OwnerReference[] owners = (OwnerReference[]) list.toArray(ClassTag$.MODULE$.apply(OwnerReference.class));
        // Single-entry data map: key -> version.
        java.util.Map data = CollectionConverters$.MODULE$.MapHasAsJava((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(ProtocolVersionKey()), ProtocolVersion())}))).asJava();
        return ((ConfigMapFluentImpl) new ConfigMapBuilder()
                .withNewMetadata()
                .withName(ProtocolVersionConfigMapName())
                .withLabels(labels)
                .withOwnerReferences(owners)
                .endMetadata())
                .withData(data)
                .build();
    }

    /**
     * @return the app-id label string ({@code "com.lightbend.cloudflow/app-id"})
     */
    public String AppIdLabel() {
        return Operator$.AppIdLabel;
    }

    /**
     * @return the config-format label string ({@code "com.lightbend.cloudflow/config-format"})
     */
    public String ConfigFormatLabel() {
        return Operator$.ConfigFormatLabel;
    }

    /**
     * @return the streamlet-name label string ({@code "com.lightbend.cloudflow/streamlet-name"})
     */
    public String StreamletNameLabel() {
        return Operator$.StreamletNameLabel;
    }

    /**
     * @return the config-update label string ({@code "com.lightbend.cloudflow/config-update"})
     */
    public String ConfigUpdateLabel() {
        return Operator$.ConfigUpdateLabel;
    }

    /**
     * @return the default label map (ManagedBy -> ManagedByCloudflow) handed to the watches
     */
    public Map<String, String> DefaultWatchOptions() {
        return Operator$.DefaultWatchOptions;
    }

    /**
     * @return the supervision decider, which answers Stop for every throwable
     */
    public Function1<Throwable, Supervision.Directive> decider() {
        return Operator$.decider;
    }

    /**
     * @return the stream attributes holding the supervision strategy built from {@code decider()}
     */
    public Attributes StreamAttributes() {
        return Operator$.StreamAttributes;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v11, types: [byte] */
    private ExecutionContextExecutorService fabric8ExecutionContext$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (bitmap$0 & 2)) == 0) {
                fabric8ExecutionContext = ExecutionContext$.MODULE$.fromExecutorService(Executors.newSingleThreadExecutor());
                r0 = (byte) (bitmap$0 | 2);
                bitmap$0 = r0;
            }
        }
        return fabric8ExecutionContext;
    }

    /**
     * Returns the execution context handed to the fabric8 action executor, creating it on first use.
     *
     * @return the cached context when the initialization flag is already set, otherwise the result of
     *     {@code fabric8ExecutionContext$lzycompute()}
     */
    private ExecutionContextExecutorService fabric8ExecutionContext() {
        if ((bitmap$0 & 2) != 0) {
            return fabric8ExecutionContext;
        }
        return fabric8ExecutionContext$lzycompute();
    }

    /**
     * Wires up and runs the operator's three event streams, then starts all registered informers.
     *
     * <ol>
     *   <li>Custom-resource watch events go through {@code AppEventFlow.fromWatchEvent} and
     *       {@code AppEventFlow.toAction}, and the resulting actions are executed.</li>
     *   <li>Secret watch events (label map: ManagedBy and ConfigFormat) go through
     *       {@code StreamletChangeEventFlow.fromWatchEvent}, are paired with their App via
     *       {@code mapToAppInSameNamespace}, turned into config-update actions and executed.</li>
     *   <li>Pod watch events go through {@code StatusChangeEventFlow.fromWatchEvent}, are logged,
     *       paired with their App, turned into status-update actions and executed.</li>
     * </ol>
     *
     * <p>Each graph is started through {@code runStream}, which terminates the actor system when the
     * stream completes or fails.
     *
     * @param kubernetesClient client used for the action executor, the informer factory and App lookups
     * @param map runners keyed by string, forwarded to the action-producing flows
     * @param str forwarded to {@code AppEventFlow.toAction} and
     *     {@code StreamletChangeEventFlow.toConfigUpdateAction}; its meaning is not visible here
     * @param str2 forwarded to {@code AppEventFlow.toAction}; its meaning is not visible here
     * @param actorSystem actor system used to materialize the watch queues and terminated by runStream
     * @param materializer materializer used to run the three graphs
     * @param executionContext execution context for the future callbacks and App lookups
     */
    public void handleEvents(KubernetesClient kubernetesClient, Map<String, Runner<?>> map, String str, String str2, ActorSystem actorSystem, Materializer materializer, ExecutionContext executionContext) {
        // Log-level attributes (Info for the first level, library defaults for the other two) applied to the action-execution flow.
        Attributes logLevels = Attributes$.MODULE$.logLevels(Attributes$LogLevels$.MODULE$.Info(), Attributes$.MODULE$.logLevels$default$2(), Attributes$.MODULE$.logLevels$default$3());
        // One executor, backed by the single-thread fabric8 execution context, is shared by all three streams.
        Fabric8ActionExecutor fabric8ActionExecutor = new Fabric8ActionExecutor(kubernetesClient, fabric8ExecutionContext());
        SharedInformerFactory informers = kubernetesClient.informers();
        // Stream 1: custom-resource events -> actions. When the stream's future succeeds, the Kafka admins
        // are closed (passing a 10-second duration) before the original completion value is propagated.
        runStream(watchCr(informers, DefaultWatchOptions(), actorSystem).via(AppEventFlow$.MODULE$.fromWatchEvent(logLevels)).via(AppEventFlow$.MODULE$.toAction(map, str, str2)).via(executeActions(fabric8ActionExecutor, logLevels)).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.right()).mapMaterializedValue(future -> {
            return future.flatMap(done -> {
                return TopicActions$KafkaAdmins$.MODULE$.close(new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds(), executionContext).map(boxedUnit -> {
                    return done;
                }, executionContext);
            }, executionContext);
        }), "The actions stream completed unexpectedly, terminating.", "The actions stream failed, terminating.", actorSystem, materializer, executionContext);
        // Stream 2: secret events (label map: ManagedBy + ConfigFormat) -> config-update actions.
        runStream(watchSecret(informers, (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(CloudflowLabels$.MODULE$.ManagedBy()), CloudflowLabels$.MODULE$.ManagedByCloudflow()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(CloudflowLabels$.MODULE$.ConfigFormat()), CloudflowLabels$.MODULE$.StreamletDeploymentConfigFormat())})), actorSystem).via(StreamletChangeEventFlow$.MODULE$.fromWatchEvent()).via(mapToAppInSameNamespace(kubernetesClient, executionContext)).via(StreamletChangeEventFlow$.MODULE$.toConfigUpdateAction(map, str)).via(executeActions(fabric8ActionExecutor, logLevels)).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.right()), "The config updates stream completed unexpectedly, terminating.", "The config updates stream failed, terminating.", actorSystem, materializer, executionContext);
        // Stream 3: pod events -> status-change events, logged under "status-change-event" -> status-update actions.
        Source via = watchPod(informers, DefaultWatchOptions(), actorSystem).via(StatusChangeEventFlow$.MODULE$.fromWatchEvent());
        Function1 function1 = statusChangeEvent -> {
            return StatusChangeEvent$.MODULE$.detected(statusChangeEvent);
        };
        runStream(via.log("status-change-event", function1, via.log$default$3("status-change-event", function1)).via(mapToAppInSameNamespace(kubernetesClient, executionContext)).via(StatusChangeEventFlow$.MODULE$.toStatusUpdateAction(map)).via(executeActions(fabric8ActionExecutor, logLevels)).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.right()), "The status changes stream completed unexpectedly, terminating.", "The status changes stream failed, terminating.", actorSystem, materializer, executionContext);
        // Informers are started only after all three streams have registered their event handlers.
        informers.startAllRegisteredInformers();
    }

    /**
     * Creates a flow that runs every incoming action through the given executor, one at a time.
     *
     * @param actionExecutor executor whose {@code execute} is invoked for each action
     * @param attributes attributes applied to the resulting flow
     * @return a flow emitting the result of each executed action
     */
    private Flow<Action, Action, NotUsed> executeActions(ActionExecutor actionExecutor, Attributes attributes) {
        // Parallelism 1: at most one action is in flight at a time.
        final int parallelism = 1;
        return Flow$.MODULE$.apply()
                .mapAsync(parallelism, action -> actionExecutor.execute(action))
                .withAttributes(attributes);
    }

    /**
     * Creates a flow that pairs each change event with the App custom resource it refers to.
     *
     * <p>For every event, the App named {@code appId()} is fetched from the event's namespace inside a
     * future on the given execution context; the lookup result is wrapped in an {@code Option} and
     * emitted together with the original event. Events are processed one at a time.
     *
     * @param kubernetesClient client used to fetch the App custom resource
     * @param executionContext execution context on which the lookup and the pairing run
     * @return a flow emitting {@code (Option[App.Cr], event)} tuples
     */
    public <E extends AppChangeEvent<?>> Flow<E, Tuple2<Option<App.Cr>, E>, NotUsed> mapToAppInSameNamespace(KubernetesClient kubernetesClient, ExecutionContext executionContext) {
        return Flow$.MODULE$.apply().mapAsync(1, appChangeEvent -> {
            // Read the namespace up front, outside the future.
            final String namespace = appChangeEvent.namespace();
            return Future$.MODULE$.apply(
                    () -> Option$.MODULE$.apply(((Gettable) ((Nameable) kubernetesClient.customResources(App$.MODULE$.customResourceDefinitionContext(), App.Cr.class, App.List.class).inNamespace(namespace)).withName(appChangeEvent.appId())).get()),
                    executionContext)
                    .map(option -> Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(option), appChangeEvent), executionContext);
        });
    }

    /**
     * Adapts a Scala callback to a fabric8 {@link ResourceEventHandler}.
     *
     * <p>Each informer notification is wrapped in a {@link WatchEvent} tagged with the matching
     * {@link EventType} and handed to {@code function1}. For updates only the new object is forwarded;
     * for deletions the "final state unknown" flag is ignored.
     *
     * <p>The decompiled form passed a constructor argument to an interface and used untyped handler
     * signatures, which is not valid Java; the anonymous class now captures {@code function1} directly
     * and overrides the interface methods with their generic parameter types.
     *
     * @param function1 callback invoked with every wrapped event
     * @param <T> resource type delivered by the informer
     * @return a handler forwarding add / update / delete notifications to {@code function1}
     */
    private <T extends HasMetadata> ResourceEventHandler<T> getEventHandler(final Function1<WatchEvent<T>, BoxedUnit> function1) {
        return new ResourceEventHandler<T>() {
            @Override
            public void onAdd(T obj) {
                function1.apply(new WatchEvent<>(obj, EventType.ADDITION));
            }

            @Override
            public void onUpdate(T oldObj, T newObj) {
                // Only the updated state is forwarded.
                function1.apply(new WatchEvent<>(newObj, EventType.UPDATION));
            }

            @Override
            public void onDelete(T obj, boolean deletedFinalStateUnknown) {
                function1.apply(new WatchEvent<>(obj, EventType.DELETION));
            }
        };
    }

    /**
     * @return the holder for the App custom-resource informer
     */
    private AtomicReference<SharedIndexInformer<App.Cr>> crInformer() {
        return Operator$.crInformer;
    }

    /**
     * Returns the value held by {@code atomicReference}, installing one from {@code function0} if the
     * reference is still empty.
     *
     * <p>Previously the factory was evaluated unconditionally before the compare-and-set, so it ran
     * (with whatever side effects it has) even when the reference was already populated. The
     * reference is now read first and the factory is only invoked when it is {@code null}. If another
     * caller wins the race between the read and the compare-and-set, that caller's value is returned.
     *
     * @param atomicReference holder that is set at most once
     * @param function0 factory for the value; evaluated at most once per call and only when the holder
     *     is empty at the time of the initial read
     * @param <T> type of the held value
     * @return the value stored in the reference
     */
    @SuppressWarnings("unchecked")
    private <T> T setOnceAndGet(AtomicReference<T> atomicReference, Function0<T> function0) {
        T existing = atomicReference.get();
        if (existing != null) {
            return existing;
        }
        LazyRef lazyRef = new LazyRef();
        T candidate = (T) newValue$1(lazyRef, function0);
        return atomicReference.compareAndSet(null, candidate) ? candidate : atomicReference.get();
    }

    /**
     * Logs a watch event and offers it to the given source queue.
     *
     * <p>The result returned by {@code offer} is not inspected.
     *
     * @param str description of the event source, used in the log line
     * @param sourceQueueWithComplete queue feeding the corresponding stream
     * @param watchEvent event to enqueue
     * @param <T> resource type carried by the event
     */
    private <T extends HasMetadata> void enqueueTask(String str, SourceQueueWithComplete<WatchEvent<T>> sourceQueueWithComplete, WatchEvent<T> watchEvent) {
        final Logger logger = log();
        logger.info("Enqueue {} with type [{}]", str, watchEvent.eventType());
        sourceQueueWithComplete.offer(watchEvent);
    }

    /**
     * Creates a source of watch events for App custom resources.
     *
     * <p>Obtains the App informer through {@code setOnceAndGet} (factory: a shared index informer
     * restricted by the given label map, created with the value 600000), pre-materializes a queue
     * source of size 1000 with the drop-head overflow strategy, and registers an event handler that
     * enqueues every informer notification.
     *
     * @param sharedInformerFactory factory used to create the informer
     * @param map label map applied to the informer's operation context
     * @param actorSystem actor system providing the materializer for the queue
     * @return the source side of the pre-materialized queue
     */
    private Source<WatchEvent<App.Cr>, NotUsed> watchCr(SharedInformerFactory sharedInformerFactory, Map<String, String> map, ActorSystem actorSystem) {
        SharedIndexInformer informer = (SharedIndexInformer) setOnceAndGet(crInformer(), () -> sharedInformerFactory.sharedIndexInformerForCustomResource(App$.MODULE$.customResourceDefinitionContext(), App.Cr.class, App.List.class, new OperationContext().withLabels(CollectionConverters$.MODULE$.MapHasAsJava(map).asJava()), 600000L));
        Tuple2 queueAndSource = Source$.MODULE$.queue(1000, OverflowStrategy$.MODULE$.dropHead()).preMaterialize(Materializer$.MODULE$.matFromSystem(actorSystem));
        if (queueAndSource == null) {
            throw new MatchError(queueAndSource);
        }
        SourceQueueWithComplete eventQueue = (SourceQueueWithComplete) queueAndSource._1();
        Source<WatchEvent<App.Cr>, NotUsed> events = (Source) queueAndSource._2();
        informer.addEventHandler(getEventHandler(watchEvent -> {
            $anonfun$watchCr$2(eventQueue, watchEvent);
            return BoxedUnit.UNIT;
        }));
        return events;
    }

    /**
     * @return the holder for the Secret informer
     */
    private AtomicReference<SharedIndexInformer<Secret>> secretInformer() {
        return Operator$.secretInformer;
    }

    /**
     * Creates a source of watch events for Secrets.
     *
     * <p>Obtains the Secret informer through {@code setOnceAndGet} (factory: a shared index informer
     * restricted by the given label map, created with the value 600000), pre-materializes a queue
     * source of size 1000 with the drop-head overflow strategy, and registers an event handler that
     * enqueues every informer notification.
     *
     * @param sharedInformerFactory factory used to create the informer
     * @param map label map applied to the informer's operation context
     * @param actorSystem actor system providing the materializer for the queue
     * @return the source side of the pre-materialized queue
     */
    private Source<WatchEvent<Secret>, NotUsed> watchSecret(SharedInformerFactory sharedInformerFactory, Map<String, String> map, ActorSystem actorSystem) {
        SharedIndexInformer informer = (SharedIndexInformer) setOnceAndGet(secretInformer(), () -> sharedInformerFactory.sharedIndexInformerFor(Secret.class, SecretList.class, new OperationContext().withLabels(CollectionConverters$.MODULE$.MapHasAsJava(map).asJava()), 600000L));
        Tuple2 queueAndSource = Source$.MODULE$.queue(1000, OverflowStrategy$.MODULE$.dropHead()).preMaterialize(Materializer$.MODULE$.matFromSystem(actorSystem));
        if (queueAndSource == null) {
            throw new MatchError(queueAndSource);
        }
        SourceQueueWithComplete eventQueue = (SourceQueueWithComplete) queueAndSource._1();
        Source<WatchEvent<Secret>, NotUsed> events = (Source) queueAndSource._2();
        informer.addEventHandler(getEventHandler(watchEvent -> {
            $anonfun$watchSecret$2(eventQueue, watchEvent);
            return BoxedUnit.UNIT;
        }));
        return events;
    }

    /**
     * @return the holder for the Pod informer
     */
    private AtomicReference<SharedIndexInformer<Pod>> podInformer() {
        return Operator$.podInformer;
    }

    /**
     * Creates a source of watch events for Pods.
     *
     * <p>Obtains the Pod informer through {@code setOnceAndGet} (factory: a shared index informer
     * restricted by the given label map, created with the value 600000), pre-materializes a queue
     * source of size 1000 with the drop-head overflow strategy, and registers an event handler that
     * enqueues every informer notification.
     *
     * @param sharedInformerFactory factory used to create the informer
     * @param map label map applied to the informer's operation context
     * @param actorSystem actor system providing the materializer for the queue
     * @return the source side of the pre-materialized queue
     */
    private Source<WatchEvent<Pod>, NotUsed> watchPod(SharedInformerFactory sharedInformerFactory, Map<String, String> map, ActorSystem actorSystem) {
        SharedIndexInformer informer = (SharedIndexInformer) setOnceAndGet(podInformer(), () -> sharedInformerFactory.sharedIndexInformerFor(Pod.class, PodList.class, new OperationContext().withLabels(CollectionConverters$.MODULE$.MapHasAsJava(map).asJava()), 600000L));
        Tuple2 queueAndSource = Source$.MODULE$.queue(1000, OverflowStrategy$.MODULE$.dropHead()).preMaterialize(Materializer$.MODULE$.matFromSystem(actorSystem));
        if (queueAndSource == null) {
            throw new MatchError(queueAndSource);
        }
        SourceQueueWithComplete eventQueue = (SourceQueueWithComplete) queueAndSource._1();
        Source<WatchEvent<Pod>, NotUsed> events = (Source) queueAndSource._2();
        informer.addEventHandler(getEventHandler(watchEvent -> {
            $anonfun$watchPod$2(eventQueue, watchEvent);
            return BoxedUnit.UNIT;
        }));
        return events;
    }

    /**
     * Runs a graph with the operator's stream attributes and shuts the process down when it ends.
     *
     * <p>On successful completion {@code str} is logged as a warning; on failure {@code str2} is
     * logged as an error together with the exception. In both cases {@code exitWithFailure} is
     * registered as a termination callback and the actor system is terminated.
     *
     * @param runnableGraph graph whose materialized value is the completion future
     * @param str message logged when the stream completes
     * @param str2 message logged when the stream fails
     * @param actorSystem actor system to terminate
     * @param materializer materializer used to run the graph
     * @param executionContext execution context for the completion callback
     */
    private void runStream(RunnableGraph<Future<?>> runnableGraph, String str, String str2, ActorSystem actorSystem, Materializer materializer, ExecutionContext executionContext) {
        Future completion = (Future) runnableGraph.withAttributes(StreamAttributes()).run(materializer);
        completion.onComplete(outcome -> {
            if (outcome instanceof Success) {
                MODULE$.log().warn(str);
            } else if (outcome instanceof Failure) {
                MODULE$.log().error(str2, ((Failure) outcome).exception());
            } else {
                throw new MatchError(outcome);
            }
            // Both outcomes end the same way: exit once the actor system has terminated.
            actorSystem.registerOnTermination(() -> {
                MODULE$.exitWithFailure();
            });
            return actorSystem.terminate();
        }, executionContext);
    }

    /**
     * Exits the JVM with status {@code -1}. Registered by {@code runStream} as an actor-system
     * termination callback.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void exitWithFailure() {
        final int failureStatus = -1;
        System.exit(failureStatus);
    }

    /**
     * Slow path for the lazy value used by {@code setOnceAndGet}: while holding the
     * {@code LazyRef}'s monitor, returns its value if already initialized, otherwise evaluates
     * {@code function0} and stores the result in the ref.
     */
    private static final /* synthetic */ Object newValue$lzycompute$1(LazyRef lazyRef, Function0 function0) {
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                return lazyRef.value();
            }
            return lazyRef.initialize(function0.apply());
        }
    }

    /**
     * Returns the lazy value held by {@code lazyRef}, computing it from {@code function0} through
     * {@code newValue$lzycompute$1} when it has not been initialized yet.
     */
    private static final Object newValue$1(LazyRef lazyRef, Function0 function0) {
        if (lazyRef.initialized()) {
            return lazyRef.value();
        }
        return newValue$lzycompute$1(lazyRef, function0);
    }

    /**
     * Static bridge that enqueues an App custom-resource watch event on the given queue.
     */
    public static final /* synthetic */ void $anonfun$watchCr$2(SourceQueueWithComplete sourceQueueWithComplete, WatchEvent watchEvent) {
        final String description = "App.Cr Watch Event";
        MODULE$.enqueueTask(description, sourceQueueWithComplete, watchEvent);
    }

    /**
     * Static bridge that enqueues a Secret watch event on the given queue.
     */
    public static final /* synthetic */ void $anonfun$watchSecret$2(SourceQueueWithComplete sourceQueueWithComplete, WatchEvent watchEvent) {
        final String description = "Secret Watch Event";
        MODULE$.enqueueTask(description, sourceQueueWithComplete, watchEvent);
    }

    /**
     * Static bridge that enqueues a Pod watch event on the given queue.
     */
    public static final /* synthetic */ void $anonfun$watchPod$2(SourceQueueWithComplete sourceQueueWithComplete, WatchEvent watchEvent) {
        final String description = "Pod Watch Event";
        MODULE$.enqueueTask(description, sourceQueueWithComplete, watchEvent);
    }

    /**
     * Private constructor; the only instance is the one stored in {@code MODULE$}.
     */
    private Operator$() {
        super();
    }
}
