/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.util.Config;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.functions.auth.FunctionAuthUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.KubernetesRuntime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesRuntimeFactory
implements RuntimeFactory {
    private static final Logger log = LoggerFactory.getLogger(KubernetesRuntimeFactory.class);
    static int NUM_RETRIES = 5;
    static long SLEEP_BETWEEN_RETRIES_MS = 500L;
    private final KubernetesInfo kubernetesInfo = new KubernetesInfo();
    private final Boolean submittingInsidePod;
    private final Boolean installUserCodeDependencies;
    private final Map<String, String> customLabels;
    private final Integer expectedMetricsCollectionInterval;
    private final String stateStorageServiceUri;
    private final AuthenticationConfig authConfig;
    private final String javaInstanceJarFile;
    private final String pythonInstanceFile;
    private final String extraDependenciesDir;
    private final SecretsProviderConfigurator secretsProviderConfigurator;
    private final String logDirectory = "logs/functions";
    private Timer changeConfigMapTimer;
    private AppsV1Api appsClient;
    private CoreV1Api coreClient;
    private Resources functionInstanceMinResources;
    private final boolean authenticationEnabled;

    @VisibleForTesting
    public KubernetesRuntimeFactory(String k8Uri, String jobNamespace, String pulsarDockerImageName, String imagePullPolicy, String pulsarRootDir, Boolean submittingInsidePod, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, String extraDependenciesDir, Map<String, String> customLabels, int percentMemoryPadding, String pulsarServiceUri, String pulsarAdminUri, String stateStorageServiceUri, AuthenticationConfig authConfig, Integer expectedMetricsCollectionInterval, String changeConfigMap, String changeConfigMapNamespace, Resources functionInstanceMinResources, SecretsProviderConfigurator secretsProviderConfigurator, boolean authenticationEnabled) {
        this.kubernetesInfo.setK8Uri(k8Uri);
        if (!StringUtils.isEmpty((CharSequence)jobNamespace)) {
            this.kubernetesInfo.setJobNamespace(jobNamespace);
        } else {
            this.kubernetesInfo.setJobNamespace("default");
        }
        if (!StringUtils.isEmpty((CharSequence)pulsarDockerImageName)) {
            this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName);
        } else {
            this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
        }
        if (!StringUtils.isEmpty((CharSequence)imagePullPolicy)) {
            this.kubernetesInfo.setImagePullPolicy(imagePullPolicy);
        } else {
            this.kubernetesInfo.setImagePullPolicy("IfNotPresent");
        }
        if (!StringUtils.isEmpty((CharSequence)pulsarRootDir)) {
            this.kubernetesInfo.setPulsarRootDir(pulsarRootDir);
        } else {
            this.kubernetesInfo.setPulsarRootDir("/pulsar");
        }
        this.extraDependenciesDir = StringUtils.isNotEmpty((CharSequence)extraDependenciesDir) ? (Paths.get(extraDependenciesDir, new String[0]).isAbsolute() ? extraDependenciesDir : this.kubernetesInfo.getPulsarRootDir() + "/" + extraDependenciesDir) : this.kubernetesInfo.getPulsarRootDir() + "/instances/deps";
        this.kubernetesInfo.setExtraDependenciesDir(extraDependenciesDir);
        this.kubernetesInfo.setPythonDependencyRepository(pythonDependencyRepository);
        this.kubernetesInfo.setPythonExtraDependencyRepository(pythonExtraDependencyRepository);
        this.kubernetesInfo.setPulsarServiceUrl(pulsarServiceUri);
        this.kubernetesInfo.setPulsarAdminUrl(pulsarAdminUri);
        this.kubernetesInfo.setChangeConfigMap(changeConfigMap);
        this.kubernetesInfo.setChangeConfigMapNamespace(changeConfigMapNamespace);
        this.kubernetesInfo.setPercentMemoryPadding(percentMemoryPadding);
        this.submittingInsidePod = submittingInsidePod;
        this.installUserCodeDependencies = installUserCodeDependencies;
        this.customLabels = customLabels;
        this.stateStorageServiceUri = stateStorageServiceUri;
        this.authConfig = authConfig;
        this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
        this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
        this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.functionInstanceMinResources = functionInstanceMinResources;
        this.authenticationEnabled = authenticationEnabled;
        try {
            this.setupClient();
        }
        catch (Exception e) {
            log.error("Failed to setup client", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean externallyManaged() {
        return true;
    }

    @Override
    public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl, String originalCodeFileName, Long expectedHealthCheckInterval) throws Exception {
        String instanceFile = null;
        switch (instanceConfig.getFunctionDetails().getRuntime()) {
            case JAVA: {
                instanceFile = this.javaInstanceJarFile;
                break;
            }
            case PYTHON: {
                instanceFile = this.pythonInstanceFile;
                break;
            }
            case GO: {
                throw new UnsupportedOperationException();
            }
            default: {
                throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
            }
        }
        if (this.authenticationEnabled) {
            this.getAuthProvider().configureAuthenticationConfig(this.authConfig, Optional.ofNullable(FunctionAuthUtils.getFunctionAuthData(Optional.ofNullable(instanceConfig.getFunctionAuthenticationSpec()))));
        }
        return new KubernetesRuntime(this.appsClient, this.coreClient, this.kubernetesInfo.getJobNamespace(), this.customLabels, this.installUserCodeDependencies, this.kubernetesInfo.getPythonDependencyRepository(), this.kubernetesInfo.getPythonExtraDependencyRepository(), this.kubernetesInfo.getPulsarDockerImageName(), this.kubernetesInfo.imagePullPolicy, this.kubernetesInfo.getPulsarRootDir(), instanceConfig, instanceFile, this.extraDependenciesDir, "logs/functions", codePkgUrl, originalCodeFileName, this.kubernetesInfo.getPulsarServiceUrl(), this.kubernetesInfo.getPulsarAdminUrl(), this.stateStorageServiceUri, this.authConfig, this.secretsProviderConfigurator, this.expectedMetricsCollectionInterval, this.kubernetesInfo.getPercentMemoryPadding(), this.getAuthProvider(), this.authenticationEnabled);
    }

    @Override
    public void close() {
    }

    @Override
    public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
        KubernetesRuntime.doChecks(functionDetails);
        this.validateMinResourcesRequired(functionDetails);
        this.secretsProviderConfigurator.doAdmissionChecks(this.appsClient, this.coreClient, this.kubernetesInfo.getJobNamespace(), functionDetails);
    }

    @VisibleForTesting
    public void setupClient() throws Exception {
        if (this.appsClient == null) {
            if (this.kubernetesInfo.getK8Uri() == null) {
                ApiClient cli;
                log.info("k8Uri is null thus going by defaults");
                if (this.submittingInsidePod.booleanValue()) {
                    log.info("Looks like we are inside a k8 pod ourselves. Initializing as cluster");
                    cli = Config.fromCluster();
                } else {
                    log.info("Using default cluster since we are not running inside k8");
                    cli = Config.defaultClient();
                }
                Configuration.setDefaultApiClient((ApiClient)cli);
                this.appsClient = new AppsV1Api();
                this.coreClient = new CoreV1Api();
            } else {
                log.info("Setting up k8Client using uri " + this.kubernetesInfo.getK8Uri());
                ApiClient apiClient = new ApiClient().setBasePath(this.kubernetesInfo.getK8Uri());
                this.appsClient = new AppsV1Api(apiClient);
                this.coreClient = new CoreV1Api(apiClient);
            }
            if (!StringUtils.isEmpty((CharSequence)this.kubernetesInfo.getChangeConfigMap())) {
                this.changeConfigMapTimer = new Timer();
                this.changeConfigMapTimer.scheduleAtFixedRate(new TimerTask(){

                    @Override
                    public void run() {
                        KubernetesRuntimeFactory.this.fetchConfigMap();
                    }
                }, 300000L, 300000L);
            }
        }
    }

    void fetchConfigMap() {
        try {
            V1ConfigMap v1ConfigMap = this.coreClient.readNamespacedConfigMap(this.kubernetesInfo.getChangeConfigMap(), this.kubernetesInfo.getChangeConfigMapNamespace(), null, Boolean.valueOf(true), Boolean.valueOf(false));
            Map data = v1ConfigMap.getData();
            if (data != null) {
                this.overRideKubernetesConfig(data);
            }
        }
        catch (Exception e) {
            log.error("Error while trying to fetch configmap {} at namespace {}", new Object[]{this.kubernetesInfo.getChangeConfigMap(), this.kubernetesInfo.getChangeConfigMapNamespace(), e});
        }
    }

    void overRideKubernetesConfig(Map<String, String> data) throws Exception {
        for (Field field : KubernetesInfo.class.getDeclaredFields()) {
            field.setAccessible(true);
            if (!data.containsKey(field.getName()) || data.get(field.getName()).equals(field.get(this.kubernetesInfo))) continue;
            log.info("Kubernetes Config {} changed from {} to {}", new Object[]{field.getName(), field.get(this.kubernetesInfo), data.get(field.getName())});
            field.set(this.kubernetesInfo, data.get(field.getName()));
        }
    }

    void validateMinResourcesRequired(Function.FunctionDetails functionDetails) {
        if (this.functionInstanceMinResources != null) {
            Double minCpu = this.functionInstanceMinResources.getCpu();
            Long minRam = this.functionInstanceMinResources.getRam();
            if (minCpu != null) {
                if (functionDetails.getResources() == null) {
                    throw new IllegalArgumentException(String.format("Per instance CPU requested is not specified. Must specify CPU requested for function to be at least %s", minCpu));
                }
                if (functionDetails.getResources().getCpu() < minCpu) {
                    throw new IllegalArgumentException(String.format("Per instance CPU requested, %s, for function is less than the minimum required, %s", functionDetails.getResources().getCpu(), minCpu));
                }
            }
            if (minRam != null) {
                if (functionDetails.getResources() == null) {
                    throw new IllegalArgumentException(String.format("Per instance RAM requested is not specified. Must specify RAM requested for function to be at least %s", minRam));
                }
                if (functionDetails.getResources().getRam() < minRam) {
                    throw new IllegalArgumentException(String.format("Per instance RAM requested, %s, for function is less than the minimum required, %s", functionDetails.getResources().getRam(), minRam));
                }
            }
        }
    }

    @Override
    public KubernetesFunctionAuthProvider getAuthProvider() {
        return new KubernetesSecretsTokenAuthProvider(this.coreClient, this.kubernetesInfo.jobNamespace);
    }

    class KubernetesInfo {
        private String k8Uri;
        private String jobNamespace;
        private String pulsarDockerImageName;
        private String imagePullPolicy;
        private String pulsarRootDir;
        private String pulsarAdminUrl;
        private String pulsarServiceUrl;
        private String pythonDependencyRepository;
        private String pythonExtraDependencyRepository;
        private String extraDependenciesDir;
        private String changeConfigMap;
        private String changeConfigMapNamespace;
        private int percentMemoryPadding;

        public String getK8Uri() {
            return this.k8Uri;
        }

        public String getJobNamespace() {
            return this.jobNamespace;
        }

        public String getPulsarDockerImageName() {
            return this.pulsarDockerImageName;
        }

        public String getImagePullPolicy() {
            return this.imagePullPolicy;
        }

        public String getPulsarRootDir() {
            return this.pulsarRootDir;
        }

        public String getPulsarAdminUrl() {
            return this.pulsarAdminUrl;
        }

        public String getPulsarServiceUrl() {
            return this.pulsarServiceUrl;
        }

        public String getPythonDependencyRepository() {
            return this.pythonDependencyRepository;
        }

        public String getPythonExtraDependencyRepository() {
            return this.pythonExtraDependencyRepository;
        }

        public String getExtraDependenciesDir() {
            return this.extraDependenciesDir;
        }

        public String getChangeConfigMap() {
            return this.changeConfigMap;
        }

        public String getChangeConfigMapNamespace() {
            return this.changeConfigMapNamespace;
        }

        public int getPercentMemoryPadding() {
            return this.percentMemoryPadding;
        }

        public void setK8Uri(String k8Uri) {
            this.k8Uri = k8Uri;
        }

        public void setJobNamespace(String jobNamespace) {
            this.jobNamespace = jobNamespace;
        }

        public void setPulsarDockerImageName(String pulsarDockerImageName) {
            this.pulsarDockerImageName = pulsarDockerImageName;
        }

        public void setImagePullPolicy(String imagePullPolicy) {
            this.imagePullPolicy = imagePullPolicy;
        }

        public void setPulsarRootDir(String pulsarRootDir) {
            this.pulsarRootDir = pulsarRootDir;
        }

        public void setPulsarAdminUrl(String pulsarAdminUrl) {
            this.pulsarAdminUrl = pulsarAdminUrl;
        }

        public void setPulsarServiceUrl(String pulsarServiceUrl) {
            this.pulsarServiceUrl = pulsarServiceUrl;
        }

        public void setPythonDependencyRepository(String pythonDependencyRepository) {
            this.pythonDependencyRepository = pythonDependencyRepository;
        }

        public void setPythonExtraDependencyRepository(String pythonExtraDependencyRepository) {
            this.pythonExtraDependencyRepository = pythonExtraDependencyRepository;
        }

        public void setExtraDependenciesDir(String extraDependenciesDir) {
            this.extraDependenciesDir = extraDependenciesDir;
        }

        public void setChangeConfigMap(String changeConfigMap) {
            this.changeConfigMap = changeConfigMap;
        }

        public void setChangeConfigMapNamespace(String changeConfigMapNamespace) {
            this.changeConfigMapNamespace = changeConfigMapNamespace;
        }

        public void setPercentMemoryPadding(int percentMemoryPadding) {
            this.percentMemoryPadding = percentMemoryPadding;
        }
    }
}

