/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.utils.io.Connectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalRunner {
    private static final Logger log = LoggerFactory.getLogger(LocalRunner.class);
    @Parameter(names={"--functionConfig"}, description="The json representation of FunctionConfig", hidden=true)
    protected String functionConfigString;
    @Parameter(names={"--sourceConfig"}, description="The json representation of SourceConfig", hidden=true)
    protected String sourceConfigString;
    @Parameter(names={"--sinkConfig"}, description="The json representation of SinkConfig", hidden=true)
    protected String sinkConfigString;
    @Parameter(names={"--stateStorageServiceUrl"}, description="The URL for the state storage service (by default Apache BookKeeper)", hidden=true)
    protected String stateStorageServiceUrl;
    @Parameter(names={"--brokerServiceUrl"}, description="The URL for the Pulsar broker", hidden=true)
    protected String brokerServiceUrl;
    @Parameter(names={"--clientAuthPlugin"}, description="Client authentication plugin using which function-process can connect to broker", hidden=true)
    protected String clientAuthPlugin;
    @Parameter(names={"--clientAuthParams"}, description="Client authentication param", hidden=true)
    protected String clientAuthParams;
    @Parameter(names={"--useTls"}, description="Use tls connection\n", hidden=true, arity=1)
    protected boolean useTls;
    @Parameter(names={"--tlsAllowInsecureConnection"}, description="Allow insecure tls connection\n", hidden=true, arity=1)
    protected boolean tlsAllowInsecureConnection;
    @Parameter(names={"--tlsHostNameVerificationEnabled"}, description="Enable hostname verification", hidden=true, arity=1)
    protected boolean tlsHostNameVerificationEnabled;
    @Parameter(names={"--tlsTrustCertFilePath"}, description="tls trust cert file path", hidden=true)
    protected String tlsTrustCertFilePath;
    @Parameter(names={"--instanceIdOffset"}, description="Start the instanceIds from this offset", hidden=true)
    protected Integer instanceIdOffset = 0;
    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";

    public static void main(String[] args) throws Exception {
        LocalRunner localRunner = new LocalRunner();
        JCommander jcommander = new JCommander((Object)localRunner);
        jcommander.setProgramName("LocalRunner");
        jcommander.parse(args);
        localRunner.start();
    }

    void start() throws Exception {
        Function.FunctionDetails functionDetails;
        String userCodeFile;
        int parallelism;
        if (!StringUtils.isEmpty((CharSequence)this.functionConfigString)) {
            FunctionConfig functionConfig = (FunctionConfig)new Gson().fromJson(this.functionConfigString, FunctionConfig.class);
            FunctionConfigUtils.inferMissingArguments((FunctionConfig)functionConfig);
            ClassLoader classLoader = null;
            parallelism = functionConfig.getParallelism();
            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                userCodeFile = functionConfig.getJar();
                if (Utils.isFunctionPackageUrlSupported((String)userCodeFile)) {
                    classLoader = org.apache.pulsar.functions.utils.Utils.extractClassLoader((String)userCodeFile);
                } else {
                    File file = new File(userCodeFile);
                    if (!file.exists()) {
                        throw new RuntimeException("User jar does not exist");
                    }
                    classLoader = org.apache.pulsar.functions.utils.Utils.loadJar((File)file);
                }
            } else {
                userCodeFile = functionConfig.getPy();
            }
            functionDetails = FunctionConfigUtils.convert((FunctionConfig)functionConfig, classLoader);
        } else if (!StringUtils.isEmpty((CharSequence)this.sourceConfigString)) {
            SourceConfig sourceConfig = (SourceConfig)new Gson().fromJson(this.sourceConfigString, SourceConfig.class);
            Utils.inferMissingArguments((SourceConfig)sourceConfig);
            String builtInSource = this.isBuiltInSource(sourceConfig.getArchive());
            if (builtInSource != null) {
                sourceConfig.setArchive(builtInSource);
            }
            parallelism = sourceConfig.getParallelism();
            userCodeFile = sourceConfig.getArchive();
            if (Utils.isFunctionPackageUrlSupported((String)userCodeFile)) {
                functionDetails = SourceConfigUtils.convert((SourceConfig)sourceConfig, (SourceConfigUtils.ExtractedSourceDetails)SourceConfigUtils.validate((SourceConfig)sourceConfig, null, (String)userCodeFile, null));
            } else {
                File file = new File(userCodeFile);
                if (!file.exists()) {
                    throw new RuntimeException("Source archive does not exist");
                }
                functionDetails = SourceConfigUtils.convert((SourceConfig)sourceConfig, (SourceConfigUtils.ExtractedSourceDetails)SourceConfigUtils.validate((SourceConfig)sourceConfig, null, null, (File)file));
            }
        } else {
            SinkConfig sinkConfig = (SinkConfig)new Gson().fromJson(this.sinkConfigString, SinkConfig.class);
            Utils.inferMissingArguments((SinkConfig)sinkConfig);
            String builtInSink = this.isBuiltInSource(sinkConfig.getArchive());
            if (builtInSink != null) {
                sinkConfig.setArchive(builtInSink);
            }
            parallelism = sinkConfig.getParallelism();
            userCodeFile = sinkConfig.getArchive();
            if (Utils.isFunctionPackageUrlSupported((String)userCodeFile)) {
                functionDetails = SinkConfigUtils.convert((SinkConfig)sinkConfig, (SinkConfigUtils.ExtractedSinkDetails)SinkConfigUtils.validate((SinkConfig)sinkConfig, null, (String)userCodeFile, null));
            } else {
                File file = new File(userCodeFile);
                if (!file.exists()) {
                    throw new RuntimeException("Sink archive does not exist");
                }
                functionDetails = SinkConfigUtils.convert((SinkConfig)sinkConfig, (SinkConfigUtils.ExtractedSinkDetails)SinkConfigUtils.validate((SinkConfig)sinkConfig, null, null, (File)file));
            }
        }
        LocalRunner.startLocalRun(functionDetails, parallelism, this.instanceIdOffset, this.brokerServiceUrl, this.stateStorageServiceUrl, AuthenticationConfig.builder().clientAuthenticationPlugin(this.clientAuthPlugin).clientAuthenticationParameters(this.clientAuthParams).useTls(this.useTls).tlsAllowInsecureConnection(this.tlsAllowInsecureConnection).tlsHostnameVerificationEnable(this.tlsHostNameVerificationEnabled).tlsTrustCertsFilePath(this.tlsTrustCertFilePath).build(), userCodeFile);
    }

    protected static void startLocalRun(Function.FunctionDetails functionDetails, int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, String userCodeFile) throws Exception {
        String serviceUrl = DEFAULT_SERVICE_URL;
        if (brokerServiceUrl != null) {
            serviceUrl = brokerServiceUrl;
        }
        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null, null, null, (SecretsProviderConfigurator)new DefaultSecretsProviderConfigurator());){
            final LinkedList<RuntimeSpawner> spawners = new LinkedList<RuntimeSpawner>();
            for (int i = 0; i < parallelism; ++i) {
                InstanceConfig instanceConfig = new InstanceConfig();
                instanceConfig.setFunctionDetails(functionDetails);
                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
                instanceConfig.setFunctionId(UUID.randomUUID().toString());
                instanceConfig.setInstanceId(i + instanceIdOffset);
                instanceConfig.setMaxBufferedTuples(1024);
                instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
                instanceConfig.setClusterName("local");
                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, userCodeFile, null, containerFactory, 30000L);
                spawners.add(runtimeSpawner);
                runtimeSpawner.start();
            }
            Runtime.getRuntime().addShutdownHook(new Thread(){

                @Override
                public void run() {
                    log.info("Shutting down the localrun runtimeSpawner ...");
                    for (RuntimeSpawner spawner : spawners) {
                        spawner.close();
                    }
                }
            });
            final Timer statusCheckTimer = new Timer();
            statusCheckTimer.scheduleAtFixedRate(new TimerTask(){

                @Override
                public void run() {
                    CompletableFuture[] futures = new CompletableFuture[spawners.size()];
                    int index = 0;
                    for (RuntimeSpawner spawner : spawners) {
                        futures[index] = spawner.getFunctionStatusAsJson(index);
                        ++index;
                    }
                    try {
                        CompletableFuture.allOf(futures).get(5L, TimeUnit.SECONDS);
                        for (index = 0; index < futures.length; ++index) {
                            String json = (String)futures[index].get();
                            Gson gson = new GsonBuilder().setPrettyPrinting().create();
                            log.info(gson.toJson(new JsonParser().parse(json)));
                        }
                    }
                    catch (Exception ex) {
                        log.error("Could not get status from all local instances");
                    }
                }
            }, 30000L, 30000L);
            Runtime.getRuntime().addShutdownHook(new Thread(){

                @Override
                public void run() {
                    statusCheckTimer.cancel();
                }
            });
            for (RuntimeSpawner spawner : spawners) {
                spawner.join();
                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
            }
        }
    }

    private String isBuiltInSource(String sourceType) throws IOException {
        Connectors connectors = this.getConnectors();
        if (connectors.getSources().containsKey(sourceType)) {
            return ((Path)connectors.getSources().get(sourceType)).toString();
        }
        return null;
    }

    private String isBuiltInSink(String sinkType) throws IOException {
        Connectors connectors = this.getConnectors();
        if (connectors.getSinks().containsKey(sinkType)) {
            return ((Path)connectors.getSinks().get(sinkType)).toString();
        }
        return null;
    }

    private Connectors getConnectors() throws IOException {
        String pulsarHome = System.getenv("PULSAR_HOME");
        if (pulsarHome == null) {
            pulsarHome = Paths.get("", new String[0]).toAbsolutePath().toString();
        }
        String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
        return ConnectorUtils.searchForConnectors((String)connectorsDir);
    }
}

