package org.apache.pulsar.functions.runtime;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.apache.pulsar.shade.com.beust.jcommander.JCommander;
import org.apache.pulsar.shade.com.beust.jcommander.Parameter;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.gson.GsonBuilder;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.shade.org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.io.SourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/LocalRunner.class */
public class LocalRunner {
    private static final Logger log = LoggerFactory.getLogger(LocalRunner.class);

    @Parameter(names = {"--functionConfig"}, description = "The json representation of FunctionConfig", hidden = true)
    protected String functionConfigString;

    @Parameter(names = {"--sourceConfig"}, description = "The json representation of SourceConfig", hidden = true)
    protected String sourceConfigString;

    @Parameter(names = {"--sinkConfig"}, description = "The json representation of SinkConfig", hidden = true)
    protected String sinkConfigString;

    @Parameter(names = {"--stateStorageServiceUrl"}, description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
    protected String stateStorageServiceUrl;

    @Parameter(names = {"--brokerServiceUrl"}, description = "The URL for the Pulsar broker", hidden = true)
    protected String brokerServiceUrl;

    @Parameter(names = {"--clientAuthPlugin"}, description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
    protected String clientAuthPlugin;

    @Parameter(names = {"--clientAuthParams"}, description = "Client authentication param", hidden = true)
    protected String clientAuthParams;

    @Parameter(names = {"--useTls"}, description = "Use tls connection\n", hidden = true, arity = 1)
    protected boolean useTls;

    @Parameter(names = {"--tlsAllowInsecureConnection"}, description = "Allow insecure tls connection\n", hidden = true, arity = 1)
    protected boolean tlsAllowInsecureConnection;

    @Parameter(names = {"--tlsHostNameVerificationEnabled"}, description = "Enable hostname verification", hidden = true, arity = 1)
    protected boolean tlsHostNameVerificationEnabled;

    @Parameter(names = {"--tlsTrustCertFilePath"}, description = "tls trust cert file path", hidden = true)
    protected String tlsTrustCertFilePath;

    @Parameter(names = {"--instanceIdOffset"}, description = "Start the instanceIds from this offset", hidden = true)
    protected Integer instanceIdOffset = 0;
    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";

    public static void main(String[] strArr) throws Exception {
        LocalRunner localRunner = new LocalRunner();
        JCommander jCommander = new JCommander(localRunner);
        jCommander.setProgramName("LocalRunner");
        jCommander.parse(strArr);
        localRunner.start();
    }

    void start() throws Exception {
        int intValue;
        String archive;
        Function.FunctionDetails convert;
        if (!StringUtils.isEmpty(this.functionConfigString)) {
            FunctionConfig functionConfig = (FunctionConfig) new Gson().fromJson(this.functionConfigString, FunctionConfig.class);
            FunctionConfigUtils.inferMissingArguments(functionConfig);
            ClassLoader classLoader = null;
            intValue = functionConfig.getParallelism().intValue();
            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                archive = functionConfig.getJar();
                if (Utils.isFunctionPackageUrlSupported(archive)) {
                    classLoader = org.apache.pulsar.functions.utils.Utils.extractClassLoader(archive);
                } else {
                    File file = new File(archive);
                    if (!file.exists()) {
                        throw new RuntimeException("User jar does not exist");
                    }
                    classLoader = org.apache.pulsar.functions.utils.Utils.loadJar(file);
                }
            } else {
                archive = functionConfig.getPy();
            }
            convert = FunctionConfigUtils.convert(functionConfig, classLoader);
        } else if (StringUtils.isEmpty(this.sourceConfigString)) {
            SinkConfig sinkConfig = (SinkConfig) new Gson().fromJson(this.sinkConfigString, SinkConfig.class);
            Utils.inferMissingArguments(sinkConfig);
            String isBuiltInSource = isBuiltInSource(sinkConfig.getArchive());
            if (isBuiltInSource != null) {
                sinkConfig.setArchive(isBuiltInSource);
            }
            intValue = sinkConfig.getParallelism().intValue();
            archive = sinkConfig.getArchive();
            if (Utils.isFunctionPackageUrlSupported(archive)) {
                convert = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, archive, null));
            } else {
                File file2 = new File(archive);
                if (!file2.exists()) {
                    throw new RuntimeException("Sink archive does not exist");
                }
                convert = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, null, file2));
            }
        } else {
            SourceConfig sourceConfig = (SourceConfig) new Gson().fromJson(this.sourceConfigString, SourceConfig.class);
            Utils.inferMissingArguments(sourceConfig);
            String isBuiltInSource2 = isBuiltInSource(sourceConfig.getArchive());
            if (isBuiltInSource2 != null) {
                sourceConfig.setArchive(isBuiltInSource2);
            }
            intValue = sourceConfig.getParallelism().intValue();
            archive = sourceConfig.getArchive();
            if (Utils.isFunctionPackageUrlSupported(archive)) {
                convert = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, archive, null));
            } else {
                File file3 = new File(archive);
                if (!file3.exists()) {
                    throw new RuntimeException("Source archive does not exist");
                }
                convert = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, null, file3));
            }
        }
        startLocalRun(convert, intValue, this.instanceIdOffset.intValue(), this.brokerServiceUrl, this.stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(this.clientAuthPlugin).clientAuthenticationParameters(this.clientAuthParams).useTls(this.useTls).tlsAllowInsecureConnection(this.tlsAllowInsecureConnection).tlsHostnameVerificationEnable(this.tlsHostNameVerificationEnabled).tlsTrustCertsFilePath(this.tlsTrustCertFilePath).build(), archive);
    }

    protected static void startLocalRun(Function.FunctionDetails functionDetails, int i, int i2, String str, String str2, AuthenticationConfig authenticationConfig, String str3) throws Exception {
        String str4 = DEFAULT_SERVICE_URL;
        if (str != null) {
            str4 = str;
        }
        ProcessRuntimeFactory processRuntimeFactory = new ProcessRuntimeFactory(str4, str2, authenticationConfig, null, null, null, null, new DefaultSecretsProviderConfigurator());
        Throwable th = null;
        try {
            try {
                final LinkedList<RuntimeSpawner> linkedList = new LinkedList();
                for (int i3 = 0; i3 < i; i3++) {
                    InstanceConfig instanceConfig = new InstanceConfig();
                    instanceConfig.setFunctionDetails(functionDetails);
                    instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
                    instanceConfig.setFunctionId(UUID.randomUUID().toString());
                    instanceConfig.setInstanceId(i3 + i2);
                    instanceConfig.setMaxBufferedTuples(1024);
                    instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
                    instanceConfig.setClusterName("local");
                    RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, str3, null, processRuntimeFactory, 30000L);
                    linkedList.add(runtimeSpawner);
                    runtimeSpawner.start();
                }
                java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.pulsar.functions.runtime.LocalRunner.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        LocalRunner.log.info("Shutting down the localrun runtimeSpawner ...");
                        Iterator it = linkedList.iterator();
                        while (it.hasNext()) {
                            ((RuntimeSpawner) it.next()).close();
                        }
                    }
                });
                final Timer timer = new Timer();
                timer.scheduleAtFixedRate(new TimerTask() { // from class: org.apache.pulsar.functions.runtime.LocalRunner.2
                    @Override // java.util.TimerTask, java.lang.Runnable
                    public void run() {
                        CompletableFuture[] completableFutureArr = new CompletableFuture[linkedList.size()];
                        int i4 = 0;
                        Iterator it = linkedList.iterator();
                        while (it.hasNext()) {
                            completableFutureArr[i4] = ((RuntimeSpawner) it.next()).getFunctionStatusAsJson(i4);
                            i4++;
                        }
                        try {
                            CompletableFuture.allOf(completableFutureArr).get(5L, TimeUnit.SECONDS);
                            for (CompletableFuture completableFuture : completableFutureArr) {
                                LocalRunner.log.info(new GsonBuilder().setPrettyPrinting().create().toJson(new JsonParser().parse((String) completableFuture.get())));
                            }
                        } catch (Exception e) {
                            LocalRunner.log.error("Could not get status from all local instances");
                        }
                    }
                }, 30000L, 30000L);
                java.lang.Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.pulsar.functions.runtime.LocalRunner.3
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        timer.cancel();
                    }
                });
                for (RuntimeSpawner runtimeSpawner2 : linkedList) {
                    runtimeSpawner2.join();
                    log.info("RuntimeSpawner quit because of", runtimeSpawner2.getRuntime().getDeathException());
                }
                if (processRuntimeFactory != null) {
                    if (0 == 0) {
                        processRuntimeFactory.close();
                        return;
                    }
                    try {
                        processRuntimeFactory.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (processRuntimeFactory != null) {
                if (th != null) {
                    try {
                        processRuntimeFactory.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    processRuntimeFactory.close();
                }
            }
            throw th4;
        }
    }

    private String isBuiltInSource(String str) throws IOException {
        Connectors connectors = getConnectors();
        if (connectors.getSources().containsKey(str)) {
            return connectors.getSources().get(str).toString();
        }
        return null;
    }

    private String isBuiltInSink(String str) throws IOException {
        Connectors connectors = getConnectors();
        if (connectors.getSinks().containsKey(str)) {
            return connectors.getSinks().get(str).toString();
        }
        return null;
    }

    private Connectors getConnectors() throws IOException {
        String str = System.getenv("PULSAR_HOME");
        if (str == null) {
            str = Paths.get("", new String[0]).toAbsolutePath().toString();
        }
        return ConnectorUtils.searchForConnectors(Paths.get(str, "connectors").toString());
    }
}
