package org.apache.reef.runtime.yarn.driver;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.commons.collections.ListUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.reef.driver.ProgressProvider;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;

/* loaded from: input_file:org/apache/reef/runtime/yarn/driver/YarnContainerManager.class */
final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = Logger.getLogger(YarnContainerManager.class.getName());
    /** Name this runtime reports in its status and error events. */
    private static final String RUNTIME_NAME = "YARN";
    /** YARN client; started in onStart() and used there to fetch the reports of the running nodes. */
    private final YarnClient yarnClient = YarnClient.createYarnClient();
    /** Container requests received via onContainerRequest() that have not been passed to the resource manager yet. */
    private final Queue<AMRMClient.ContainerRequest> requestsBeforeSentToRM = new ConcurrentLinkedQueue<>();
    /** Container requests passed to the resource manager that are still waiting for a matching container. */
    private final Queue<AMRMClient.ContainerRequest> requestsAfterSentToRM = new ConcurrentLinkedQueue<>();
    /** Rack name by node id; written in onNodesUpdated() and read when matching containers against rack constraints. */
    private final Map<String, String> nodeIdToRackName = new ConcurrentHashMap<>();
    /** Configuration used to initialise the YARN client, the resource manager client and the node manager client. */
    private final YarnConfiguration yarnConf;
    /** Asynchronous client for the YARN resource manager; this object is its callback handler. */
    private final AMRMClientAsync resourceManager;
    /** Asynchronous client for the YARN node managers; this object is its callback handler. */
    private final NMClientAsync nodeManager;
    /** Sink for the node, resource and runtime events produced by this class. */
    private final REEFEventHandlers reefEventHandlers;
    /** Registry of the containers currently held by this application. */
    private final Containers containers;
    /** Holder for the response received when registering the application master. */
    private final ApplicationMasterRegistration registration;
    /** Counter of container requests that have not been satisfied yet. */
    private final ContainerRequestCounter containerRequestCounter;
    /** Receives the error raised when YARN requests a shutdown. */
    private final DriverStatusManager driverStatusManager;
    /** Supplies the tracking URL used at registration and written to the driver HTTP endpoint file. */
    private final TrackingURLProvider trackingURLProvider;
    /** Directory in which the driver HTTP endpoint file is created. */
    private final String jobSubmissionDirectory;
    /** Supplies the name of the driver HTTP endpoint file. */
    private final REEFFileNames reefFileNames;
    /** Derives the rack name reported for a newly allocated container. */
    private final RackNameFormatter rackNameFormatter;
    /** Lazily resolved provider of the application progress returned by getProgress(). */
    private final InjectionFuture<ProgressProvider> progressProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.reef.runtime.yarn.driver.YarnContainerManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/reef/runtime/yarn/driver/YarnContainerManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /**
         * Lookup table indexed by {@code ContainerState.ordinal()}: the slot of
         * {@code COMPLETE} holds 1, every other slot keeps the array default of 0.
         * Used by the {@code switch} in onContainerStatus().
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$yarn$api$records$ContainerState = buildSwitchMap();

        /** Builds the ordinal-to-case-label table described on the field above. */
        private static int[] buildSwitchMap() {
            final int[] caseLabels = new int[ContainerState.values().length];
            try {
                caseLabels[ContainerState.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // The constant is missing from the ContainerState class loaded at runtime;
                // its slot simply keeps the default label 0.
            }
            return caseLabels;
        }
    }

    /**
     * Wires up the container manager and creates (but does not start) the YARN clients.
     *
     * @param yarnConfiguration configuration used to initialise the YARN clients
     * @param i value bound to {@code YarnHeartbeatPeriod}; passed as the interval of the
     *          asynchronous resource manager client
     * @param rEEFEventHandlers sink for the events produced by this class
     * @param containers registry of the containers held by the application
     * @param applicationMasterRegistration holder for the application master registration
     * @param containerRequestCounter counter of outstanding container requests
     * @param driverStatusManager notified when YARN requests a shutdown
     * @param rEEFFileNames supplies the name of the driver HTTP endpoint file
     * @param str value bound to {@code JobSubmissionDirectory}
     * @param trackingURLProvider supplies the tracking URL of the driver
     * @param rackNameFormatter derives the rack name of an allocated container
     * @param injectionFuture future resolving to the progress provider
     * @throws IOException declared by the original signature
     */
    @Inject
    YarnContainerManager(YarnConfiguration yarnConfiguration, @Parameter(YarnHeartbeatPeriod.class) int i, REEFEventHandlers rEEFEventHandlers, Containers containers, ApplicationMasterRegistration applicationMasterRegistration, ContainerRequestCounter containerRequestCounter, DriverStatusManager driverStatusManager, REEFFileNames rEEFFileNames, @Parameter(JobSubmissionDirectory.class) String str, TrackingURLProvider trackingURLProvider, RackNameFormatter rackNameFormatter, InjectionFuture<ProgressProvider> injectionFuture) throws IOException {
        // Plain collaborators first; none of these assignments depends on another.
        this.yarnConf = yarnConfiguration;
        this.reefEventHandlers = rEEFEventHandlers;
        this.containers = containers;
        this.registration = applicationMasterRegistration;
        this.containerRequestCounter = containerRequestCounter;
        this.driverStatusManager = driverStatusManager;
        this.reefFileNames = rEEFFileNames;
        this.jobSubmissionDirectory = str;
        this.trackingURLProvider = trackingURLProvider;
        this.rackNameFormatter = rackNameFormatter;
        this.progressProvider = injectionFuture;

        // YARN clients: the configuration must already be assigned at this point.
        this.yarnClient.init(this.yarnConf);
        this.resourceManager = AMRMClientAsync.createAMRMClientAsync(i, this);
        this.nodeManager = new NMClientAsyncImpl(this);

        LOG.log(Level.FINEST, "Instantiated YarnContainerManager");
    }

    /**
     * Forwards every status in the given list to onContainerStatus().
     *
     * @param list statuses of the completed containers
     */
    public void onContainersCompleted(List<ContainerStatus> list) {
        for (final ContainerStatus completed : list) {
            onContainerStatus(completed);
        }
    }

    /**
     * Handles a batch of newly allocated containers, one at a time, and logs
     * a timing line before and after the batch.
     *
     * @param list the containers allocated by YARN
     */
    public void onContainersAllocated(List<Container> list) {
        // Tag built from the thread name and the current time so that the two
        // log lines of one batch can be correlated.
        final String threadTag = Thread.currentThread().getName().replace(' ', '_');
        final String batchTag = String.format("%s:%d", threadTag, System.currentTimeMillis());

        LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}",
            new Object[] {batchTag, list.size(), this.containerRequestCounter.get()});

        for (final Container allocated : list) {
            handleNewContainer(allocated);
        }

        LOG.log(Level.FINE, "TIME: Processed Containers {0}", batchTag);
    }

    /**
     * Reacts to a shutdown request: publishes a runtime status in state
     * {@code DONE} and then reports an error to the driver status manager.
     */
    public void onShutdownRequest() {
        final RuntimeStatusEventImpl.Builder doneStatus = RuntimeStatusEventImpl.newBuilder()
            .setName(RUNTIME_NAME)
            .setState(State.DONE);
        this.reefEventHandlers.onRuntimeStatus(doneStatus.build());

        final Exception shutdownCause = new Exception("Shutdown requested by YARN.");
        this.driverStatusManager.onError(shutdownCause);
    }

    /**
     * Records the rack of every reported node and publishes a node descriptor for it.
     *
     * @param list reports of the nodes whose state changed
     */
    public void onNodesUpdated(List<NodeReport> list) {
        for (final NodeReport updated : list) {
            final String nodeId = updated.getNodeId().toString();
            final String rackName = updated.getRackName();
            // Remembered for matchContainerWithPendingRequest(), which looks racks up by node id.
            this.nodeIdToRackName.put(nodeId, rackName);
            onNodeReport(updated);
        }
    }

    /**
     * Returns the application progress obtained from the injected
     * {@code ProgressProvider}, clamped to the range [0, 1].
     *
     * @return the clamped progress; 0 when the provider throws or reports NaN
     */
    public float getProgress() {
        try {
            final float reported = this.progressProvider.get().getProgress();
            // NaN survives Math.min/Math.max, so it has to be mapped explicitly.
            if (Float.isNaN(reported)) {
                return 0.0f;
            }
            return Math.max(Math.min(1.0f, reported), 0.0f);
        } catch (Exception e) {
            // Pass the exception itself so that the stack trace is not lost.
            LOG.log(Level.WARNING, "An exception occurred in ProgressProvider.getProgress(), with message : " + e.getMessage() + ". Returning 0 as progress.", e);
            return 0.0f;
        }
    }

    /**
     * Error callback: treats the given throwable as a runtime failure and
     * delegates to onRuntimeError().
     *
     * @param th the reported error
     */
    public void onError(Throwable th) {
        onRuntimeError(th);
    }

    /**
     * Requests the status of a started container from the node manager,
     * provided the container is known to this manager.
     *
     * @param containerId id of the started container
     * @param map service responses delivered with the callback; not used here
     */
    public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
        final Optional<Container> known = this.containers.getOptional(containerId.toString());
        if (!known.isPresent()) {
            return;
        }
        this.nodeManager.getContainerStatusAsync(containerId, known.get().getNodeId());
    }

    /**
     * Status callback: forwards the received status to onContainerStatus().
     *
     * @param containerId id of the container the status belongs to; not used here,
     *                    the id is read from the status itself
     * @param containerStatus the received status
     */
    public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
        onContainerStatus(containerStatus);
    }

    /**
     * Publishes a resource status in state {@code DONE} for a stopped container.
     * Containers unknown to this manager are ignored.
     *
     * @param containerId id of the stopped container
     */
    public void onContainerStopped(ContainerId containerId) {
        final String id = containerId.toString();
        if (!this.containers.hasContainer(id)) {
            return;
        }
        final ResourceStatusEventImpl.Builder stoppedStatus = ResourceStatusEventImpl.newBuilder().setIdentifier(id);
        stoppedStatus.setState(State.DONE);
        this.reefEventHandlers.onResourceStatus(stoppedStatus.build());
    }

    /**
     * Error callback for a container start: reports the container as failed
     * via handleContainerError().
     *
     * @param containerId id of the affected container
     * @param th the reported error
     */
    public void onStartContainerError(ContainerId containerId, Throwable th) {
        handleContainerError(containerId, th);
    }

    /**
     * Error callback for a container status query: reports the container as
     * failed via handleContainerError().
     *
     * @param containerId id of the affected container
     * @param th the reported error
     */
    public void onGetContainerStatusError(ContainerId containerId, Throwable th) {
        handleContainerError(containerId, th);
    }

    /**
     * Error callback for a container stop: reports the container as failed
     * via handleContainerError().
     *
     * @param containerId id of the affected container
     * @param th the reported error
     */
    public void onStopContainerError(ContainerId containerId, Throwable th) {
        handleContainerError(containerId, th);
    }

    /**
     * Adds every recovered container to the container registry.
     *
     * @param set the recovered containers
     */
    public void onContainersRecovered(Set<Container> set) {
        for (final Container recovered : set) {
            this.containers.add(recovered);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the node manager to start the given container asynchronously.
     *
     * @param container the container to start
     * @param containerLaunchContext the launch context to start it with
     */
    public void submit(Container container, ContainerLaunchContext containerLaunchContext) {
        this.nodeManager.startContainerAsync(container, containerLaunchContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a container from the registry, hands it back to the resource
     * manager and publishes the updated runtime status.
     *
     * @param str id of the container to release
     */
    public void release(String str) {
        LOG.log(Level.FINE, "Release container: {0}", str);
        this.resourceManager.releaseAssignedContainer(
            this.containers.removeAndGet(str).getId());
        updateRuntimeStatus();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the YARN clients, publishes a node descriptor for every running
     * node, registers the application master and writes the tracking URL to
     * the driver HTTP endpoint file in the job submission directory.
     *
     * <p>Failures in either phase are logged and reported through
     * onRuntimeError(); they are not rethrown.
     */
    public void onStart() {
        this.yarnClient.start();
        this.resourceManager.init(this.yarnConf);
        this.resourceManager.start();
        this.nodeManager.init(this.yarnConf);
        this.nodeManager.start();

        try {
            // NOTE(review): the rack names of these initial reports are not stored in
            // nodeIdToRackName (only onNodesUpdated() does that) - confirm this is intended.
            for (final NodeReport nodeReport : this.yarnClient.getNodeReports(NodeState.RUNNING)) {
                onNodeReport(nodeReport);
            }
        } catch (IOException | YarnException e) {
            LOG.log(Level.WARNING, "Unable to fetch node reports from YARN.", e);
            onRuntimeError(e);
        }

        try {
            this.registration.setRegistration(this.resourceManager.registerApplicationMaster("", 0, this.trackingURLProvider.getTrackingUrl()));
            LOG.log(Level.FINE, "YARN registration: {0}", this.registration);

            final FileSystem fileSystem = FileSystem.get(this.yarnConf);
            final Path endpointFile = new Path(this.jobSubmissionDirectory, this.reefFileNames.getDriverHttpEndpoint());
            // try-with-resources closes the stream even when the write or the flush fails.
            try (FSDataOutputStream endpointStream = fileSystem.create(endpointFile)) {
                endpointStream.writeBytes(this.trackingURLProvider.getTrackingUrl() + "\n");
                endpointStream.flush();
            }
        } catch (YarnException | IOException e2) {
            LOG.log(Level.WARNING, "Unable to register application master.", e2);
            onRuntimeError(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts the runtime down. If the resource manager client is still started,
     * the event handlers are closed, the application master is unregistered
     * (as succeeded when {@code th} is null, as failed otherwise) and the
     * client is closed. The node manager client is closed if it is started.
     * Errors during shutdown are logged and not rethrown.
     *
     * @param th the failure that caused the stop, or {@code null} on success
     */
    public void onStop(Throwable th) {
        LOG.log(Level.FINE, "Stop Runtime: RM status {0}", this.resourceManager.getServiceState());

        if (this.resourceManager.getServiceState() == Service.STATE.STARTED) {
            try {
                this.reefEventHandlers.close();
                if (th != null) {
                    final String failureReport = String.format(
                        "Application failed due to:%n%s%nWith stack trace:%n%s",
                        th.getMessage(), ExceptionUtils.getStackTrace(th));
                    this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED, failureReport, (String) null);
                } else {
                    this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, (String) null, (String) null);
                }
                this.resourceManager.close();
            } catch (Exception shutdownFailure) {
                LOG.log(Level.WARNING, "Error shutting down YARN application", shutdownFailure);
            }
        }

        if (this.nodeManager.getServiceState() == Service.STATE.STARTED) {
            try {
                this.nodeManager.close();
            } catch (IOException closeFailure) {
                LOG.log(Level.WARNING, "Error closing YARN Node Manager", closeFailure);
            }
        }
    }

    /**
     * Publishes a node descriptor event built from the id, host, port,
     * memory capability and rack name of the given node report.
     *
     * @param nodeReport the report to translate
     */
    private void onNodeReport(NodeReport nodeReport) {
        LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport);
        this.reefEventHandlers.onNodeDescriptor(
            NodeDescriptorEventImpl.newBuilder()
                .setIdentifier(nodeReport.getNodeId().toString())
                .setHostName(nodeReport.getNodeId().getHost())
                .setPort(nodeReport.getNodeId().getPort())
                .setMemorySize(nodeReport.getCapability().getMemory())
                .setRackName(nodeReport.getRackName())
                .build());
    }

    /**
     * Publishes a resource status in state {@code FAILED} with exit code 1 for
     * the given container, using the message of the throwable as diagnostics.
     *
     * @param containerId id of the failed container
     * @param th the error reported for the container
     */
    private void handleContainerError(ContainerId containerId, Throwable th) {
        final ResourceStatusEventImpl.Builder failedStatus =
            ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString());
        failedStatus.setState(State.FAILED);
        failedStatus.setExitCode(1);
        failedStatus.setDiagnostics(th.getMessage());
        this.reefEventHandlers.onResourceStatus(failedStatus.build());
    }

    /**
     * Translates a YARN container status into a resource status event.
     * Statuses of containers unknown to this manager are ignored.
     *
     * <p>A {@code COMPLETE} container is reported as {@code DONE} for exit
     * status 0, {@code KILLED} for exit status 143 and {@code FAILED} for any
     * other value; every other container state is reported as {@code RUNNING}.
     *
     * @param containerStatus the status received from YARN
     */
    private void onContainerStatus(ContainerStatus containerStatus) {
        final String id = containerStatus.getContainerId().toString();
        if (!this.containers.hasContainer(id)) {
            return;
        }

        LOG.log(Level.FINE, "Received container status: {0}", id);
        final ResourceStatusEventImpl.Builder statusEvent = ResourceStatusEventImpl.newBuilder().setIdentifier(id);

        switch (containerStatus.getState()) {
            case COMPLETE:
                final int exitStatus = containerStatus.getExitStatus();
                LOG.log(Level.FINE, "Container completed: status {0}", exitStatus);
                if (exitStatus == 0) {
                    statusEvent.setState(State.DONE);
                } else if (exitStatus == 143) {
                    // 143 is conventionally 128 + SIGTERM(15), i.e. the process was terminated.
                    statusEvent.setState(State.KILLED);
                } else {
                    statusEvent.setState(State.FAILED);
                }
                statusEvent.setExitCode(exitStatus);
                break;
            default:
                LOG.info("Container running");
                statusEvent.setState(State.RUNNING);
                break;
        }

        final String diagnostics = containerStatus.getDiagnostics();
        if (diagnostics != null) {
            LOG.log(Level.FINE, "Container diagnostics: {0}", diagnostics);
            statusEvent.setDiagnostics(diagnostics);
        }
        this.reefEventHandlers.onResourceStatus(statusEvent.build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Accepts new container requests: counts them, queues them for submission
     * and triggers submission of the next homogeneous batch. The runtime
     * status is published afterwards, outside the lock.
     *
     * @param containerRequestArr the requests to enqueue
     */
    public void onContainerRequest(AMRMClient.ContainerRequest... containerRequestArr) {
        final List<AMRMClient.ContainerRequest> incoming = Arrays.asList(containerRequestArr);
        synchronized (this) {
            this.containerRequestCounter.incrementBy(containerRequestArr.length);
            this.requestsBeforeSentToRM.addAll(incoming);
            doHomogeneousRequests();
        }
        updateRuntimeStatus();
    }

    /**
     * Processes one container allocated by YARN. A container that satisfies
     * the oldest request sent to the resource manager is registered and
     * announced with a resource allocation event; any other container is
     * released straight away.
     *
     * @param container the newly allocated container
     */
    private void handleNewContainer(Container container) {
        LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
        synchronized (this) {
            if (!matchContainerWithPendingRequest(container)) {
                LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match, releasing...", container.getId());
                this.resourceManager.releaseAssignedContainer(container.getId());
                return;
            }

            final AMRMClient.ContainerRequest matchedRequest = this.requestsAfterSentToRM.peek();
            this.containerRequestCounter.decrement();
            this.containers.add(container);
            LOG.log(Level.FINEST, "{0} matched with {1}", new Object[] {container.toString(), matchedRequest.toString()});

            // The matched request is withdrawn from the resource manager only while
            // other sent requests are still queued behind it.
            // NOTE(review): the reason the last request stays registered is not visible here - confirm.
            if (this.requestsAfterSentToRM.size() > 1) {
                try {
                    this.resourceManager.removeContainerRequest(matchedRequest);
                } catch (Exception removalFailure) {
                    LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's queue, removal attempt failed with exception", removalFailure);
                }
            }
            this.requestsAfterSentToRM.remove();
            doHomogeneousRequests();

            final int memory = container.getResource().getMemory();
            final int virtualCores = container.getResource().getVirtualCores();
            LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", new Object[] {memory, virtualCores});

            this.reefEventHandlers.onResourceAllocation(
                ResourceEventImpl.newAllocationBuilder()
                    .setIdentifier(container.getId().toString())
                    .setNodeId(container.getNodeId().toString())
                    .setResourceMemory(memory)
                    .setVirtualCores(virtualCores)
                    .setRackName(this.rackNameFormatter.getRackName(container))
                    .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
                    .build());
            updateRuntimeStatus();
        }
    }

    /**
     * Sends the next batch of queued requests to the resource manager. Nothing
     * is sent while earlier requests are still pending; otherwise the leading
     * run of queued requests that are of the same kind as the first one is
     * moved from the "before" queue to the "after" queue and submitted.
     */
    private synchronized void doHomogeneousRequests() {
        if (!this.requestsAfterSentToRM.isEmpty()) {
            return;
        }
        final AMRMClient.ContainerRequest first = this.requestsBeforeSentToRM.peek();
        AMRMClient.ContainerRequest next = first;
        // peek() yields null once the queue is drained, which ends the loop.
        while (next != null && isSameKindOfRequest(first, next)) {
            final AMRMClient.ContainerRequest toSend = this.requestsBeforeSentToRM.remove();
            this.resourceManager.addContainerRequest(toSend);
            this.requestsAfterSentToRM.add(toSend);
            next = this.requestsBeforeSentToRM.peek();
        }
    }

    /**
     * Tells whether two container requests agree in priority, capability,
     * locality relaxation, node list and rack list.
     *
     * @param containerRequest the first request
     * @param containerRequest2 the second request
     * @return {@code true} if all five attributes are equal
     */
    private boolean isSameKindOfRequest(AMRMClient.ContainerRequest containerRequest, AMRMClient.ContainerRequest containerRequest2) {
        if (containerRequest.getPriority().compareTo(containerRequest2.getPriority()) != 0) {
            return false;
        }
        if (containerRequest.getCapability().compareTo(containerRequest2.getCapability()) != 0) {
            return false;
        }
        if (containerRequest.getRelaxLocality() != containerRequest2.getRelaxLocality()) {
            return false;
        }
        if (!ListUtils.isEqualList(containerRequest.getNodes(), containerRequest2.getNodes())) {
            return false;
        }
        return ListUtils.isEqualList(containerRequest.getRacks(), containerRequest2.getRacks());
    }

    /**
     * Checks whether a container satisfies the oldest request that was sent
     * to the resource manager. The container must offer at least the requested
     * memory; unless the request relaxes locality, it must also lie on one of
     * the requested racks and hosts (a {@code null} list means "any").
     *
     * @param container the allocated container
     * @return {@code true} if the container matches the head of the pending queue
     */
    private boolean matchContainerWithPendingRequest(Container container) {
        if (this.requestsAfterSentToRM.isEmpty()) {
            return false;
        }
        final AMRMClient.ContainerRequest pending = this.requestsAfterSentToRM.peek();

        if (container.getResource().getMemory() < pending.getCapability().getMemory()) {
            return false;
        }
        if (pending.getRelaxLocality()) {
            return true;
        }

        final List<String> racks = pending.getRacks();
        if (racks != null) {
            // The rack is looked up by node id in the map filled by onNodesUpdated().
            final String rackOfContainer = this.nodeIdToRackName.get(container.getNodeId().toString());
            if (!racks.contains(rackOfContainer)) {
                return false;
            }
        }

        final List<String> hosts = pending.getNodes();
        return hosts == null || hosts.contains(container.getNodeId().getHost());
    }

    /**
     * Publishes a runtime status in state {@code RUNNING} that carries the
     * number of outstanding container requests and the ids of all containers
     * currently in the registry.
     */
    private void updateRuntimeStatus() {
        final RuntimeStatusEventImpl.Builder runningStatus = RuntimeStatusEventImpl.newBuilder()
            .setName(RUNTIME_NAME)
            .setState(State.RUNNING)
            .setOutstandingContainerRequests(this.containerRequestCounter.get());

        for (final String allocatedId : this.containers.getContainerIds()) {
            runningStatus.addContainerAllocation(allocatedId);
        }
        this.reefEventHandlers.onRuntimeStatus(runningStatus.build());
    }

    /**
     * Handles a fatal runtime error: closes the event handlers, unregisters
     * the application master as failed, stops the resource manager client and
     * finally publishes a runtime status in state {@code FAILED} that carries
     * the serialized throwable.
     *
     * <p>Errors raised while shutting down are logged and do not prevent the
     * failure status from being published.
     *
     * @param th the error that brought the runtime down
     */
    private void onRuntimeError(Throwable th) {
        try {
            this.reefEventHandlers.close();
            this.resourceManager.unregisterApplicationMaster(FinalApplicationStatus.FAILED, th.getMessage(), (String) null);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Error shutting down YARN application", e);
        } finally {
            // Stop the client exactly once, whether or not the unregistration succeeded.
            this.resourceManager.stop();
        }

        // Protobuf-generated builders reject null strings, and many exceptions carry
        // no message; fall back to toString() so the failure is still reported.
        final String errorMessage = th.getMessage() != null ? th.getMessage() : th.toString();

        final RuntimeStatusEventImpl.Builder failedStatus = RuntimeStatusEventImpl.newBuilder()
            .setState(State.FAILED)
            .setName(RUNTIME_NAME);
        failedStatus.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder()
            .setName(RUNTIME_NAME)
            .setMessage(errorMessage)
            .setException(ByteString.copyFrom(new ObjectSerializableCodec().encode(th)))
            .build());
        this.reefEventHandlers.onRuntimeStatus(failedStatus.build());
    }
}
