package org.apache.reef.runtime.mesos.driver;

import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
import javax.inject.Inject;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.parameters.JobIdentifier;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.mesos.driver.parameters.JobSubmissionDirectoryPrefix;
import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp;
import org.apache.reef.runtime.mesos.driver.parameters.MesosSlavePort;
import org.apache.reef.runtime.mesos.util.EvaluatorControl;
import org.apache.reef.runtime.mesos.util.EvaluatorRelease;
import org.apache.reef.runtime.mesos.util.MesosRemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;

/* loaded from: input_file:org/apache/reef/runtime/mesos/driver/REEFScheduler.class */
final class REEFScheduler implements Scheduler {
    private static final Logger LOG = Logger.getLogger(REEFScheduler.class.getName());
    // Local file name of the archive built by getReefTarUri; also the last path segment of the uploaded copy.
    private static final String REEF_TAR = "reef.tar.gz";
    // Name reported in the runtime status events built by this class.
    private static final String RUNTIME_NAME = "MESOS";
    // Prepended to the job identifier to form the Mesos framework name.
    private static final String REEF_JOB_NAME_PREFIX = "reef-job-";
    // URI returned by getReefTarUri; attached to every executor command so the archive is fetched with the task.
    private final String reefTarUri;
    private final REEFFileNames fileNames;
    // Supplies the evaluator classpath used in the executor launch command.
    private final ClasspathProvider classpath;
    // Sink for node descriptor, resource allocation/status and runtime status events.
    private final REEFEventHandlers reefEventHandlers;
    private final MesosRemoteManager mesosRemoteManager;
    // Driver used to launch tasks and decline offers; created in the constructor with this scheduler as callback.
    private final SchedulerDriver mesosMaster;
    // Port set on every node descriptor published from resourceOffers.
    private int mesosSlavePort;
    private final String jobSubmissionDirectoryPrefix;
    // Stage that receives the scheduler driver in onStart and is closed in onStop / onRuntimeError.
    private final EStage<SchedulerDriver> schedulerDriverEStage;
    // Offers received from Mesos, keyed by offer id; cleared after each doResourceRequest pass.
    private final Map<String, Protos.Offer> offers = new ConcurrentHashMap();
    // Incremented by the requested resource count in onResourceRequest, decremented once per started executor.
    private int outstandingRequestCounter = 0;
    // Requests (or leftovers of requests) waiting for further offers.
    private final ConcurrentLinkedQueue<ResourceRequestEvent> outstandingRequests = new ConcurrentLinkedQueue<>();
    // Executor id (offer id + "-" + index) -> request the task was launched for; removed when the executor starts.
    private final Map<String, ResourceRequestEvent> executorIdToLaunchedRequests = new ConcurrentHashMap();
    // Registry of running executors; its ids are reported as container allocations.
    private final REEFExecutors executors;

    @Inject
    REEFScheduler(REEFEventHandlers rEEFEventHandlers, MesosRemoteManager mesosRemoteManager, REEFExecutors rEEFExecutors, REEFFileNames rEEFFileNames, EStage<SchedulerDriver> eStage, ClasspathProvider classpathProvider, @Parameter(JobIdentifier.class) String str, @Parameter(MesosMasterIp.class) String str2, @Parameter(MesosSlavePort.class) int i, @Parameter(JobSubmissionDirectoryPrefix.class) String str3) {
        this.mesosRemoteManager = mesosRemoteManager;
        this.reefEventHandlers = rEEFEventHandlers;
        this.executors = rEEFExecutors;
        this.fileNames = rEEFFileNames;
        this.jobSubmissionDirectoryPrefix = str3;
        this.reefTarUri = getReefTarUri(str);
        this.classpath = classpathProvider;
        this.schedulerDriverEStage = eStage;
        this.mesosMaster = new MesosSchedulerDriver(this, Protos.FrameworkInfo.newBuilder().setUser("").setName(REEF_JOB_NAME_PREFIX + str).build(), str2);
        this.mesosSlavePort = i;
    }

    /**
     * Scheduler callback for a successful registration; only logs the framework id.
     */
    @Override // org.apache.mesos.Scheduler
    public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, Protos.MasterInfo masterInfo) {
        final Object[] logArguments = {frameworkID};
        LOG.log(Level.INFO, "Framework ID={0} registration succeeded", logArguments);
    }

    /**
     * Scheduler callback for a re-registration; only logs the master info.
     */
    @Override // org.apache.mesos.Scheduler
    public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) {
        final Object[] logArguments = {masterInfo};
        LOG.log(Level.INFO, "Framework reregistered, MasterInfo: {0}", logArguments);
    }

    /**
     * Scheduler callback for new resource offers.
     *
     * <p>Publishes one node descriptor per slave (memory of several offers from the same slave
     * is added up), caches every offer by its id, and then retries at most one queued request
     * against the cached offers.
     *
     * @param schedulerDriver the driver that delivered the offers (unused)
     * @param list the offers received from Mesos
     */
    @Override // org.apache.mesos.Scheduler
    public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> list) {
        final Map<String, NodeDescriptorEventImpl.Builder> descriptorsBySlave = new HashMap<>();
        for (final Protos.Offer offer : list) {
            final String slaveId = offer.getSlaveId().getValue();
            final NodeDescriptorEventImpl.Builder known = descriptorsBySlave.get(slaveId);
            if (known == null) {
                descriptorsBySlave.put(slaveId, NodeDescriptorEventImpl.newBuilder()
                        .setIdentifier(slaveId)
                        .setHostName(offer.getHostname())
                        .setPort(this.mesosSlavePort)
                        .setMemorySize(getMemory(offer)));
            } else {
                // Another offer from the same slave: accumulate its memory on the existing descriptor.
                known.setMemorySize(known.build().getMemorySize() + getMemory(offer));
            }
            this.offers.put(offer.getId().getValue(), offer);
        }

        for (final NodeDescriptorEventImpl.Builder descriptor : descriptorsBySlave.values()) {
            this.reefEventHandlers.onNodeDescriptor(descriptor.build());
        }

        // poll() is a single atomic step; the former size()/remove() pair could throw
        // NoSuchElementException if the queue was drained between the two calls.
        final ResourceRequestEvent pendingRequest = this.outstandingRequests.poll();
        if (pendingRequest != null) {
            doResourceRequest(pendingRequest);
        }
    }

    /**
     * Scheduler callback for an offer that is no longer valid.
     *
     * <p>Drops the offer from the cache so that it cannot be used for a later launch, and
     * re-queues the requests of all tasks that were launched on that offer.
     *
     * @param schedulerDriver the driver that delivered the callback (unused)
     * @param offerID id of the rescinded offer
     */
    @Override // org.apache.mesos.Scheduler
    public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) {
        final String rescindedOfferId = offerID.getValue();

        // A rescinded offer must not be used by a later doResourceRequest pass.
        this.offers.remove(rescindedOfferId);

        // Executor ids are built as "<offerId>-<index>"; matching on the separator as well keeps an
        // offer whose id merely starts with the same characters from being treated as rescinded.
        final String executorIdPrefix = rescindedOfferId + "-";
        for (final String executorId : this.executorIdToLaunchedRequests.keySet()) {
            if (executorId.startsWith(executorIdPrefix)) {
                final ResourceRequestEvent launchedRequest = this.executorIdToLaunchedRequests.remove(executorId);
                // The entry may have been removed concurrently; the queue rejects null elements.
                if (launchedRequest != null) {
                    // NOTE(review): the stored request still carries its original resource count,
                    // so each re-queued entry asks for the full count again - confirm this is intended.
                    this.outstandingRequests.add(launchedRequest);
                }
            }
        }
    }

    /**
     * Scheduler callback for a task status change; translates the Mesos task state into a
     * REEF resource status event.
     *
     * <p>{@code TASK_STARTING} registers the new executor and emits no status event. A
     * {@code TASK_FINISHED} whose data equals {@code "eval_not_run"} is ignored.
     *
     * @param schedulerDriver the driver that delivered the update (unused)
     * @param taskStatus the status reported by Mesos
     * @throws RuntimeException for {@code TASK_STAGING} and for any state not handled here
     */
    @Override // org.apache.mesos.Scheduler
    public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {
        // The message needs the {0} placeholder, otherwise the status argument is never rendered.
        // NOTE(review): SEVERE is kept from the original; routine updates probably belong at a lower level.
        LOG.log(Level.SEVERE, "Task Status Update: {0}", taskStatus.toString());
        ResourceStatusEventImpl.Builder identifier = ResourceStatusEventImpl.newBuilder().setIdentifier(taskStatus.getTaskId().getValue());
        switch (taskStatus.getState()) {
            case TASK_STARTING:
                handleNewExecutor(taskStatus);
                return;
            case TASK_RUNNING:
                identifier.setState(State.RUNNING);
                break;
            case TASK_FINISHED:
                if (!taskStatus.getData().toStringUtf8().equals("eval_not_run")) {
                    identifier.setState(State.DONE);
                    break;
                } else {
                    // Executor finished without ever running an evaluator: nothing to report.
                    return;
                }
            case TASK_KILLED:
                identifier.setState(State.KILLED);
                break;
            case TASK_LOST:
            case TASK_FAILED:
                identifier.setState(State.FAILED);
                break;
            case TASK_STAGING:
                throw new RuntimeException("TASK_STAGING should not be used for status update");
            default:
                throw new RuntimeException("Unknown TaskStatus");
        }
        if (taskStatus.getMessage() != null) {
            identifier.setDiagnostics(taskStatus.getMessage());
        }
        this.reefEventHandlers.onResourceStatus(identifier.build());
    }

    /**
     * Scheduler callback for a message sent by an executor; the message is only logged.
     *
     * @param schedulerDriver the driver that delivered the message
     * @param executorID sender of the message
     * @param slaveID slave the executor runs on
     * @param bArr raw message payload
     */
    @Override // org.apache.mesos.Scheduler
    public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bArr) {
        // Formatting a byte[] directly only prints its identity ("[B@..."); log the payload size instead.
        final String payloadDescription = bArr == null ? "null" : bArr.length + " bytes";
        LOG.log(Level.INFO, "Framework Message. driver: {0} executorId: {1} slaveId: {2} data: {3}", new Object[]{schedulerDriver, executorID, slaveID, payloadDescription});
    }

    /**
     * Scheduler callback for a lost connection to the master; handled as a runtime error.
     */
    @Override // org.apache.mesos.Scheduler
    public void disconnected(SchedulerDriver schedulerDriver) {
        final RuntimeException disconnect = new RuntimeException("Scheduler disconnected from MesosMaster");
        onRuntimeError(disconnect);
    }

    /**
     * Scheduler callback for a lost slave; only logs the slave id.
     */
    @Override // org.apache.mesos.Scheduler
    public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) {
        final String lostSlaveId = slaveID.getValue();
        LOG.log(Level.SEVERE, "Slave Lost. {0}", lostSlaveId);
    }

    /**
     * Scheduler callback for a lost executor; reports the executor as FAILED with the given
     * exit code and a diagnostic naming the executor and its slave.
     */
    @Override // org.apache.mesos.Scheduler
    public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, int i) {
        final String lostExecutorId = executorID.getValue();
        final String diagnostics = "Executor Lost. executorid: " + lostExecutorId + " slaveid: " + slaveID.getValue();

        final ResourceStatusEventImpl.Builder status = ResourceStatusEventImpl.newBuilder().setIdentifier(lostExecutorId);
        status.setState(State.FAILED);
        status.setExitCode(i);
        status.setDiagnostics(diagnostics);

        this.reefEventHandlers.onResourceStatus(status.build());
    }

    /**
     * Scheduler callback for an error reported by the driver; handled as a runtime error
     * carrying the reported message.
     */
    @Override // org.apache.mesos.Scheduler
    public void error(SchedulerDriver schedulerDriver, String str) {
        final RuntimeException reported = new RuntimeException(str);
        onRuntimeError(reported);
    }

    /**
     * Hands the Mesos scheduler driver to the driver stage.
     */
    public void onStart() {
        final SchedulerDriver driver = this.mesosMaster;
        this.schedulerDriverEStage.onNext(driver);
    }

    /**
     * Stops the Mesos driver and then closes the driver stage.
     *
     * @throws RuntimeException wrapping any exception raised while closing the stage
     */
    public void onStop() {
        this.mesosMaster.stop();

        try {
            this.schedulerDriverEStage.close();
        } catch (final Exception closeFailure) {
            throw new RuntimeException(closeFailure);
        }
    }

    /**
     * Accepts a new resource request: adds its resource count to the outstanding counter,
     * publishes the updated runtime status and tries to serve the request from cached offers.
     *
     * @param resourceRequestEvent the request to serve
     */
    public void onResourceRequest(ResourceRequestEvent resourceRequestEvent) {
        // The counter is read by the synchronized updateRuntimeStatus and changed by other
        // callbacks, so the read-modify-write has to happen under the same lock.
        synchronized (this) {
            this.outstandingRequestCounter += resourceRequestEvent.getResourceCount();
        }
        updateRuntimeStatus();
        doResourceRequest(resourceRequestEvent);
    }

    /**
     * Releases the evaluator named by the event, forgets its executor and publishes the
     * updated runtime status.
     *
     * @param resourceReleaseEvent identifies the resource to release
     */
    public void onResourceRelease(ResourceReleaseEvent resourceReleaseEvent) {
        final EvaluatorRelease release = new EvaluatorRelease(resourceReleaseEvent.getIdentifier());
        this.executors.releaseEvaluator(release);

        this.executors.remove(resourceReleaseEvent.getIdentifier());

        updateRuntimeStatus();
    }

    /**
     * Tries to serve a request from the cached offers.
     *
     * <p>For every cached offer the number of tasks that fit (by cpu and memory, capped by the
     * remaining count) is launched if the offer satisfies the node constraint; otherwise the
     * offer is declined. Afterwards the offer cache is cleared and, if tasks are still missing,
     * a copy of the request with the remaining count is queued for the next round of offers.
     *
     * @param resourceRequestEvent the request to serve
     */
    private synchronized void doResourceRequest(ResourceRequestEvent resourceRequestEvent) {
        int remainingTasks = resourceRequestEvent.getResourceCount();

        for (final Protos.Offer offer : this.offers.values()) {
            final int cores = resourceRequestEvent.getVirtualCores().get().intValue();
            final int memory = resourceRequestEvent.getMemorySize().get().intValue();
            final int cpuSlots = getCpu(offer) / cores;
            final int memorySlots = getMemory(offer) / memory;
            final int taskCount = Math.min(Math.min(cpuSlots, memorySlots), remainingTasks);

            if (taskCount <= 0 || !satisfySlaveConstraint(resourceRequestEvent, offer)) {
                this.mesosMaster.declineOffer(offer.getId());
                continue;
            }

            final List<Protos.TaskInfo> tasksToLaunch = new ArrayList<>();
            remainingTasks -= taskCount;
            for (int index = 0; index < taskCount; index++) {
                // Task id doubles as executor id: "<offerId>-<index>".
                final String taskId = offer.getId().getValue() + "-" + String.valueOf(index);
                tasksToLaunch.add(buildTaskInfo(offer, taskId, memory, cores));
                this.executorIdToLaunchedRequests.put(taskId, resourceRequestEvent);
            }

            final Protos.Filters filters = Protos.Filters.newBuilder().setRefuseSeconds(0.0).build();
            this.mesosMaster.launchTasks(Collections.singleton(offer.getId()), tasksToLaunch, filters);
        }

        // Every cached offer has been either used or declined above.
        this.offers.clear();

        // Only queue a leftover when something is actually left. Queuing zero-count requests made
        // the queue grow with every served request, and each such entry consumed (and declined) a
        // whole round of offers before a real request behind it got its turn.
        if (remainingTasks > 0) {
            this.outstandingRequests.add(ResourceRequestEventImpl.newBuilder().mergeFrom(resourceRequestEvent).setResourceCount(remainingTasks).build());
        }
    }

    /**
     * Builds the Mesos task description for one executor: requested memory and cpus on the
     * offer's slave, plus the executor launch command that fetches the REEF archive.
     */
    private Protos.TaskInfo buildTaskInfo(Protos.Offer offer, String taskId, int memory, int cores) {
        final Protos.Resource memoryResource = Protos.Resource.newBuilder()
                .setName("mem")
                .setType(Protos.Value.Type.SCALAR)
                .setScalar(Protos.Value.Scalar.newBuilder().setValue(memory).build())
                .build();
        final Protos.Resource cpuResource = Protos.Resource.newBuilder()
                .setName("cpus")
                .setType(Protos.Value.Type.SCALAR)
                .setScalar(Protos.Value.Scalar.newBuilder().setValue(cores).build())
                .build();
        final Protos.CommandInfo command = Protos.CommandInfo.newBuilder()
                .setValue(getExecutorLaunchCommand(taskId, memory))
                .addUris(Protos.CommandInfo.URI.newBuilder().setValue(this.reefTarUri).build())
                .build();
        final Protos.ExecutorInfo executor = Protos.ExecutorInfo.newBuilder()
                .setExecutorId(Protos.ExecutorID.newBuilder().setValue(taskId).build())
                .setCommand(command)
                .build();
        return Protos.TaskInfo.newBuilder()
                .setTaskId(Protos.TaskID.newBuilder().setValue(taskId).build())
                .setName(taskId)
                .setSlaveId(offer.getSlaveId())
                .addResources(memoryResource)
                .addResources(cpuResource)
                .setExecutor(executor)
                .build();
    }

    /**
     * Registers an executor that reported {@code TASK_STARTING}: records it in the executor
     * registry, emits the resource allocation event, decrements the outstanding counter and
     * publishes the updated runtime status.
     *
     * <p>A status for a task id with no recorded launch request is logged and ignored.
     *
     * @param taskStatus the status update of the starting task; its message is passed to the
     *     remote manager to obtain the executor's handler
     */
    private void handleNewExecutor(Protos.TaskStatus taskStatus) {
        final String executorId = taskStatus.getTaskId().getValue();
        final ResourceRequestEvent launchedRequest = this.executorIdToLaunchedRequests.remove(executorId);
        if (launchedRequest == null) {
            // No launch is recorded for this id (e.g. a repeated update); dereferencing would throw.
            LOG.log(Level.WARNING, "Ignoring TASK_STARTING for executor without a launched request: {0}", executorId);
            return;
        }

        final int memory = launchedRequest.getMemorySize().get().intValue();
        this.executors.add(executorId, memory, this.mesosRemoteManager.getHandler(taskStatus.getMessage(), EvaluatorControl.class));
        this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder().setIdentifier(executorId).setNodeId(taskStatus.getSlaveId().getValue()).setResourceMemory(memory).setVirtualCores(launchedRequest.getVirtualCores().get().intValue()).setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build());

        // Same lock as updateRuntimeStatus, which reads the counter.
        synchronized (this) {
            this.outstandingRequestCounter--;
        }
        updateRuntimeStatus();
    }

    /**
     * Publishes a RUNNING runtime status carrying the outstanding request count and the ids
     * of all registered executors as container allocations.
     */
    private synchronized void updateRuntimeStatus() {
        final RuntimeStatusEventImpl.Builder status = RuntimeStatusEventImpl.newBuilder()
                .setName(RUNTIME_NAME)
                .setState(State.RUNNING)
                .setOutstandingContainerRequests(this.outstandingRequestCounter);

        for (final String executorId : this.executors.getExecutorIds()) {
            status.addContainerAllocation(executorId);
        }

        this.reefEventHandlers.onRuntimeStatus(status.build());
    }

    /**
     * Shuts the scheduler down after a fatal error: stops the Mesos driver, closes the driver
     * stage and publishes a FAILED runtime status that carries the serialized error.
     *
     * @param th the error that caused the failure
     * @throws RuntimeException wrapping any exception raised while closing or reporting
     */
    private void onRuntimeError(Throwable th) {
        this.mesosMaster.stop();
        try {
            this.schedulerDriverEStage.close();

            // Throwable.getMessage() may be null, which the protobuf setter rejects with a
            // NullPointerException; fall back to the throwable's string form.
            final String errorMessage = th.getMessage() != null ? th.getMessage() : th.toString();
            final byte[] serializedError = new ObjectSerializableCodec<Throwable>().encode(th);

            RuntimeStatusEventImpl.Builder name = RuntimeStatusEventImpl.newBuilder().setState(State.FAILED).setName(RUNTIME_NAME);
            name.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder().setName(RUNTIME_NAME).setMessage(errorMessage).setException(ByteString.copyFrom(serializedError)).build());
            this.reefEventHandlers.onRuntimeStatus(name.build());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether an offer may be used for a request: always when the request names no
     * nodes, otherwise only when the offer's slave id is among the requested node names.
     */
    private boolean satisfySlaveConstraint(ResourceRequestEvent resourceRequestEvent, Protos.Offer offer) {
        if (resourceRequestEvent.getNodeNameList().isEmpty()) {
            return true;
        }
        final String offeredSlaveId = offer.getSlaveId().getValue();
        return resourceRequestEvent.getNodeNameList().contains(offeredSlaveId);
    }

    /**
     * Returns the scalar value of the first resource named "mem" in the offer, truncated to
     * an int, or 0 when the offer has no such resource.
     */
    private int getMemory(Protos.Offer offer) {
        int memory = 0;
        for (final Protos.Resource candidate : offer.getResourcesList()) {
            if ("mem".equals(candidate.getName())) {
                memory = (int) candidate.getScalar().getValue();
                break;
            }
        }
        return memory;
    }

    /**
     * Returns the scalar value of the first resource named "cpus" in the offer, truncated to
     * an int, or 0 when the offer has no such resource.
     */
    private int getCpu(Protos.Offer offer) {
        int cpus = 0;
        for (final Protos.Resource candidate : offer.getResourcesList()) {
            if ("cpus".equals(candidate.getName())) {
                cpus = (int) candidate.getScalar().getValue();
                break;
            }
        }
        return cpus;
    }

    /**
     * Builds the shell command that starts a REEF executor JVM.
     *
     * @param str executor id passed to the executor via {@code -mesos_executor_id}
     * @param i maximum heap size in megabytes ({@code -Xmx})
     * @return the command line, ending with a trailing space
     */
    private String getExecutorLaunchCommand(String str, int i) {
        final StringBuilder command = new StringBuilder();

        // JVM binary, resolved through the JAVA_HOME environment variable.
        command.append(System.getenv("JAVA_HOME")).append("/bin/java").append(" ");

        // JVM memory settings.
        command.append("-XX:PermSize=128m -XX:MaxPermSize=128m ");
        command.append("-Xmx").append(String.valueOf(i)).append("m ");

        // Classpath and logging configuration.
        command.append("-classpath ").append(StringUtils.join(this.classpath.getEvaluatorClasspath(), ":")).append(" ");
        command.append("-Djava.util.logging.config.class=org.apache.reef.util.logging.Config ");

        // Main class and its argument.
        command.append(org.apache.reef.runtime.mesos.evaluator.REEFExecutor.class.getName()).append(" ");
        command.append("-mesos_executor_id ").append(str).append(" ");

        return command.toString();
    }

    /**
     * Packs the files of the global folder into a local {@code reef.tar.gz}, copies the archive
     * to the job's submission directory on the configured Hadoop file system and returns the
     * destination URI.
     *
     * @param str job identifier; becomes part of the destination path
     * @return URI of the uploaded archive
     * @throws RuntimeException wrapping any {@link IOException} raised while archiving or uploading
     */
    private String getReefTarUri(String str) {
        try {
            final File globalFolder = new File(this.fileNames.getGlobalFolderPath());

            // try-with-resources closes every stream even when archiving fails half-way; the
            // archive must be completely written and closed before it is uploaded below.
            try (FileOutputStream fileOut = new FileOutputStream(REEF_TAR);
                 GZIPOutputStream gzipOut = new GZIPOutputStream(fileOut);
                 TarArchiveOutputStream tarOut = new TarArchiveOutputStream(gzipOut);
                 DirectoryStream<Path> folderEntries = Files.newDirectoryStream(globalFolder.toPath())) {
                for (final Path entry : folderEntries) {
                    tarOut.putArchiveEntry(new TarArchiveEntry(entry.toFile(), globalFolder + "/" + entry.getFileName()));
                    try (BufferedInputStream entryIn = new BufferedInputStream(new FileInputStream(entry.toFile()))) {
                        IOUtils.copy(entryIn, tarOut);
                    }
                    tarOut.closeArchiveEntry();
                }
            }

            // The FileSystem instance is intentionally not closed here.
            // NOTE(review): FileSystem.get presumably returns a shared instance - confirm before adding a close.
            final FileSystem fileSystem = FileSystem.get(new Configuration());
            final org.apache.hadoop.fs.Path localArchive = new org.apache.hadoop.fs.Path(REEF_TAR);
            final String uploadedUri = fileSystem.getUri().toString() + this.jobSubmissionDirectoryPrefix + "/" + str + "/" + REEF_TAR;
            fileSystem.copyFromLocalFile(localArchive, new org.apache.hadoop.fs.Path(uploadedUri));
            return uploadedUri;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
