package org.apache.hugegraph.computer.k8s.operator;

import com.sun.net.httpserver.HttpServer;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Namespace;
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
import io.fabric8.kubernetes.api.model.NamespaceList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.utils.Utils;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.computer.k8s.Constants;
import org.apache.hugegraph.computer.k8s.operator.common.AbstractController;
import org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions;
import org.apache.hugegraph.computer.k8s.operator.controller.ComputerJobController;
import org.apache.hugegraph.computer.k8s.util.KubeUtil;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.OptionSpace;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.class */
public class OperatorEntrypoint {
    private static final Logger LOG = Log.logger((Class<?>) OperatorEntrypoint.class);
    private final HugeConfig config = configFromSysPropsOrEnvVars();
    private final List<AbstractController<?>> controllers = new ArrayList();
    private NamespacedKubernetesClient kubeClient;
    private SharedInformerFactory informerFactory;
    private ExecutorService controllerPool;
    private HttpServer httpServer;

    public static void main(String[] strArr) {
        OperatorEntrypoint operatorEntrypoint = new OperatorEntrypoint();
        Runtime runtime = Runtime.getRuntime();
        Objects.requireNonNull(operatorEntrypoint);
        runtime.addShutdownHook(new Thread(operatorEntrypoint::shutdown));
        operatorEntrypoint.start();
    }

    public OperatorEntrypoint() {
        TimeZone.setDefault(TimeZone.getTimeZone((String) this.config.get(OperatorOptions.TIMEZONE)));
    }

    public void start() {
        try {
            try {
                this.kubeClient = new DefaultKubernetesClient();
                String str = (String) this.config.get(OperatorOptions.WATCH_NAMESPACE);
                if (!Objects.equals(str, "*")) {
                    createNamespace(str);
                    this.kubeClient = (NamespacedKubernetesClient) this.kubeClient.inNamespace(str);
                }
                this.informerFactory = this.kubeClient.informers();
                LOG.info("Watch namespace: " + str);
                addHealthCheck();
                registerControllers();
                this.informerFactory.startAllRegisteredInformers();
                this.informerFactory.addSharedInformerEventListener(exc -> {
                    LOG.error("Informer event listener exception occurred", (Throwable) exc);
                    shutdown();
                });
                this.controllerPool = ExecutorUtil.newFixedThreadPool(this.controllers.size(), "controllers-%d");
                CountDownLatch countDownLatch = new CountDownLatch(this.controllers.size());
                ArrayList arrayList = new ArrayList();
                for (AbstractController<?> abstractController : this.controllers) {
                    arrayList.add(CompletableFuture.runAsync(() -> {
                        abstractController.run(countDownLatch);
                    }, this.controllerPool));
                }
                CompletableFuture.runAsync(() -> {
                    try {
                        countDownLatch.await();
                        addReadyCheck();
                        LOG.info("The Operator has been ready");
                    } catch (Throwable th) {
                        LOG.error("Failed to set up ready check");
                        shutdown();
                    }
                });
                CompletableFuture.anyOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).get();
                shutdown();
            } catch (Throwable th) {
                LOG.error("Failed to start Operator: ", th);
                shutdown();
            }
        } catch (Throwable th2) {
            shutdown();
            throw th2;
        }
    }

    public synchronized void shutdown() {
        LOG.info("The Operator shutdown...");
        Iterator<AbstractController<?>> it = this.controllers.iterator();
        while (it.hasNext()) {
            AbstractController<?> next = it.next();
            if (next != null) {
                next.shutdown();
            }
            it.remove();
        }
        if (this.controllerPool != null) {
            this.controllerPool.shutdown();
            this.controllerPool = null;
        }
        if (this.informerFactory != null) {
            this.informerFactory.stopAllRegisteredInformers();
            this.informerFactory = null;
        }
        if (this.kubeClient != null) {
            this.kubeClient.close();
            this.kubeClient = null;
        }
        if (this.httpServer != null) {
            this.httpServer.stop(0);
            this.httpServer = null;
        }
        LogManager.shutdown();
    }

    private HugeConfig configFromSysPropsOrEnvVars() {
        HashMap hashMap = new HashMap();
        for (String str : OperatorOptions.instance().options().keySet()) {
            String systemPropertyOrEnvVar = Utils.getSystemPropertyOrEnvVar(str.substring(Constants.K8S_SPEC_PREFIX.length()).toUpperCase(Locale.ROOT));
            if (StringUtils.isNotBlank(systemPropertyOrEnvVar)) {
                hashMap.put(str, systemPropertyOrEnvVar);
            }
        }
        return new HugeConfig(new MapConfiguration(hashMap));
    }

    private void registerControllers() {
        registerController(new ComputerJobController(this.config, this.kubeClient), ConfigMap.class, Job.class, Pod.class);
    }

    @SafeVarargs
    private final void registerController(AbstractController<?> abstractController, Class<? extends HasMetadata>... clsArr) {
        abstractController.register(this.informerFactory, clsArr);
        this.controllers.add(abstractController);
    }

    private void addHealthCheck() throws IOException {
        this.httpServer = HttpServer.create(new InetSocketAddress(((Integer) this.config.get(OperatorOptions.PROBE_PORT)).intValue()), ((Integer) this.config.get(OperatorOptions.PROBE_BACKLOG)).intValue());
        this.httpServer.createContext("/health", httpExchange -> {
            byte[] bytes = "ALL GOOD!".getBytes(StandardCharsets.UTF_8);
            httpExchange.sendResponseHeaders(200, bytes.length);
            OutputStream responseBody = httpExchange.getResponseBody();
            responseBody.write(bytes);
            responseBody.close();
        });
        this.httpServer.start();
    }

    private void addReadyCheck() {
        this.httpServer.createContext("/ready", httpExchange -> {
            byte[] bytes = "ALL Ready!".getBytes(StandardCharsets.UTF_8);
            httpExchange.sendResponseHeaders(200, bytes.length);
            OutputStream responseBody = httpExchange.getResponseBody();
            responseBody.write(bytes);
            responseBody.close();
        });
    }

    private void createNamespace(String str) {
        NamespaceBuilder namespaceBuilder = (NamespaceBuilder) new NamespaceBuilder().withNewMetadata().withName(str).endMetadata();
        KubeUtil.ignoreExists(() -> {
            return (Namespace) this.kubeClient.namespaces().create((NonNamespaceOperation<Namespace, NamespaceList, Resource<Namespace>>) namespaceBuilder.build());
        });
    }

    static {
        OptionSpace.register("computer-k8s-operator", "org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions");
    }
}
