package org.apache.reef.runtime.mesos.evaluator;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.mesos.evaluator.parameters.MesosExecutorId;
import org.apache.reef.runtime.mesos.util.EvaluatorControl;
import org.apache.reef.runtime.mesos.util.EvaluatorLaunch;
import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
import org.apache.reef.tang.JavaConfigurationBuilder;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.CommandLine;

/* loaded from: input_file:org/apache/reef/runtime/mesos/evaluator/REEFExecutor.class */
public final class REEFExecutor implements Executor {
    private static final Logger LOG;
    private final MesosExecutorDriver mesosExecutorDriver;
    private final MesosRemoteManager mesosRemoteManager;
    private final ExecutorService executorService;
    private final REEFFileNames fileNames;
    private final String mesosExecutorId;
    private Process evaluatorProcess;
    private Integer evaluatorProcessExitValue;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Inject
    REEFExecutor(EvaluatorControlHandler evaluatorControlHandler, MesosRemoteManager mesosRemoteManager, REEFFileNames rEEFFileNames, @Parameter(MesosExecutorId.class) String str) {
        this.mesosRemoteManager = mesosRemoteManager;
        this.mesosRemoteManager.registerHandler(EvaluatorControl.class, evaluatorControlHandler);
        this.mesosExecutorDriver = new MesosExecutorDriver(this);
        this.executorService = Executors.newCachedThreadPool();
        this.fileNames = rEEFFileNames;
        this.mesosExecutorId = str;
    }

    @Override // org.apache.mesos.Executor
    public void registered(ExecutorDriver executorDriver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
        LOG.log(Level.FINEST, "Executor registered. driver: {0} executorInfo: {1} frameworkInfo: {2} slaveInfo {3}", new Object[]{executorDriver, executorInfo, frameworkInfo, slaveInfo});
    }

    @Override // org.apache.mesos.Executor
    public void reregistered(ExecutorDriver executorDriver, Protos.SlaveInfo slaveInfo) {
        LOG.log(Level.FINEST, "Executor reregistered. driver: {0}", executorDriver);
    }

    @Override // org.apache.mesos.Executor
    public void disconnected(ExecutorDriver executorDriver) {
        onRuntimeError();
    }

    @Override // org.apache.mesos.Executor
    public void launchTask(ExecutorDriver executorDriver, Protos.TaskInfo taskInfo) {
        executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(this.mesosExecutorId).build()).setState(Protos.TaskState.TASK_STARTING).setSlaveId(taskInfo.getSlaveId()).setMessage(this.mesosRemoteManager.getMyIdentifier()).build());
    }

    @Override // org.apache.mesos.Executor
    public void killTask(ExecutorDriver executorDriver, Protos.TaskID taskID) {
        onStop();
    }

    @Override // org.apache.mesos.Executor
    public void frameworkMessage(ExecutorDriver executorDriver, byte[] bArr) {
        LOG.log(Level.FINEST, "Framework Messge. ExecutorDriver: {0}, data: {1}.", new Object[]{executorDriver, bArr});
    }

    @Override // org.apache.mesos.Executor
    public void shutdown(ExecutorDriver executorDriver) {
        onStop();
    }

    @Override // org.apache.mesos.Executor
    public void error(ExecutorDriver executorDriver, String str) {
        onRuntimeError();
    }

    private void onStart() {
        this.executorService.submit(new Thread() { // from class: org.apache.reef.runtime.mesos.evaluator.REEFExecutor.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                REEFExecutor.LOG.log(Level.INFO, "MesosExecutorDriver ended with status {0}", REEFExecutor.this.mesosExecutorDriver.run());
            }
        });
    }

    private void onStop() {
        if (this.evaluatorProcess != null) {
            this.evaluatorProcess.destroy();
            this.mesosExecutorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(this.mesosExecutorId).build()).setState(Protos.TaskState.TASK_FINISHED).setMessage("Evaluator Process exited with status " + String.valueOf(this.evaluatorProcessExitValue)).build());
        } else {
            this.mesosExecutorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(this.mesosExecutorId).build()).setState(Protos.TaskState.TASK_FINISHED).setData(ByteString.copyFromUtf8("eval_not_run")).setMessage("Evaluator Process exited with status " + String.valueOf(this.evaluatorProcessExitValue)).build());
        }
        this.executorService.shutdown();
        this.mesosExecutorDriver.stop();
    }

    private void onRuntimeError() {
        if (this.evaluatorProcess != null) {
            this.evaluatorProcess.destroy();
        }
        this.mesosExecutorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(this.mesosExecutorId).build()).setState(Protos.TaskState.TASK_FAILED).setMessage("Evaluator Process exited with status " + String.valueOf(this.evaluatorProcessExitValue)).build());
        this.executorService.shutdown();
        this.mesosExecutorDriver.stop();
    }

    public void onEvaluatorRelease(EvaluatorRelease evaluatorRelease) {
        LOG.log(Level.INFO, "Release!!!! {0}", evaluatorRelease.toString());
        if (!$assertionsDisabled && !evaluatorRelease.getIdentifier().toString().equals(this.mesosExecutorId)) {
            throw new AssertionError();
        }
        onStop();
    }

    public void onEvaluatorLaunch(final EvaluatorLaunch evaluatorLaunch) {
        LOG.log(Level.INFO, "Launch!!!! {0}", evaluatorLaunch.toString());
        if (!$assertionsDisabled && !evaluatorLaunch.getIdentifier().toString().equals(this.mesosExecutorId)) {
            throw new AssertionError();
        }
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.submit(new Thread() { // from class: org.apache.reef.runtime.mesos.evaluator.REEFExecutor.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    List<String> asList = Arrays.asList(evaluatorLaunch.getCommand().toString().split(" "));
                    REEFExecutor.LOG.log(Level.INFO, "Command!!!! {0}", asList);
                    FileSystem fileSystem = FileSystem.get(new Configuration());
                    FileUtil.copy(fileSystem, new Path(fileSystem.getUri() + "/" + REEFExecutor.this.mesosExecutorId), new File(REEFExecutor.this.fileNames.getREEFFolderName(), REEFExecutor.this.fileNames.getLocalFolderName()), true, new Configuration());
                    REEFExecutor.this.evaluatorProcess = new ProcessBuilder(new String[0]).command(asList).redirectError(new File(REEFExecutor.this.fileNames.getEvaluatorStderrFileName())).redirectOutput(new File(REEFExecutor.this.fileNames.getEvaluatorStdoutFileName())).start();
                    REEFExecutor.this.evaluatorProcessExitValue = Integer.valueOf(REEFExecutor.this.evaluatorProcess.waitFor());
                    fileSystem.close();
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        newSingleThreadExecutor.shutdown();
    }

    public static org.apache.reef.tang.Configuration parseCommandLine(String[] strArr) throws IOException {
        JavaConfigurationBuilder newConfigurationBuilder = Tang.Factory.getTang().newConfigurationBuilder();
        new CommandLine(newConfigurationBuilder).registerShortNameOfClass(MesosExecutorId.class).processCommandLine(strArr, new Class[0]);
        return newConfigurationBuilder.build();
    }

    public static void main(String[] strArr) throws Exception {
        ((REEFExecutor) Tang.Factory.getTang().newInjector(parseCommandLine(strArr)).getInstance(REEFExecutor.class)).onStart();
    }

    static {
        $assertionsDisabled = !REEFExecutor.class.desiredAssertionStatus();
        LOG = Logger.getLogger(REEFExecutor.class.getName());
    }
}
