package org.apache.hugegraph.computer.k8s.driver;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.utils.IOHelpers;
import io.fabric8.kubernetes.client.utils.URLUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.driver.ComputerDriver;
import org.apache.hugegraph.computer.driver.ComputerDriverException;
import org.apache.hugegraph.computer.driver.DefaultJobState;
import org.apache.hugegraph.computer.driver.JobObserver;
import org.apache.hugegraph.computer.driver.JobState;
import org.apache.hugegraph.computer.driver.JobStatus;
import org.apache.hugegraph.computer.driver.SuperstepStat;
import org.apache.hugegraph.computer.k8s.Constants;
import org.apache.hugegraph.computer.k8s.config.KubeDriverOptions;
import org.apache.hugegraph.computer.k8s.config.KubeSpecOptions;
import org.apache.hugegraph.computer.k8s.crd.model.ComputerJobSpec;
import org.apache.hugegraph.computer.k8s.crd.model.ComputerJobStatus;
import org.apache.hugegraph.computer.k8s.crd.model.HugeGraphComputerJob;
import org.apache.hugegraph.computer.k8s.crd.model.HugeGraphComputerJobList;
import org.apache.hugegraph.computer.k8s.util.KubeUtil;
import org.apache.hugegraph.config.ConfigListOption;
import org.apache.hugegraph.config.ConfigOption;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/k8s/driver/KubernetesDriver.class */
public class KubernetesDriver implements ComputerDriver {
    private static final Logger LOG;
    private final HugeConfig conf;
    private final String namespace;
    private final NamespacedKubernetesClient kubeClient;
    private final MixedOperation<HugeGraphComputerJob, HugeGraphComputerJobList, Resource<HugeGraphComputerJob>> operation;
    private volatile Watch watch;
    private final MutableBoolean watchActive;
    private final Map<String, Pair<CompletableFuture<Void>, JobObserver>> waits;
    private final Map<String, Object> defaultSpec;
    private final Map<String, String> defaultConf;
    private final String bashPath;
    private final String jarFileDir;
    private final String registry;
    private final String username;
    private final String password;
    private final Boolean enableInternalAlgorithm;
    private final List<String> internalAlgorithms;
    private final String internalAlgorithmImageUrl;
    private final String frameworkImageUrl;
    private static final String DEFAULT_PUSH_BASH_PATH = "/docker_push.sh";
    private static final String BUILD_IMAGE_FUNC = "build_image";
    private static final String TMP_DIR;
    static final /* synthetic */ boolean $assertionsDisabled;

    public KubernetesDriver(HugeConfig hugeConfig) {
        this(hugeConfig, createKubeClient(hugeConfig));
    }

    public KubernetesDriver(HugeConfig hugeConfig, NamespacedKubernetesClient namespacedKubernetesClient) {
        this.conf = hugeConfig;
        this.namespace = (String) this.conf.get(KubeDriverOptions.NAMESPACE);
        this.kubeClient = (NamespacedKubernetesClient) namespacedKubernetesClient.inNamespace(this.namespace);
        this.operation = this.kubeClient.customResources(HugeGraphComputerJob.class, HugeGraphComputerJobList.class);
        this.watch = initWatch();
        this.watchActive = new MutableBoolean(true);
        this.waits = new ConcurrentHashMap();
        this.defaultSpec = defaultSpec();
        this.defaultConf = defaultComputerConf();
        this.bashPath = (String) this.conf.get(KubeDriverOptions.BUILD_IMAGE_BASH_PATH);
        this.jarFileDir = (String) this.conf.get(KubeDriverOptions.JAR_FILE_DIR);
        this.registry = ((String) this.conf.get(KubeDriverOptions.IMAGE_REPOSITORY_REGISTRY)).trim();
        this.username = (String) this.conf.get(KubeDriverOptions.IMAGE_REPOSITORY_USERNAME);
        this.password = (String) this.conf.get(KubeDriverOptions.IMAGE_REPOSITORY_PASSWORD);
        this.enableInternalAlgorithm = (Boolean) this.conf.get(KubeDriverOptions.ENABLE_INTERNAL_ALGORITHM);
        this.internalAlgorithms = (List) this.conf.get(KubeDriverOptions.INTERNAL_ALGORITHMS);
        this.internalAlgorithmImageUrl = (String) this.conf.get(KubeDriverOptions.INTERNAL_ALGORITHM_IMAGE_URL);
        this.frameworkImageUrl = (String) this.conf.get(KubeDriverOptions.FRAMEWORK_IMAGE_URL);
    }

    private static NamespacedKubernetesClient createKubeClient(HugeConfig hugeConfig) {
        String str = (String) hugeConfig.get(KubeDriverOptions.KUBE_CONFIG);
        try {
            return new DefaultKubernetesClient(Config.fromKubeconfig(FileUtils.readFileToString(new File(str))));
        } catch (IOException e) {
            throw new ComputerDriverException("Failed to read KubeConfig: %s", e, str);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v78, types: [java.io.InputStream] */
    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public void uploadAlgorithmJar(String str, InputStream inputStream) {
        File file = null;
        try {
            try {
                file = File.createTempFile("userAlgorithm", ".jar", Files.createDirectories(Paths.get(TMP_DIR, UUID.randomUUID().toString()), new FileAttribute[0]).toFile());
                FileUtils.copyInputStreamToFile(inputStream, file);
                String readFully = IOHelpers.readFully(StringUtils.isBlank(this.bashPath) ? getClass().getResourceAsStream(DEFAULT_PUSH_BASH_PATH) : new FileInputStream(this.bashPath));
                StringBuilder sb = new StringBuilder();
                sb.append(BUILD_IMAGE_FUNC);
                if (StringUtils.isNotBlank(this.registry)) {
                    sb.append(" -r ").append(this.registry);
                }
                if (StringUtils.isNotBlank(this.username)) {
                    sb.append(" -u ").append(this.username);
                }
                if (StringUtils.isNotBlank(this.password)) {
                    sb.append(" -p ").append(this.password);
                }
                sb.append(" -s ").append(file.getAbsolutePath());
                sb.append(" -j ").append(buildJarFile(this.jarFileDir, str));
                sb.append(" -i ").append(buildImageUrl(str));
                sb.append(" -f ").append(this.frameworkImageUrl);
                Process exec = Runtime.getRuntime().exec(new String[]{"bash", "-c", readFully + "\n" + sb.toString()});
                if (exec.waitFor() == 0) {
                    FileUtils.deleteQuietly(file);
                    return;
                }
                String readFully2 = IOHelpers.readFully(exec.getErrorStream());
                if (StringUtils.isBlank(readFully2)) {
                    readFully2 = IOHelpers.readFully(exec.getInputStream());
                }
                throw new ComputerDriverException(readFully2);
            } catch (Throwable th) {
                throw new ComputerDriverException("Failed to upload algorithm Jar", th);
            }
        } catch (Throwable th2) {
            FileUtils.deleteQuietly(file);
            throw th2;
        }
    }

    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public String submitJob(String str, Map<String, String> map) {
        HugeGraphComputerJob hugeGraphComputerJob = new HugeGraphComputerJob();
        String genJobId = KubeUtil.genJobId(str);
        hugeGraphComputerJob.setMetadata(new ObjectMetaBuilder().withNamespace(this.namespace).withName(KubeUtil.crName(genJobId)).build());
        ComputerJobSpec computerJobSpec = computerJobSpec(this.defaultSpec, map);
        Map<String, String> computerConf = computerConf(this.defaultConf, map);
        checkComputerConf(computerConf, computerJobSpec);
        computerJobSpec.withAlgorithmName(str).withJobId(genJobId).withComputerConf(computerConf);
        if (this.enableInternalAlgorithm.booleanValue() && this.internalAlgorithms.contains(str)) {
            computerJobSpec.withImage(this.internalAlgorithmImageUrl);
        } else if (StringUtils.isNotBlank(computerJobSpec.getRemoteJarUri())) {
            computerJobSpec.withImage(this.frameworkImageUrl);
        } else {
            computerJobSpec.withImage(buildImageUrl(str)).withJarFile(buildJarFile((String) this.conf.get(KubeDriverOptions.JAR_FILE_DIR), str));
        }
        hugeGraphComputerJob.setSpec(computerJobSpec);
        this.operation.createOrReplace(hugeGraphComputerJob);
        return genJobId;
    }

    private void checkComputerConf(Map<String, String> map, ComputerJobSpec computerJobSpec) {
        Collection removeAll = CollectionUtils.removeAll(ComputerOptions.COMPUTER_REQUIRED_USER_OPTIONS, map.keySet());
        E.checkArgument(removeAll.isEmpty(), "The %s options can't be null", removeAll);
        int parseInt = Integer.parseInt(map.getOrDefault(ComputerOptions.JOB_PARTITIONS_COUNT.name(), "1"));
        int intValue = computerJobSpec.getWorkerInstances().intValue();
        E.checkArgument(parseInt >= intValue, "The partitions count must be >= workers instances, but got %s < %s", Integer.valueOf(parseInt), Integer.valueOf(intValue));
    }

    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public boolean cancelJob(String str, Map<String, String> map) {
        return ((Resource) this.operation.withName(KubeUtil.crName(str))).delete().booleanValue();
    }

    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public CompletableFuture<Void> waitJobAsync(String str, Map<String, String> map, JobObserver jobObserver) {
        JobState jobState = jobState(str, map);
        if (jobState == null) {
            LOG.warn("Unable to fetch state of job '{}', it may have been deleted", str);
            return null;
        }
        jobObserver.onJobStateChanged(jobState);
        CompletableFuture<Void> completableFuture = null;
        synchronized (this.watchActive) {
            if (this.watchActive.getValue2().booleanValue()) {
                completableFuture = new CompletableFuture<>();
                this.waits.put(str, Pair.of(completableFuture, jobObserver));
            } else {
                this.watch = initWatch();
                this.watchActive.setTrue();
            }
        }
        return completableFuture;
    }

    private Watch initWatch() {
        return this.operation.watch(new Watcher<HugeGraphComputerJob>() { // from class: org.apache.hugegraph.computer.k8s.driver.KubernetesDriver.1
            @Override // io.fabric8.kubernetes.client.Watcher
            public void eventReceived(Watcher.Action action, HugeGraphComputerJob hugeGraphComputerJob) {
                Pair<CompletableFuture<Void>, JobObserver> pair;
                if (hugeGraphComputerJob == null || action == Watcher.Action.ERROR) {
                    return;
                }
                String jobId = hugeGraphComputerJob.getSpec().getJobId();
                if (StringUtils.isBlank(jobId) || (pair = KubernetesDriver.this.waits.get(jobId)) == null) {
                    return;
                }
                CompletableFuture<Void> left = pair.getLeft();
                JobObserver right = pair.getRight();
                KubernetesDriver kubernetesDriver = KubernetesDriver.this;
                JobState buildJobState = kubernetesDriver.buildJobState(hugeGraphComputerJob);
                right.onJobStateChanged(buildJobState);
                if (JobStatus.finished(buildJobState.jobStatus())) {
                    left.complete(null);
                    kubernetesDriver.cancelWait(jobId);
                }
            }

            @Override // io.fabric8.kubernetes.client.Watcher
            public void onClose(WatcherException watcherException) {
                for (Pair<CompletableFuture<Void>, JobObserver> pair : KubernetesDriver.this.waits.values()) {
                    if (pair != null) {
                        pair.getLeft().completeExceptionally(watcherException);
                    }
                }
                synchronized (KubernetesDriver.this.watchActive) {
                    KubernetesDriver.this.waits.clear();
                    Watch watch = KubernetesDriver.this.watch;
                    if (watch != null) {
                        watch.close();
                    }
                    KubernetesDriver.this.watchActive.setFalse();
                }
            }
        });
    }

    private void cancelWait(String str) {
        Pair<CompletableFuture<Void>, JobObserver> remove = this.waits.remove(str);
        if (remove != null) {
            remove.getLeft().cancel(true);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public JobState jobState(String str, Map<String, String> map) {
        HugeGraphComputerJob hugeGraphComputerJob = (HugeGraphComputerJob) ((Resource) this.operation.withName(KubeUtil.crName(str))).get();
        if (hugeGraphComputerJob == null) {
            return null;
        }
        return buildJobState(hugeGraphComputerJob);
    }

    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public List<SuperstepStat> superstepStats(String str, Map<String, String> map) {
        return null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public String diagnostics(String str, Map<String, String> map) {
        Event event = (Event) ((Resource) this.kubeClient.v1().events().withName(KubeUtil.failedEventName(KubeUtil.crName(str)))).get();
        if (event == null) {
            return null;
        }
        return event.getMessage();
    }

    @Override // org.apache.hugegraph.computer.driver.ComputerDriver
    public String log(String str, int i, long j, long j2, Map<String, String> map) {
        return null;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Iterator<Pair<CompletableFuture<Void>, JobObserver>> it = this.waits.values().iterator();
        while (it.hasNext()) {
            it.next().getLeft().cancel(true);
            it.remove();
        }
        if (this.watch != null) {
            this.watch.close();
            this.watchActive.setFalse();
        }
        if (this.kubeClient != null) {
            this.kubeClient.close();
        }
    }

    private JobState buildJobState(HugeGraphComputerJob hugeGraphComputerJob) {
        E.checkNotNull(hugeGraphComputerJob, "computerJob");
        ComputerJobStatus status = hugeGraphComputerJob.getStatus();
        if (status == null || status.getJobStatus() == null) {
            return new DefaultJobState().jobStatus(JobStatus.INITIALIZING);
        }
        return new DefaultJobState().jobStatus(JobStatus.valueOf(status.getJobStatus()));
    }

    private String buildImageUrl(String str) {
        return KubeUtil.imageUrl((String) this.conf.get(KubeDriverOptions.IMAGE_REPOSITORY_URL), str, null);
    }

    private String buildJarFile(String str, String str2) {
        return URLUtils.join(str, str2 + ".jar");
    }

    private Map<String, String> computerConf(Map<String, String> map, Map<String, String> map2) {
        HashMap hashMap = new HashMap(map);
        Map<String, TypedOption<?, ?>> options = ComputerOptions.instance().options();
        map2.forEach((str, str2) -> {
            if (!StringUtils.isNotBlank(str) || !StringUtils.isNotBlank(str2) || str.startsWith(Constants.K8S_SPEC_PREFIX) || ComputerOptions.COMPUTER_PROHIBIT_USER_OPTIONS.contains(str)) {
                return;
            }
            ConfigOption configOption = (ConfigOption) options.get(str);
            if (configOption != null) {
                configOption.parseConvert(str2);
            }
            hashMap.put(str, str2);
        });
        return hashMap;
    }

    private Map<String, String> defaultComputerConf() {
        HashMap hashMap = new HashMap();
        for (TypedOption<?, ?> typedOption : ComputerOptions.instance().options().values()) {
            Object obj = this.conf.get(typedOption);
            String name = typedOption.name();
            if (obj != null) {
                hashMap.put(name, String.valueOf(obj));
            } else {
                E.checkArgument(!ComputerOptions.REQUIRED_OPTIONS.contains(name), "The %s option can't be null", name);
            }
        }
        return Collections.unmodifiableMap(hashMap);
    }

    private ComputerJobSpec computerJobSpec(Map<String, Object> map, Map<String, String> map2) {
        HashMap hashMap = new HashMap(map);
        KubeSpecOptions.ALLOW_USER_SETTINGS.forEach((str, configOption) -> {
            String str = (String) map2.get(str);
            if (StringUtils.isNotBlank(str)) {
                String covertSpecKey = KubeUtil.covertSpecKey(str);
                if (!KubeSpecOptions.MAP_TYPE_CONFIGS.contains(configOption)) {
                    hashMap.put(covertSpecKey, configOption.parseConvert(str));
                    return;
                }
                if (StringUtils.isNotBlank(str)) {
                    HashMap hashMap2 = new HashMap();
                    Iterator it = ((List) configOption.parseConvert(str)).iterator();
                    while (it.hasNext()) {
                        String[] split = ((String) it.next()).split(ParameterizedMessage.ERROR_MSG_SEPARATOR, 2);
                        if (!$assertionsDisabled && split.length != 2) {
                            throw new AssertionError();
                        }
                        hashMap2.put(split[0], split[1]);
                    }
                    hashMap.put(covertSpecKey, hashMap2);
                }
            }
        });
        return HugeGraphComputerJob.mapToSpec(hashMap);
    }

    private Map<String, Object> defaultSpec() {
        HashMap hashMap = new HashMap();
        for (TypedOption<?, ?> typedOption : KubeSpecOptions.instance().options().values()) {
            Object obj = this.conf.get(typedOption);
            if (obj != null) {
                String covertSpecKey = KubeUtil.covertSpecKey(typedOption.name());
                if (!KubeSpecOptions.MAP_TYPE_CONFIGS.contains(typedOption)) {
                    hashMap.put(covertSpecKey, obj);
                } else if (!Objects.equals(String.valueOf(obj), "[]")) {
                    hashMap.put(covertSpecKey, this.conf.getMap((ConfigListOption) typedOption));
                }
            }
        }
        ComputerJobSpec mapToSpec = HugeGraphComputerJob.mapToSpec(hashMap);
        List<String> list = (List) this.conf.get(KubeDriverOptions.PULL_SECRET_NAMES);
        if (CollectionUtils.isNotEmpty(list)) {
            ArrayList arrayList = new ArrayList();
            for (String str : list) {
                if (!StringUtils.isBlank(str)) {
                    arrayList.add(new LocalObjectReference(str));
                }
            }
            if (CollectionUtils.isNotEmpty(arrayList)) {
                mapToSpec.withPullSecrets(arrayList);
            }
        }
        String str2 = (String) this.conf.get(KubeDriverOptions.LOG4J_XML_PATH);
        if (StringUtils.isNotBlank(str2)) {
            try {
                mapToSpec.withLog4jXml(FileUtils.readFileToString(new File(str2)));
            } catch (IOException e) {
                throw new ComputerDriverException("Failed to read log4j file for computer job", e);
            }
        }
        return Collections.unmodifiableMap(HugeGraphComputerJob.specToMap(mapToSpec));
    }

    static {
        $assertionsDisabled = !KubernetesDriver.class.desiredAssertionStatus();
        LOG = Log.logger((Class<?>) KubernetesDriver.class);
        TMP_DIR = System.getProperty("java.io.tmpdir");
    }
}
