/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.runner;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.utils.ProcessUtil;
import org.apache.geaflow.stats.collector.StatsCollectorFactory;
import org.apache.geaflow.stats.model.EventLabel;
import org.apache.geaflow.stats.model.ExceptionLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommandRunner.class);
    private int pid;
    private Process process;
    private final String command;
    private final int maxRestarts;
    private final Map<String, String> env;
    private final Configuration configuration;
    private final int exitWaitSecs;

    public CommandRunner(String command, int maxRestarts, Map<String, String> env, Configuration config) {
        this.command = command;
        this.maxRestarts = maxRestarts;
        this.env = env;
        this.configuration = config;
        this.exitWaitSecs = config.getInteger(ExecutionConfigKeys.PROCESS_EXIT_WAIT_SECONDS);
    }

    public void asyncStart() {
        CompletableFuture.runAsync(() -> {
            try {
                this.startProcess();
            }
            catch (Throwable e) {
                LOGGER.error("Start process failed: {}", (Object)e.getMessage(), (Object)e);
                String errMsg = String.format("Worker process exited: %s", e.getMessage());
                StatsCollectorFactory.init((Configuration)this.configuration).getEventCollector().reportEvent(ExceptionLevel.ERROR, EventLabel.WORKER_PROCESS_EXITED, errMsg);
            }
        });
    }

    public void startProcess() {
        try {
            int restarts = this.maxRestarts;
            while (true) {
                Process childProcess = this.doStartProcess(this.command);
                int code = childProcess.waitFor();
                LOGGER.warn("Child process {} exits with code: {} and alive: {}", new Object[]{this.pid, code, childProcess.isAlive()});
                if (code == 0 || code == 137 || code == 143) {
                    return;
                }
                if (restarts == 0) {
                    String errMsg = this.maxRestarts == 0 ? String.format("process exits code: %s", code) : String.format("process exits code: %s, exhausted %s restarts", code, this.maxRestarts);
                    throw new GeaflowRuntimeException(errMsg);
                }
                --restarts;
            }
        }
        catch (GeaflowRuntimeException e) {
            LOGGER.error("FATAL: start command failed: {}", (Object)this.command, (Object)e);
            throw e;
        }
        catch (Throwable e) {
            LOGGER.error("FATAL: start command failed: {}", (Object)this.command, (Object)e);
            throw new GeaflowRuntimeException(e.getMessage(), e);
        }
    }

    private Process doStartProcess(String startCommand) throws IOException {
        Process childProcess;
        LOGGER.info("Start process with command: {}", (Object)startCommand);
        ProcessBuilder pb = new ProcessBuilder(new String[0]);
        pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        if (this.env != null) {
            pb.environment().putAll(this.env);
        }
        String[] cmds = startCommand.split("\\s+");
        pb.command(cmds);
        this.process = childProcess = pb.start();
        this.pid = ProcessUtil.getProcessPid((Process)childProcess);
        LOGGER.info("Process started with pid: {}", (Object)this.pid);
        return childProcess;
    }

    public Process getProcess() {
        return this.process;
    }

    public int getProcessId() {
        return this.pid;
    }

    public void stop() {
        this.stop(this.pid);
    }

    public void stop(int oldPid) {
        Preconditions.checkArgument((this.pid > 0 ? 1 : 0) != 0, (Object)"pid should be larger than 0");
        LOGGER.info("Stop old process if exists: {}", (Object)oldPid);
        Process curProcess = this.process;
        int curPid = this.pid;
        if (curProcess.isAlive()) {
            if (curPid <= 0) {
                LOGGER.warn("Process is alive but pid not found: {}", (Object)curProcess);
                return;
            }
            curProcess.destroy();
            try {
                boolean status = curProcess.waitFor(this.exitWaitSecs, TimeUnit.SECONDS);
                LOGGER.info("Destroy current process {}: {}", (Object)curPid, (Object)status);
            }
            catch (InterruptedException e) {
                LOGGER.warn("Interrupted while waiting for process to exit: {}", (Object)this.pid);
            }
        }
        if (curPid != oldPid) {
            LOGGER.info("Kill old process: {}", (Object)oldPid);
            ProcessUtil.killProcess((int)oldPid);
        }
    }
}

