/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RuntimeUtils {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(RuntimeUtils.class);
    // Name of the JVM system property that getCmd passes (as -D<name>=<dir>) to a Java
    // instance when an extra-dependencies directory is supplied.
    private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
    // Name of the JVM system property that getCmd reads from the current JVM and forwards
    // (as -D<name>=<value>) to a Java instance; getCmd falls back to java.class.path when unset.
    static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";

    /**
     * Builds the full launch command for a function instance by concatenating the
     * arguments from {@link #getArgsBeforeCmd} with the arguments from {@link #getCmd}.
     *
     * <p>All parameters are forwarded unchanged to those two methods.
     *
     * @return the list returned by {@code getArgsBeforeCmd}, with the {@code getCmd}
     *         arguments appended to it
     * @throws Exception propagated from {@code getCmd}
     */
    public static List<String> composeCmd(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, String logDirectory, String originalCodeFileName, String pulsarServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, String shardId, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, int metricsPort) throws Exception {
        // Prefix first, then the runtime-specific command line.
        List<String> fullCommand = RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir);
        List<String> launchArgs = RuntimeUtils.getCmd(
                instanceConfig, instanceFile, extraDependenciesDir, logDirectory, originalCodeFileName,
                pulsarServiceUrl, stateStorageServiceUrl, authConfig, shardId, grpcPort,
                expectedHealthCheckInterval, logConfigFile, secretsProviderClassName, secretsProviderConfig,
                installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository,
                metricsPort);
        fullCommand.addAll(launchArgs);
        return fullCommand;
    }

    /**
     * Returns the arguments that must precede the launch command.
     *
     * <p>Only the PYTHON runtime produces anything: when {@code extraDependenciesDir} is
     * non-empty, a single {@code PYTHONPATH=${PYTHONPATH}:<dir>} entry is returned. Every
     * other runtime yields an empty list.
     *
     * @param instanceConfig       instance configuration; its function details supply the runtime
     * @param extraDependenciesDir directory appended to {@code PYTHONPATH}; may be null or empty
     * @return a new list holding zero or one entries
     */
    public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, String extraDependenciesDir) {
        List<String> prefix = new LinkedList<>();
        Function.FunctionDetails.Runtime runtime = instanceConfig.getFunctionDetails().getRuntime();
        boolean isPython = runtime == Function.FunctionDetails.Runtime.PYTHON;
        if (isPython && StringUtils.isNotEmpty(extraDependenciesDir)) {
            prefix.add("PYTHONPATH=${PYTHONPATH}:" + extraDependenciesDir);
        }
        return prefix;
    }

    /**
     * Builds the command line for a Go function instance.
     *
     * <p>Values are copied from {@code instanceConfig} and its function details into a
     * {@link GoInstanceConfig}, which is serialized to a JSON string with the thread-local
     * {@link ObjectMapper}. The resulting command is
     * {@code <originalCodeFileName> -instance-conf <json>}.
     *
     * @param instanceConfig       source of the instance and function settings
     * @param originalCodeFileName first element of the returned command
     * @param pulsarServiceUrl     Pulsar service URL; copied into the config when non-null
     * @return a three-element list: code file name, {@code -instance-conf}, serialized config
     * @throws IOException if serializing the Go configuration fails
     */
    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig, String originalCodeFileName, String pulsarServiceUrl) throws IOException {
        GoInstanceConfig goConf = new GoInstanceConfig();

        // Settings read directly from the instance configuration.
        String clusterName = instanceConfig.getClusterName();
        if (clusterName != null) {
            goConf.setClusterName(clusterName);
        }
        if (instanceConfig.getInstanceId() != 0) {
            goConf.setInstanceID(instanceConfig.getInstanceId());
        }
        String functionId = instanceConfig.getFunctionId();
        if (functionId != null) {
            goConf.setFuncID(functionId);
        }
        String functionVersion = instanceConfig.getFunctionVersion();
        if (functionVersion != null) {
            goConf.setFuncVersion(functionVersion);
        }

        // Settings read from the function details.
        Function.FunctionDetails details = instanceConfig.getFunctionDetails();
        if (details.getAutoAck()) {
            goConf.setAutoAck(details.getAutoAck());
        }
        if (details.getTenant() != null) {
            goConf.setTenant(details.getTenant());
        }
        if (details.getNamespace() != null) {
            goConf.setNameSpace(details.getNamespace());
        }
        if (details.getName() != null) {
            goConf.setName(details.getName());
        }
        if (details.getLogTopic() != null) {
            goConf.setLogTopic(details.getLogTopic());
        }
        if (details.getProcessingGuarantees() != null) {
            // The numeric value is what gets handed to the Go configuration.
            goConf.setProcessingGuarantees(details.getProcessingGuaranteesValue());
        }
        if (details.getSecretsMap() != null) {
            goConf.setSecretsMap(details.getSecretsMap());
        }
        if (details.getParallelism() != 0) {
            goConf.setParallelism(details.getParallelism());
        }
        if (instanceConfig.getMaxBufferedTuples() != 0) {
            goConf.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
        }
        if (pulsarServiceUrl != null) {
            goConf.setPulsarServiceURL(pulsarServiceUrl);
        }

        // Source settings.
        if (details.getSource().getCleanupSubscription()) {
            goConf.setCleanupSubscription(details.getSource().getCleanupSubscription());
        }
        if (details.getSource().getSubscriptionName() != null) {
            goConf.setSubscriptionName(details.getSource().getSubscriptionName());
        }
        if (details.getSource().getInputSpecsMap() != null) {
            // Each key overwrites the previous one, so the last iterated topic is the one kept.
            for (String topic : details.getSource().getInputSpecsMap().keySet()) {
                goConf.setSourceSpecsTopic(topic);
            }
        }
        if (details.getSource().getTimeoutMs() != 0L) {
            goConf.setTimeoutMs(details.getSource().getTimeoutMs());
        }

        // Sink settings.
        if (details.getSink().getTopic() != null) {
            goConf.setSinkSpecsTopic(details.getSink().getTopic());
        }

        // Resource settings.
        Function.Resources resources = details.getResources();
        if (resources.getCpu() != 0.0) {
            goConf.setCpu(resources.getCpu());
        }
        if (resources.getRam() != 0L) {
            goConf.setRam(resources.getRam());
        }
        if (resources.getDisk() != 0L) {
            goConf.setDisk(resources.getDisk());
        }

        // Retry settings.
        if (details.getRetryDetails().getDeadLetterTopic() != null) {
            goConf.setDeadLetterTopic(details.getRetryDetails().getDeadLetterTopic());
        }
        if (details.getRetryDetails().getMaxMessageRetries() != 0) {
            goConf.setMaxMessageRetries(details.getRetryDetails().getMaxMessageRetries());
        }

        goConf.setKillAfterIdleMs(0);

        String serializedConf = ObjectMapperFactory.getThreadLocal().writeValueAsString(goConf);

        List<String> command = new LinkedList<>();
        command.add(originalCodeFileName);
        command.add("-instance-conf");
        command.add(serializedConf);
        return command;
    }

    /**
     * Builds the runtime-specific command line used to start a function instance.
     *
     * <ul>
     *   <li>GO: delegates entirely to {@link #getGoInstanceCmd} and returns its result.</li>
     *   <li>JAVA: {@code java -cp ...} followed by system properties, runtime flags, an
     *       optional {@code -Xmx}, the instance main class and {@code --jar}.</li>
     *   <li>PYTHON: {@code python}, runtime flags, the instance file and the Python-specific
     *       options.</li>
     * </ul>
     *
     * <p>For every runtime other than GO, the options shared by all instances (ids, function
     * details as JSON wrapped in single quotes, service URL, authentication, ports, state
     * storage, health-check interval, secrets provider, cluster name) are appended afterwards.
     *
     * @return the list of command-line arguments, in launch order
     * @throws Exception if the function details cannot be printed as JSON, or if building
     *                   the Go command fails
     */
    public static List<String> getCmd(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, String logDirectory, String originalCodeFileName, String pulsarServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, String shardId, Integer grpcPort, Long expectedHealthCheckInterval, String logConfigFile, String secretsProviderClassName, String secretsProviderConfig, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository, int metricsPort) throws Exception {
        Function.FunctionDetails details = instanceConfig.getFunctionDetails();
        Function.FunctionDetails.Runtime runtime = details.getRuntime();

        // Go instances take a completely different command line.
        if (runtime == Function.FunctionDetails.Runtime.GO) {
            return RuntimeUtils.getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl);
        }

        List<String> args = new LinkedList<>();
        if (runtime == Function.FunctionDetails.Runtime.JAVA) {
            appendJavaLaunchArgs(args, instanceConfig, instanceFile, extraDependenciesDir, logDirectory, originalCodeFileName, shardId, logConfigFile);
        } else if (runtime == Function.FunctionDetails.Runtime.PYTHON) {
            appendPythonLaunchArgs(args, details, instanceFile, logDirectory, originalCodeFileName, logConfigFile, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
        }

        // Options shared by the remaining runtimes.
        addOption(args, "--instance_id", shardId);
        addOption(args, "--function_id", instanceConfig.getFunctionId());
        addOption(args, "--function_version", instanceConfig.getFunctionVersion());
        String detailsJson = JsonFormat.printer().omittingInsignificantWhitespace().print(details);
        addOption(args, "--function_details", "'" + detailsJson + "'");
        addOption(args, "--pulsar_serviceurl", pulsarServiceUrl);
        if (authConfig != null) {
            appendAuthArgs(args, authConfig);
        }
        addOption(args, "--max_buffered_tuples", String.valueOf(instanceConfig.getMaxBufferedTuples()));
        addOption(args, "--port", String.valueOf(grpcPort));
        addOption(args, "--metrics_port", String.valueOf(metricsPort));
        if (stateStorageServiceUrl != null) {
            addOption(args, "--state_storage_serviceurl", stateStorageServiceUrl);
        }
        addOption(args, "--expected_healthcheck_interval", String.valueOf(expectedHealthCheckInterval));
        if (!StringUtils.isEmpty(secretsProviderClassName)) {
            addOption(args, "--secrets_provider", secretsProviderClassName);
            // The provider config is only passed along with a provider class name.
            if (!StringUtils.isEmpty(secretsProviderConfig)) {
                addOption(args, "--secrets_provider_config", "'" + secretsProviderConfig + "'");
            }
        }
        addOption(args, "--cluster_name", instanceConfig.getClusterName());
        return args;
    }

    /** Appends {@code flag} followed by {@code value} to {@code args}. */
    private static void addOption(List<String> args, String flag, String value) {
        args.add(flag);
        args.add(value);
    }

    /** Appends the function's runtime flags, split by {@link #splitRuntimeArgs}, when any are set. */
    private static void appendRuntimeFlags(List<String> args, Function.FunctionDetails details) {
        String runtimeFlags = details.getRuntimeFlags();
        if (StringUtils.isEmpty(runtimeFlags)) {
            return;
        }
        for (String flag : RuntimeUtils.splitRuntimeArgs(runtimeFlags)) {
            args.add(flag);
        }
    }

    /** Appends the JVM launcher, classpath, system properties, JVM flags, main class and {@code --jar}. */
    private static void appendJavaLaunchArgs(List<String> args, InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, String logDirectory, String originalCodeFileName, String shardId, String logConfigFile) {
        Function.FunctionDetails details = instanceConfig.getFunctionDetails();
        boolean hasExtraDeps = StringUtils.isNotEmpty(extraDependenciesDir);

        args.add("java");
        addOption(args, "-cp", hasExtraDeps ? instanceFile + ":" + extraDependenciesDir + "/*" : instanceFile);
        if (hasExtraDeps) {
            args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
        }

        // Forward the instance classpath, defaulting to this JVM's own classpath.
        String instanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
        if (instanceClasspath == null) {
            log.warn("Property {} is not set.  Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH);
            instanceClasspath = System.getProperty("java.class.path");
        }
        args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, instanceClasspath));

        args.add("-Dlog4j.configurationFile=" + logConfigFile);
        args.add("-Dpulsar.function.log.dir=" + RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig));
        args.add("-Dpulsar.function.log.file=" + String.format("%s-%s", details.getName(), shardId));

        appendRuntimeFlags(args, details);

        Function.Resources resources = details.getResources();
        if (resources != null && resources.getRam() != 0L) {
            args.add("-Xmx" + resources.getRam());
        }

        args.add("org.apache.pulsar.functions.instance.JavaInstanceMain");
        addOption(args, "--jar", originalCodeFileName);
    }

    /** Appends the Python launcher, runtime flags, instance file and the Python-only options. */
    private static void appendPythonLaunchArgs(List<String> args, Function.FunctionDetails details, String instanceFile, String logDirectory, String originalCodeFileName, String logConfigFile, Boolean installUserCodeDependencies, String pythonDependencyRepository, String pythonExtraDependencyRepository) {
        args.add("python");
        appendRuntimeFlags(args, details);
        args.add(instanceFile);
        addOption(args, "--py", originalCodeFileName);
        addOption(args, "--logging_directory", logDirectory);
        addOption(args, "--logging_file", details.getName());
        addOption(args, "--logging_config_file", logConfigFile);
        if (Boolean.TRUE.equals(installUserCodeDependencies)) {
            addOption(args, "--install_usercode_dependencies", "True");
        }
        if (!StringUtils.isEmpty(pythonDependencyRepository)) {
            addOption(args, "--dependency_repository", pythonDependencyRepository);
        }
        if (!StringUtils.isEmpty(pythonExtraDependencyRepository)) {
            addOption(args, "--extra_dependency_repository", pythonExtraDependencyRepository);
        }
    }

    /** Appends the client-authentication and TLS options taken from a non-null {@code authConfig}. */
    private static void appendAuthArgs(List<String> args, AuthenticationConfig authConfig) {
        // Plugin and parameters are only emitted together, when both are non-blank.
        boolean hasPlugin = StringUtils.isNotBlank(authConfig.getClientAuthenticationPlugin());
        if (hasPlugin && StringUtils.isNotBlank(authConfig.getClientAuthenticationParameters())) {
            addOption(args, "--client_auth_plugin", authConfig.getClientAuthenticationPlugin());
            addOption(args, "--client_auth_params", authConfig.getClientAuthenticationParameters());
        }
        addOption(args, "--use_tls", Boolean.toString(authConfig.isUseTls()));
        addOption(args, "--tls_allow_insecure", Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
        addOption(args, "--hostname_verification_enabled", Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
        if (StringUtils.isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
            addOption(args, "--tls_trust_cert_path", authConfig.getTlsTrustCertsFilePath());
        }
    }

    /**
     * Returns the log folder for a function instance, formed as
     * {@code <logDirectory>/<fully qualified function name>}.
     *
     * @param logDirectory   base log directory
     * @param instanceConfig instance configuration whose function details supply the name
     * @return the base directory and the fully qualified name joined by {@code /}
     */
    public static String genFunctionLogFolder(String logDirectory, InstanceConfig instanceConfig) {
        String qualifiedName = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
        return String.format("%s/%s", logDirectory, qualifiedName);
    }

    /**
     * Fetches the metrics text served over HTTP on the local host at the given port.
     *
     * <p>Issues a {@code GET} to {@code http://<local host address>:<metricsPort>} and
     * returns the response body, with every line terminated by the platform line separator.
     *
     * @param metricsPort port of the metrics endpoint on the local host
     * @return the response body; empty if the endpoint returned no lines
     * @throws IOException if the local host address cannot be resolved, or the connection
     *                     or read fails
     */
    public static String getPrometheusMetrics(int metricsPort) throws IOException {
        URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        StringBuilder result = new StringBuilder();
        // try-with-resources closes the reader (and the response stream) even when a read fails.
        // The body is decoded explicitly as UTF-8 instead of relying on the platform default charset.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                result.append(line).append(System.lineSeparator());
            }
        }
        return result.toString();
    }

    /**
     * Splits a runtime-flags string into individual arguments.
     *
     * <p>The string is split on runs of whitespace that lie outside double-quoted sections,
     * so a quoted value containing spaces stays in one token (the quotes are kept). Leading,
     * trailing and repeated whitespace never produces empty tokens.
     *
     * @param input the raw flags string; may be null
     * @return the tokens in order; an empty array when {@code input} is null or blank
     */
    public static String[] splitRuntimeArgs(String input) {
        if (input == null) {
            return new String[0];
        }
        String trimmed = input.trim();
        if (trimmed.isEmpty()) {
            return new String[0];
        }
        // The lookahead only accepts whitespace followed by an even number of double quotes,
        // i.e. whitespace that is not inside a quoted section.
        return trimmed.split("\\s+(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
    }
}

