package org.apache.pulsar.functions.runtime;

import io.kubernetes.client.ApiClient;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.util.Config;
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.class */
public class KubernetesRuntimeFactory implements RuntimeFactory {
    private static final Logger log = LoggerFactory.getLogger(KubernetesRuntimeFactory.class);
    private final Boolean submittingInsidePod;
    private final Boolean installUserCodeDependencies;
    private final Map<String, String> customLabels;
    private final Integer expectedMetricsCollectionInterval;
    private final String stateStorageServiceUri;
    private final AuthenticationConfig authConfig;
    private final String javaInstanceJarFile;
    private final String pythonInstanceFile;
    private final String extraDependenciesDir;
    private final SecretsProviderConfigurator secretsProviderConfigurator;
    private Timer changeConfigMapTimer;
    private AppsV1Api appsClient;
    private CoreV1Api coreClient;
    private Resources functionInstanceMinResources;
    private final String logDirectory = "logs/functions";
    private final KubernetesInfo kubernetesInfo = new KubernetesInfo();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory$KubernetesInfo.class */
    /**
     * Mutable bag of string-valued Kubernetes settings used by the enclosing factory.
     *
     * <p>Every property is a plain {@code String} with a trivial getter and setter. The
     * enclosing factory also rewrites these fields reflectively, by field name, in
     * {@code overRideKubernetesConfig}; the field names are therefore part of the
     * behavior and must not be renamed.
     */
    public class KubernetesInfo {
        private String k8Uri;
        private String jobNamespace;
        private String pulsarDockerImageName;
        private String imagePullPolicy;
        private String pulsarRootDir;
        private String pulsarAdminUrl;
        private String pulsarServiceUrl;
        private String pythonDependencyRepository;
        private String pythonExtraDependencyRepository;
        private String extraDependenciesDir;
        private String changeConfigMap;
        private String changeConfigMapNamespace;

        /** Creates an instance with every property unset ({@code null}). */
        public KubernetesInfo() {
        }

        // --- k8Uri ---

        public String getK8Uri() {
            return k8Uri;
        }

        public void setK8Uri(String str) {
            k8Uri = str;
        }

        // --- jobNamespace ---

        public String getJobNamespace() {
            return jobNamespace;
        }

        public void setJobNamespace(String str) {
            jobNamespace = str;
        }

        // --- pulsarDockerImageName ---

        public String getPulsarDockerImageName() {
            return pulsarDockerImageName;
        }

        public void setPulsarDockerImageName(String str) {
            pulsarDockerImageName = str;
        }

        // --- imagePullPolicy ---

        public String getImagePullPolicy() {
            return imagePullPolicy;
        }

        public void setImagePullPolicy(String str) {
            imagePullPolicy = str;
        }

        // --- pulsarRootDir ---

        public String getPulsarRootDir() {
            return pulsarRootDir;
        }

        public void setPulsarRootDir(String str) {
            pulsarRootDir = str;
        }

        // --- pulsarAdminUrl ---

        public String getPulsarAdminUrl() {
            return pulsarAdminUrl;
        }

        public void setPulsarAdminUrl(String str) {
            pulsarAdminUrl = str;
        }

        // --- pulsarServiceUrl ---

        public String getPulsarServiceUrl() {
            return pulsarServiceUrl;
        }

        public void setPulsarServiceUrl(String str) {
            pulsarServiceUrl = str;
        }

        // --- pythonDependencyRepository ---

        public String getPythonDependencyRepository() {
            return pythonDependencyRepository;
        }

        public void setPythonDependencyRepository(String str) {
            pythonDependencyRepository = str;
        }

        // --- pythonExtraDependencyRepository ---

        public String getPythonExtraDependencyRepository() {
            return pythonExtraDependencyRepository;
        }

        public void setPythonExtraDependencyRepository(String str) {
            pythonExtraDependencyRepository = str;
        }

        // --- extraDependenciesDir ---

        public String getExtraDependenciesDir() {
            return extraDependenciesDir;
        }

        public void setExtraDependenciesDir(String str) {
            extraDependenciesDir = str;
        }

        // --- changeConfigMap ---

        public String getChangeConfigMap() {
            return changeConfigMap;
        }

        public void setChangeConfigMap(String str) {
            changeConfigMap = str;
        }

        // --- changeConfigMapNamespace ---

        public String getChangeConfigMapNamespace() {
            return changeConfigMapNamespace;
        }

        public void setChangeConfigMapNamespace(String str) {
            changeConfigMapNamespace = str;
        }
    }

    @VisibleForTesting
    public KubernetesRuntimeFactory(String str, String str2, String str3, String str4, String str5, Boolean bool, Boolean bool2, String str6, String str7, String str8, Map<String, String> map, String str9, String str10, String str11, AuthenticationConfig authenticationConfig, Integer num, String str12, String str13, Resources resources, SecretsProviderConfigurator secretsProviderConfigurator) {
        this.kubernetesInfo.setK8Uri(str);
        if (StringUtils.isEmpty(str2)) {
            this.kubernetesInfo.setJobNamespace("default");
        } else {
            this.kubernetesInfo.setJobNamespace(str2);
        }
        if (StringUtils.isEmpty(str3)) {
            this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
        } else {
            this.kubernetesInfo.setPulsarDockerImageName(str3);
        }
        if (StringUtils.isEmpty(str4)) {
            this.kubernetesInfo.setImagePullPolicy("IfNotPresent");
        } else {
            this.kubernetesInfo.setImagePullPolicy(str4);
        }
        if (StringUtils.isEmpty(str5)) {
            this.kubernetesInfo.setPulsarRootDir("/pulsar");
        } else {
            this.kubernetesInfo.setPulsarRootDir(str5);
        }
        if (!StringUtils.isNotEmpty(str8)) {
            this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir() + "/instances/deps";
        } else if (Paths.get(str8, new String[0]).isAbsolute()) {
            this.extraDependenciesDir = str8;
        } else {
            this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir() + "/" + str8;
        }
        this.kubernetesInfo.setExtraDependenciesDir(str8);
        this.kubernetesInfo.setPythonDependencyRepository(str6);
        this.kubernetesInfo.setPythonExtraDependencyRepository(str7);
        this.kubernetesInfo.setPulsarServiceUrl(str9);
        this.kubernetesInfo.setPulsarAdminUrl(str10);
        this.kubernetesInfo.setChangeConfigMap(str12);
        this.kubernetesInfo.setChangeConfigMapNamespace(str13);
        this.submittingInsidePod = bool;
        this.installUserCodeDependencies = bool2;
        this.customLabels = map;
        this.stateStorageServiceUri = str11;
        this.authConfig = authenticationConfig;
        this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
        this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
        this.expectedMetricsCollectionInterval = Integer.valueOf(num == null ? -1 : num.intValue());
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.functionInstanceMinResources = resources;
    }

    /**
     * Reports whether this factory's runtimes are externally managed.
     *
     * @return always {@code true} for this factory
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory
    public boolean externallyManaged() {
        return true;
    }

    /**
     * Creates a {@link KubernetesRuntime} for the given instance configuration.
     *
     * <p>Ensures the Kubernetes API clients exist, selects the instance file that matches
     * the function's runtime (Java jar or Python entry script), and hands the factory's
     * current settings to the {@link KubernetesRuntime} constructor.
     *
     * @param instanceConfig instance configuration; its function details determine the runtime
     * @param str forwarded unchanged to the {@link KubernetesRuntime} constructor
     * @param str2 forwarded unchanged to the {@link KubernetesRuntime} constructor
     * @param l not used by this implementation
     * @return the newly constructed runtime
     * @throws RuntimeException if the function's runtime is neither JAVA nor PYTHON
     * @throws Exception if setting up the Kubernetes clients fails
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory
    public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String str, String str2, Long l) throws Exception {
        setupClient();

        final String instanceFile;
        switch (instanceConfig.getFunctionDetails().getRuntime()) {
            case JAVA:
                instanceFile = this.javaInstanceJarFile;
                break;
            case PYTHON:
                instanceFile = this.pythonInstanceFile;
                break;
            default:
                throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
        }

        final KubernetesInfo info = this.kubernetesInfo;
        return new KubernetesRuntime(
                this.appsClient,
                this.coreClient,
                info.getJobNamespace(),
                this.customLabels,
                this.installUserCodeDependencies,
                info.getPythonDependencyRepository(),
                info.getPythonExtraDependencyRepository(),
                info.getPulsarDockerImageName(),
                info.imagePullPolicy,
                info.getPulsarRootDir(),
                instanceConfig,
                instanceFile,
                this.extraDependenciesDir,
                "logs/functions",
                str,
                str2,
                info.getPulsarServiceUrl(),
                info.getPulsarAdminUrl(),
                this.stateStorageServiceUri,
                this.authConfig,
                this.secretsProviderConfigurator,
                this.expectedMetricsCollectionInterval);
    }

    /**
     * Releases resources held by this factory.
     *
     * <p>Cancels the periodic config-map refresh timer if one was started by
     * {@code setupClient()}. Without this, the timer's background thread (created by
     * {@code new Timer()}, hence non-daemon) would outlive the factory and could keep
     * the JVM from exiting. Calling this method more than once is harmless.
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory, java.lang.AutoCloseable
    public void close() {
        final Timer timer = this.changeConfigMapTimer;
        if (timer != null) {
            // Timer.cancel() discards scheduled tasks and lets the timer thread terminate.
            timer.cancel();
            this.changeConfigMapTimer = null;
        }
    }

    /**
     * Validates a function before it is admitted.
     *
     * <p>Runs, in order: the static {@link KubernetesRuntime#doChecks} validation, the
     * minimum-resource validation, and — after making sure the Kubernetes clients are
     * set up — the secrets provider configurator's own admission checks against the job
     * namespace.
     *
     * @param functionDetails the function to validate
     * @throws RuntimeException wrapping any exception raised while setting up the clients
     *     or running the secrets provider checks
     */
    @Override // org.apache.pulsar.functions.runtime.RuntimeFactory
    public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
        KubernetesRuntime.doChecks(functionDetails);
        validateMinResourcesRequired(functionDetails);
        try {
            setupClient();
            final String jobNamespace = this.kubernetesInfo.getJobNamespace();
            this.secretsProviderConfigurator.doAdmissionChecks(this.appsClient, this.coreClient, jobNamespace, functionDetails);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Lazily creates the Kubernetes API clients and, when a change config map is
     * configured, starts the periodic refresh timer.
     *
     * <p>Does nothing if the apps client already exists. With an explicit k8Uri the
     * clients are built on an {@link ApiClient} pointed at that base path; otherwise a
     * default client is chosen (in-cluster when {@code submittingInsidePod} is true,
     * the library default otherwise) and installed as the global default.
     *
     * @throws Exception if the Kubernetes client configuration cannot be loaded
     */
    @VisibleForTesting
    void setupClient() throws Exception {
        if (this.appsClient != null) {
            return; // already initialised
        }

        final String k8Uri = this.kubernetesInfo.getK8Uri();
        if (k8Uri != null) {
            log.info("Setting up k8Client using uri " + k8Uri);
            final ApiClient explicitClient = new ApiClient().setBasePath(k8Uri);
            this.appsClient = new AppsV1Api(explicitClient);
            this.coreClient = new CoreV1Api(explicitClient);
        } else {
            log.info("k8Uri is null thus going by defaults");
            final ApiClient implicitClient;
            if (this.submittingInsidePod) {
                log.info("Looks like we are inside a k8 pod ourselves. Initializing as cluster");
                implicitClient = Config.fromCluster();
            } else {
                log.info("Using default cluster since we are not running inside k8");
                implicitClient = Config.defaultClient();
            }
            Configuration.setDefaultApiClient(implicitClient);
            this.appsClient = new AppsV1Api();
            this.coreClient = new CoreV1Api();
        }

        if (!StringUtils.isEmpty(this.kubernetesInfo.getChangeConfigMap())) {
            // Re-read the config map every five minutes, first run after five minutes.
            final long refreshPeriodMillis = 300000L;
            this.changeConfigMapTimer = new Timer();
            this.changeConfigMapTimer.scheduleAtFixedRate(new TimerTask() {
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    KubernetesRuntimeFactory.this.fetchConfigMap();
                }
            }, refreshPeriodMillis, refreshPeriodMillis);
        }
    }

    /**
     * Reads the configured change config map and applies its entries as overrides.
     *
     * <p>Best-effort: any failure while reading or applying the config map is logged
     * at error level and otherwise ignored, so the caller (the refresh timer task)
     * never sees an exception. A config map without data is a no-op.
     */
    void fetchConfigMap() {
        try {
            final Map<String, String> overrides = this.coreClient
                    .readNamespacedConfigMap(
                            this.kubernetesInfo.getChangeConfigMap(),
                            this.kubernetesInfo.getChangeConfigMapNamespace(),
                            (String) null,
                            true,
                            false)
                    .getData();
            if (overrides == null) {
                return;
            }
            overRideKubernetesConfig(overrides);
        } catch (Exception failure) {
            // Trailing exception argument is picked up by SLF4J as the throwable to log.
            log.error("Error while trying to fetch configmap {} at namespace {}", this.kubernetesInfo.getChangeConfigMap(), this.kubernetesInfo.getChangeConfigMapNamespace(), failure);
        }
    }

    /**
     * Applies overrides from {@code map} onto the matching {@link KubernetesInfo} fields.
     *
     * <p>For each declared {@code String} field of {@link KubernetesInfo} whose name
     * appears as a key in {@code map}, the field is updated (and the change logged)
     * when the new value differs from the current one.
     *
     * <p>Compiler-generated (synthetic) fields — such as the hidden reference to the
     * enclosing instance that a non-static inner class carries — and non-{@code String}
     * fields are never touched. A {@code null} value in the map is skipped with a
     * warning instead of raising a {@link NullPointerException}, so that one bad entry
     * does not prevent the remaining overrides from being applied.
     *
     * @param map override values keyed by {@link KubernetesInfo} field name
     * @throws Exception if reflective access to a field fails
     */
    void overRideKubernetesConfig(Map<String, String> map) throws Exception {
        for (Field field : KubernetesInfo.class.getDeclaredFields()) {
            if (field.isSynthetic() || field.getType() != String.class) {
                continue;
            }
            final String name = field.getName();
            if (!map.containsKey(name)) {
                continue;
            }
            final String newValue = map.get(name);
            if (newValue == null) {
                log.warn("Ignoring null value for Kubernetes Config {}", name);
                continue;
            }
            field.setAccessible(true);
            final Object oldValue = field.get(this.kubernetesInfo);
            if (!newValue.equals(oldValue)) {
                log.info("Kubernetes Config {} changed from {} to {}", new Object[]{name, oldValue, newValue});
                field.set(this.kubernetesInfo, newValue);
            }
        }
    }

    /**
     * Checks the function's requested resources against the configured per-instance minimums.
     *
     * <p>No-op when no minimum resources are configured. CPU is checked before RAM, and
     * each is only checked when its minimum is set.
     *
     * @param functionDetails the function whose requested resources are validated
     * @throws IllegalArgumentException if a required resource request is missing or is
     *     below the configured minimum
     */
    void validateMinResourcesRequired(Function.FunctionDetails functionDetails) {
        if (this.functionInstanceMinResources == null) {
            return;
        }
        final Double minCpu = this.functionInstanceMinResources.getCpu();
        final Long minRam = this.functionInstanceMinResources.getRam();

        if (minCpu != null) {
            if (functionDetails.getResources() == null) {
                throw new IllegalArgumentException(String.format("Per instance CPU requested is not specified. Must specify CPU requested for function to be at least %s", minCpu));
            }
            final double requestedCpu = functionDetails.getResources().getCpu();
            if (requestedCpu < minCpu) {
                throw new IllegalArgumentException(String.format("Per instance CPU requested, %s, for function is less than the minimum required, %s", requestedCpu, minCpu));
            }
        }

        if (minRam != null) {
            if (functionDetails.getResources() == null) {
                throw new IllegalArgumentException(String.format("Per instance RAM requested is not specified. Must specify RAM requested for function to be at least %s", minRam));
            }
            final long requestedRam = functionDetails.getResources().getRam();
            if (requestedRam < minRam) {
                throw new IllegalArgumentException(String.format("Per instance RAM requested, %s, for function is less than the minimum required, %s", requestedRam, minRam));
            }
        }
    }
}
