/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.protobuf.Empty;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ProcessRuntime
implements Runtime {
    private static final Logger log = LoggerFactory.getLogger(ProcessRuntime.class);
    private Process process;
    private List<String> processArgs;
    private int instancePort;
    private int metricsPort;
    private Throwable deathException;
    private ManagedChannel channel;
    private InstanceControlGrpc.InstanceControlFutureStub stub;
    private ScheduledFuture timer;
    private InstanceConfig instanceConfig;
    private final Long expectedHealthCheckInterval;
    private final SecretsProviderConfigurator secretsProviderConfigurator;
    private final String extraDependenciesDir;
    private static final long GRPC_TIMEOUT_SECS = 5L;
    private final String funcLogDir;

    ProcessRuntime(InstanceConfig instanceConfig, String instanceFile, String extraDependenciesDir, String logDirectory, String codeFile, String pulsarServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig, SecretsProviderConfigurator secretsProviderConfigurator, Long expectedHealthCheckInterval) throws Exception {
        this.instanceConfig = instanceConfig;
        this.instancePort = instanceConfig.getPort();
        this.metricsPort = Utils.findAvailablePort();
        this.expectedHealthCheckInterval = expectedHealthCheckInterval;
        this.secretsProviderConfigurator = secretsProviderConfigurator;
        this.funcLogDir = RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig);
        String logConfigFile = null;
        String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
        String secretsProviderConfig = null;
        if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
            secretsProviderConfig = new Gson().toJson((Object)secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
        }
        switch (instanceConfig.getFunctionDetails().getRuntime()) {
            case JAVA: {
                logConfigFile = "java_instance_log4j2.yml";
                break;
            }
            case PYTHON: {
                logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
            }
        }
        this.extraDependenciesDir = extraDependenciesDir;
        this.processArgs = RuntimeUtils.composeCmd(instanceConfig, instanceFile, Function.FunctionDetails.Runtime.JAVA == instanceConfig.getFunctionDetails().getRuntime() ? extraDependenciesDir : null, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl, authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval, logConfigFile, secretsProviderClassName, secretsProviderConfig, false, null, null, this.metricsPort);
    }

    @Override
    public void start() {
        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> this.process.destroy()));
        log.info("Creating function log directory {}", (Object)this.funcLogDir);
        try {
            Files.createDirectories(Paths.get(this.funcLogDir, new String[0]), new FileAttribute[0]);
        }
        catch (IOException e) {
            log.info("Exception when creating log folder : {}", (Object)this.funcLogDir, (Object)e);
            throw new RuntimeException("Log folder creation error");
        }
        log.info("Created or found function log directory {}", (Object)this.funcLogDir);
        this.startProcess();
        if (this.channel == null && this.stub == null) {
            this.channel = ManagedChannelBuilder.forAddress((String)"127.0.0.1", (int)this.instancePort).usePlaintext(true).build();
            this.stub = InstanceControlGrpc.newFutureStub((Channel)this.channel);
            this.timer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
                CompletableFuture<InstanceCommunication.HealthCheckResult> result = this.healthCheck();
                try {
                    result.get();
                }
                catch (Exception e) {
                    log.error("Health check failed for {}-{}", new Object[]{this.instanceConfig.getFunctionDetails().getName(), this.instanceConfig.getInstanceId(), e});
                }
            }, this.expectedHealthCheckInterval, this.expectedHealthCheckInterval, TimeUnit.SECONDS);
        }
    }

    @Override
    public void join() throws Exception {
        this.process.waitFor();
    }

    @Override
    public void stop() throws InterruptedException {
        if (this.timer != null) {
            this.timer.cancel(false);
        }
        if (this.channel != null) {
            this.channel.shutdown();
        }
        this.channel = null;
        this.stub = null;
        if (this.process != null) {
            this.process.destroy();
            int i = 0;
            while (this.process.isAlive()) {
                Thread.sleep(100L);
                if (i > 100) break;
                ++i;
            }
            if (this.process.isAlive()) {
                log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...", (Object)Utils.getFullyQualifiedInstanceId((String)this.instanceConfig.getFunctionDetails().getTenant(), (String)this.instanceConfig.getFunctionDetails().getNamespace(), (String)this.instanceConfig.getFunctionDetails().getName(), (int)this.instanceConfig.getInstanceId()));
                this.process.destroyForcibly();
            }
        }
    }

    @Override
    public CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId) {
        final CompletableFuture<InstanceCommunication.FunctionStatus> retval = new CompletableFuture<InstanceCommunication.FunctionStatus>();
        if (this.stub == null) {
            retval.completeExceptionally(new RuntimeException("Not alive"));
            return retval;
        }
        ListenableFuture response = ((InstanceControlGrpc.InstanceControlFutureStub)this.stub.withDeadlineAfter(5L, TimeUnit.SECONDS)).getFunctionStatus(Empty.newBuilder().build());
        Futures.addCallback((ListenableFuture)response, (FutureCallback)new FutureCallback<InstanceCommunication.FunctionStatus>(){

            public void onFailure(Throwable throwable) {
                InstanceCommunication.FunctionStatus.Builder builder = InstanceCommunication.FunctionStatus.newBuilder();
                builder.setRunning(false);
                if (ProcessRuntime.this.deathException != null) {
                    builder.setFailureException(ProcessRuntime.this.deathException.getMessage());
                } else {
                    builder.setFailureException(throwable.getMessage());
                }
                retval.complete(builder.build());
            }

            public void onSuccess(InstanceCommunication.FunctionStatus t) {
                retval.complete(t);
            }
        });
        return retval;
    }

    @Override
    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
        final CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<InstanceCommunication.MetricsData>();
        if (this.stub == null) {
            retval.completeExceptionally(new RuntimeException("Not alive"));
            return retval;
        }
        ListenableFuture response = ((InstanceControlGrpc.InstanceControlFutureStub)this.stub.withDeadlineAfter(5L, TimeUnit.SECONDS)).getAndResetMetrics(Empty.newBuilder().build());
        Futures.addCallback((ListenableFuture)response, (FutureCallback)new FutureCallback<InstanceCommunication.MetricsData>(){

            public void onFailure(Throwable throwable) {
                retval.completeExceptionally(throwable);
            }

            public void onSuccess(InstanceCommunication.MetricsData t) {
                retval.complete(t);
            }
        });
        return retval;
    }

    @Override
    public CompletableFuture<Void> resetMetrics() {
        final CompletableFuture<Void> retval = new CompletableFuture<Void>();
        if (this.stub == null) {
            retval.completeExceptionally(new RuntimeException("Not alive"));
            return retval;
        }
        ListenableFuture response = ((InstanceControlGrpc.InstanceControlFutureStub)this.stub.withDeadlineAfter(5L, TimeUnit.SECONDS)).resetMetrics(Empty.newBuilder().build());
        Futures.addCallback((ListenableFuture)response, (FutureCallback)new FutureCallback<Empty>(){

            public void onFailure(Throwable throwable) {
                retval.completeExceptionally(throwable);
            }

            public void onSuccess(Empty t) {
                retval.complete(null);
            }
        });
        return retval;
    }

    @Override
    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
        final CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<InstanceCommunication.MetricsData>();
        if (this.stub == null) {
            retval.completeExceptionally(new RuntimeException("Not alive"));
            return retval;
        }
        ListenableFuture response = ((InstanceControlGrpc.InstanceControlFutureStub)this.stub.withDeadlineAfter(5L, TimeUnit.SECONDS)).getMetrics(Empty.newBuilder().build());
        Futures.addCallback((ListenableFuture)response, (FutureCallback)new FutureCallback<InstanceCommunication.MetricsData>(){

            public void onFailure(Throwable throwable) {
                retval.completeExceptionally(throwable);
            }

            public void onSuccess(InstanceCommunication.MetricsData t) {
                retval.complete(t);
            }
        });
        return retval;
    }

    @Override
    public String getPrometheusMetrics() throws IOException {
        return RuntimeUtils.getPrometheusMetrics(this.metricsPort);
    }

    public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() {
        final CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<InstanceCommunication.HealthCheckResult>();
        if (this.stub == null) {
            retval.completeExceptionally(new RuntimeException("Not alive"));
            return retval;
        }
        ListenableFuture response = ((InstanceControlGrpc.InstanceControlFutureStub)this.stub.withDeadlineAfter(5L, TimeUnit.SECONDS)).healthCheck(Empty.newBuilder().build());
        Futures.addCallback((ListenableFuture)response, (FutureCallback)new FutureCallback<InstanceCommunication.HealthCheckResult>(){

            public void onFailure(Throwable throwable) {
                retval.completeExceptionally(throwable);
            }

            public void onSuccess(InstanceCommunication.HealthCheckResult t) {
                retval.complete(t);
            }
        });
        return retval;
    }

    private void startProcess() {
        this.deathException = null;
        try {
            ProcessBuilder processBuilder = new ProcessBuilder(this.processArgs).inheritIO();
            if (StringUtils.isNotEmpty((CharSequence)this.extraDependenciesDir)) {
                processBuilder.environment().put("PYTHONPATH", "${PYTHONPATH}:" + this.extraDependenciesDir);
            }
            this.secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, this.instanceConfig.getFunctionDetails());
            log.info("ProcessBuilder starting the process with args {}", (Object)String.join((CharSequence)" ", processBuilder.command()));
            this.process = processBuilder.start();
        }
        catch (Exception ex) {
            log.error("Starting process failed", (Throwable)ex);
            this.deathException = ex;
            return;
        }
        try {
            int exitValue = this.process.exitValue();
            log.error("Instance Process quit unexpectedly with return value " + exitValue);
            this.tryExtractingDeathException();
        }
        catch (IllegalThreadStateException ex) {
            log.info("Started process successfully");
        }
    }

    @Override
    public boolean isAlive() {
        if (this.process == null) {
            return false;
        }
        if (!this.process.isAlive()) {
            if (this.deathException == null) {
                this.tryExtractingDeathException();
            }
            return false;
        }
        return true;
    }

    private void tryExtractingDeathException() {
        InputStream errorStream = this.process.getErrorStream();
        try {
            byte[] errorBytes = new byte[errorStream.available()];
            errorStream.read(errorBytes);
            String errorMessage = new String(errorBytes);
            this.deathException = new RuntimeException(errorMessage);
            log.error("Extracted Process death exception", this.deathException);
        }
        catch (Exception ex) {
            this.deathException = ex;
            log.error("Error extracting Process death exception", this.deathException);
        }
    }

    public Process getProcess() {
        return this.process;
    }

    public List<String> getProcessArgs() {
        return this.processArgs;
    }

    @Override
    public Throwable getDeathException() {
        return this.deathException;
    }
}

