package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import scala.Option;

/* loaded from: input_file:org/apache/flink/yarn/YarnFlinkResourceManager.class */
public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYarnWorkerNode> {
    /** Heartbeat interval (ms) set on the RM client right after new containers have been requested. */
    private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
    /** NOTE(review): not referenced anywhere in this class as shown — confirm it is still needed. */
    private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
    /** Environment variable key under which the resource ID string is put into the launch context environment. */
    static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
    /** Priority used both for creating container requests and for looking up the pending ones. */
    private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(0);
    /** Containers handed to this manager (newly allocated or from a previous attempt) whose worker has not been started/re-accepted yet, keyed by resource ID. */
    private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
    /** Containers that this manager released; their completion is logged as successful rather than counted as a failure. */
    private final Map<ContainerId, Container> containersBeingReturned;
    /** YARN configuration used to initialize the RM and NM clients. */
    private final YarnConfiguration yarnConfig;
    /** Source of the TaskManager total memory and slot count used to size container requests. */
    private final ContaineredTaskManagerParameters taskManagerParameters;
    /** Launch context passed to the node manager when starting a TaskManager container. */
    private final ContainerLaunchContext taskManagerLaunchContext;
    /** Host name reported when registering the application master. */
    private final String applicationMasterHostName;
    /** Tracking URL reported when registering the application master; not null-checked in the constructor. */
    private final String webInterfaceURL;
    /** Heartbeat interval (ms) restored once no container requests are pending anymore. */
    private final int yarnHeartbeatIntervalMillis;
    /** Failed-container limit; a negative value disables the check in containersComplete. */
    private final int maxFailedContainers;
    /** Callback handler that is initialized with this actor's reference and receives progress updates. */
    private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
    /** Asynchronous client used to register/unregister the application master and to request/release containers. */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
    /** Client used to start and stop containers on the node managers. */
    private NMClient nodeManagerClient;
    /** This class's own count of container requests that have been added but not yet matched by an allocation. */
    private int numPendingContainerRequests;
    /** Number of completed containers that were not previously released by this manager. */
    private int failedContainersSoFar;
    /** Helper used to extract the containers of previous attempts from the registration response. */
    private RegisterApplicationMasterResponseReflector applicationMasterResponseReflector;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.yarn.YarnFlinkResourceManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/yarn/YarnFlinkResourceManager$1.class */
    /**
     * Synthetic lookup table that maps {@link ApplicationStatus} ordinals to the dense case
     * labels (1..3) consumed by the switch in {@code getYarnStatus}. Entries that are never
     * assigned stay 0 and therefore hit the switch's default branch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus = new int[ApplicationStatus.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing at
            // runtime (NoSuchFieldError) only leaves that single entry unmapped.
            try {
                $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus[ApplicationStatus.SUCCEEDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus[ApplicationStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus[ApplicationStatus.CANCELED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Creates a resource manager with a freshly constructed
     * {@link YarnResourceManagerCallbackHandler} and delegates to the overload that accepts one.
     *
     * @param str application master host name
     * @param str2 web interface (tracking) URL
     * @param i YARN heartbeat interval in milliseconds
     * @param i2 maximum number of failed containers
     * @param i3 value forwarded as the first argument of the base-class constructor
     */
    public YarnFlinkResourceManager(Configuration configuration, YarnConfiguration yarnConfiguration, LeaderRetrievalService leaderRetrievalService, String str, String str2, ContaineredTaskManagerParameters containeredTaskManagerParameters, ContainerLaunchContext containerLaunchContext, int i, int i2, int i3) {
        this(configuration, yarnConfiguration, leaderRetrievalService, str, str2, containeredTaskManagerParameters, containerLaunchContext, i, i2, i3, new YarnResourceManagerCallbackHandler());
    }

    /**
     * Creates a resource manager with the given callback handler. The asynchronous RM client is
     * created from the heartbeat interval {@code i} and that handler, and a new NM client is
     * created; all of them are passed on to the primary constructor.
     *
     * @param str application master host name
     * @param str2 web interface (tracking) URL
     * @param i YARN heartbeat interval in milliseconds (also used to create the RM client)
     * @param i2 maximum number of failed containers
     * @param i3 value forwarded as the first argument of the base-class constructor
     * @param yarnResourceManagerCallbackHandler handler wired into the RM client
     */
    public YarnFlinkResourceManager(Configuration configuration, YarnConfiguration yarnConfiguration, LeaderRetrievalService leaderRetrievalService, String str, String str2, ContaineredTaskManagerParameters containeredTaskManagerParameters, ContainerLaunchContext containerLaunchContext, int i, int i2, int i3, YarnResourceManagerCallbackHandler yarnResourceManagerCallbackHandler) {
        this(configuration, yarnConfiguration, leaderRetrievalService, str, str2, containeredTaskManagerParameters, containerLaunchContext, i, i2, i3, yarnResourceManagerCallbackHandler, AMRMClientAsync.createAMRMClientAsync(i, yarnResourceManagerCallbackHandler), NMClient.createNMClient());
    }

    /**
     * Primary constructor: stores all collaborators and settings and creates the empty
     * bookkeeping maps. No YARN interaction happens here.
     *
     * @param configuration Flink configuration, forwarded to the base class
     * @param yarnConfiguration YARN configuration; must not be null
     * @param leaderRetrievalService forwarded to the base class
     * @param str application master host name; must not be null
     * @param str2 web interface (tracking) URL; stored without a null check
     * @param containeredTaskManagerParameters TaskManager sizing parameters; must not be null
     * @param containerLaunchContext launch context for TaskManager containers; must not be null
     * @param i YARN heartbeat interval in milliseconds
     * @param i2 maximum number of failed containers
     * @param i3 value forwarded as the first argument of the base-class constructor
     * @param yarnResourceManagerCallbackHandler RM callback handler; must not be null
     * @param aMRMClientAsync asynchronous RM client; must not be null
     * @param nMClient node manager client; must not be null
     * @throws NullPointerException if any of the arguments marked "must not be null" is null
     */
    public YarnFlinkResourceManager(
            Configuration configuration,
            YarnConfiguration yarnConfiguration,
            LeaderRetrievalService leaderRetrievalService,
            String str,
            String str2,
            ContaineredTaskManagerParameters containeredTaskManagerParameters,
            ContainerLaunchContext containerLaunchContext,
            int i,
            int i2,
            int i3,
            YarnResourceManagerCallbackHandler yarnResourceManagerCallbackHandler,
            AMRMClientAsync<AMRMClient.ContainerRequest> aMRMClientAsync,
            NMClient nMClient) {
        super(i3, configuration, leaderRetrievalService);

        this.applicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(this.LOG);

        // Mandatory settings; the checks run in the same order as before so the first
        // offending argument is the one reported.
        this.yarnConfig = Objects.requireNonNull(yarnConfiguration);
        this.taskManagerParameters = Objects.requireNonNull(containeredTaskManagerParameters);
        this.taskManagerLaunchContext = Objects.requireNonNull(containerLaunchContext);
        this.applicationMasterHostName = Objects.requireNonNull(str);

        // Plain values that are stored as given.
        this.webInterfaceURL = str2;
        this.yarnHeartbeatIntervalMillis = i;
        this.maxFailedContainers = i2;

        // YARN collaborators.
        this.resourceManagerCallbackHandler = Preconditions.checkNotNull(yarnResourceManagerCallbackHandler);
        this.resourceManagerClient = Preconditions.checkNotNull(aMRMClientAsync);
        this.nodeManagerClient = Preconditions.checkNotNull(nMClient);

        // Bookkeeping starts out empty.
        this.containersInLaunch = new HashMap<>();
        this.containersBeingReturned = new HashMap<>();
    }

    /**
     * Dispatches YARN-specific actor messages and hands everything else to the base class.
     *
     * @param obj the received message
     */
    @Override
    protected void handleMessage(Object obj) {
        if (obj instanceof ContainersAllocated) {
            ContainersAllocated allocated = (ContainersAllocated) obj;
            containersAllocated(allocated.containers());
            return;
        }
        if (obj instanceof ContainersComplete) {
            ContainersComplete completed = (ContainersComplete) obj;
            containersComplete(completed.containers());
            return;
        }
        // Not a YARN callback message: let the generic resource manager deal with it.
        super.handleMessage(obj);
    }

    /**
     * Starts the RM and NM clients, registers the application master and takes over any
     * containers that the registration response reports from previous attempts.
     *
     * @throws Exception if starting a client or registering the application master fails
     */
    protected void initialize() throws Exception {
        this.LOG.info("Initializing YARN resource master");

        this.resourceManagerCallbackHandler.initialize(self());

        this.resourceManagerClient.init(this.yarnConfig);
        this.resourceManagerClient.start();

        this.nodeManagerClient.init(this.yarnConfig);
        this.nodeManagerClient.start();
        this.nodeManagerClient.cleanupRunningContainersOnStop(true);

        this.LOG.info("Registering Application Master with tracking url {}", this.webInterfaceURL);

        // Report -1 as the port when the actor system's address does not define one.
        Option port = AkkaUtils.getAddress(getContext().system()).port();
        int actorSystemPort = -1;
        if (port.isDefined()) {
            actorSystemPort = ((Integer) port.get()).intValue();
        }

        List<Container> recoveredContainers =
                this.applicationMasterResponseReflector.getContainersFromPreviousAttempts(
                        this.resourceManagerClient.registerApplicationMaster(
                                this.applicationMasterHostName, actorSystemPort, this.webInterfaceURL));

        if (!recoveredContainers.isEmpty()) {
            this.LOG.info("Retrieved {} TaskManagers from previous attempt", recoveredContainers.size());

            // All recovered containers share one timestamp, taken once before the loop.
            final long now = System.currentTimeMillis();
            for (Container recovered : recoveredContainers) {
                YarnContainerInLaunch inLaunch = new YarnContainerInLaunch(recovered, now);
                this.containersInLaunch.put(inLaunch.getResourceID(), inLaunch);
            }

            updateProgress();
        }
    }

    /**
     * Unregisters the application master from YARN, stops both YARN clients and finally stops
     * this actor. Each step is best effort: a failure is logged and the remaining steps still run.
     *
     * @param applicationStatus final Flink status, translated into the YARN final status
     * @param str diagnostics message passed to YARN on unregistration
     */
    protected void shutdownApplication(ApplicationStatus applicationStatus, String str) {
        final FinalApplicationStatus finalStatus = getYarnStatus(applicationStatus);

        this.LOG.info("Unregistering application from the YARN Resource Manager");

        // Step 1: tell YARN that the application is done.
        try {
            this.resourceManagerClient.unregisterApplicationMaster(finalStatus, str, "");
        } catch (Throwable unregisterFailure) {
            this.LOG.error("Could not unregister the application master.", unregisterFailure);
        }

        // Step 2: shut down the resource manager client.
        try {
            this.resourceManagerClient.stop();
        } catch (Throwable rmClientFailure) {
            this.LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", rmClientFailure);
        }

        // Step 3: shut down the node manager client.
        try {
            this.nodeManagerClient.stop();
        } catch (Throwable nmClientFailure) {
            this.LOG.error("Could not cleanly shut down the Node Manager Client", nmClientFailure);
        }

        // Step 4: stop this actor.
        getContext().system().stop(getSelf());
    }

    /**
     * Logs the fatal error and terminates the JVM with exit code -13.
     *
     * @param str description of the error
     * @param th cause of the error, logged with its stack trace
     */
    protected void fatalError(String str, Throwable th) {
        final String headline = "FATAL ERROR IN YARN APPLICATION MASTER: " + str;
        this.LOG.error(headline, th);
        this.LOG.error("Shutting down process");

        // The process is terminated here; this call does not return.
        System.exit(-13);
    }

    /**
     * Adds {@code i} container requests to the RM client, counting each one as pending, and
     * then switches the client to the fast heartbeat interval.
     *
     * @param i number of containers to request
     */
    protected void requestNewWorkers(int i) {
        final Resource capability = getContainerResource();

        int requested = 0;
        while (requested < i) {
            this.numPendingContainerRequests += 1;
            this.LOG.info(
                    "Requesting new TaskManager container with {} megabytes memory. Pending requests: {}",
                    capability.getMemory(),
                    this.numPendingContainerRequests);
            this.resourceManagerClient.addContainerRequest(createContainerRequest(capability));
            requested++;
        }

        // The interval is switched regardless of how many requests were added above.
        this.resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
    }

    /**
     * Builds the resource profile used for TaskManager container requests.
     *
     * <p>Memory is the configured TaskManager total memory in MB, capped at
     * {@link Integer#MAX_VALUE} (the cap is logged as an error). The vcore count is read from
     * {@code YarnConfigOptions.VCORES}, with the number of slots (at least 1) as the default.
     *
     * @return the resource to request per container
     */
    private Resource getContainerResource() {
        final long configuredMemoryMB = this.taskManagerParameters.taskManagerTotalMemoryMB();

        final int containerMemoryMB;
        if (configuredMemoryMB > Integer.MAX_VALUE) {
            // The YARN resource takes an int, so larger values are capped.
            containerMemoryMB = Integer.MAX_VALUE;
            this.LOG.error(
                    "Decreasing container size from {} MB to {} MB (integer value overflow)",
                    configuredMemoryMB,
                    Integer.MAX_VALUE);
        } else {
            containerMemoryMB = (int) configuredMemoryMB;
        }

        final int fallbackVcores = Math.max(this.taskManagerParameters.numSlots(), 1);
        final int vcores = this.config.getInteger(YarnConfigOptions.VCORES, fallbackVcores);

        return Resource.newInstance(containerMemoryMB, vcores);
    }

    /**
     * Creates a container request for the given resource with no node or rack constraints and
     * the shared request priority.
     *
     * @param resource capability to request
     * @return a new container request
     */
    @Nonnull
    private AMRMClient.ContainerRequest createContainerRequest(Resource resource) {
        final String[] anyNode = null;
        final String[] anyRack = null;
        return new AMRMClient.ContainerRequest(resource, anyNode, anyRack, RM_REQUEST_PRIORITY);
    }

    /**
     * Releases a container that is still tracked as "in launch". Unknown IDs are logged and
     * otherwise ignored.
     *
     * @param resourceID ID of the pending worker to release
     */
    protected void releasePendingWorker(ResourceID resourceID) {
        final YarnContainerInLaunch pending = this.containersInLaunch.remove(resourceID);
        if (pending == null) {
            this.LOG.error("Cannot find container {} to release. Ignoring request.", resourceID);
            return;
        }
        releaseYarnContainer(pending.container());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Releases the YARN container that backs an already started worker.
     *
     * @param registeredYarnWorkerNode the worker whose container is given back
     */
    public void releaseStartedWorker(RegisteredYarnWorkerNode registeredYarnWorkerNode) {
        final Container backingContainer = registeredYarnWorkerNode.yarnContainer();
        releaseYarnContainer(backingContainer);
    }

    /**
     * Gives a container back to YARN: records it as "being returned", asks the node manager to
     * stop it (failures are only logged) and releases it at the resource manager.
     *
     * @param container the container to release
     */
    private void releaseYarnContainer(Container container) {
        this.LOG.info("Releasing YARN container {}", container.getId());

        // Record it first so that its completion is later treated as an expected one.
        this.containersBeingReturned.put(container.getId(), container);

        try {
            this.nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
        } catch (Throwable stopFailure) {
            // Best effort: the container is still released at the resource manager below.
            this.LOG.error("Error while calling YARN Node Manager to release container", stopFailure);
        }

        this.resourceManagerClient.releaseAssignedContainer(container.getId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: workerStarted, reason: merged with bridge method [inline-methods] */
    /**
     * Promotes a container from "in launch" to a registered worker node.
     *
     * <p>NOTE(review): per the decompiler remark above, this method was originally named
     * {@code workerStarted}; the {@code m10} prefix looks like a decompiler artifact — confirm
     * before relying on the current name.
     *
     * @param resourceID ID of the worker that has started
     * @return the registered worker node, or {@code null} if no container in launch has that ID
     */
    public RegisteredYarnWorkerNode m10workerStarted(ResourceID resourceID) {
        final YarnContainerInLaunch launched = this.containersInLaunch.remove(resourceID);
        return launched != null ? new RegisteredYarnWorkerNode(launched.container()) : null;
    }

    /**
     * Matches the given worker IDs against the containers currently tracked as "in launch".
     * Every match is removed from that map and returned as a registered worker node; IDs that
     * do not match are only logged.
     *
     * @param collection IDs of the workers to consolidate
     * @return the worker nodes accepted by this call (possibly empty)
     */
    protected Collection<RegisteredYarnWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> collection) {
        final List<RegisteredYarnWorkerNode> accepted = new ArrayList<>();

        for (ResourceID resourceID : collection) {
            final YarnContainerInLaunch inLaunch = this.containersInLaunch.remove(resourceID);

            if (inLaunch != null) {
                this.LOG.info("YARN container consolidation recognizes Resource {} ", resourceID);
                accepted.add(new RegisteredYarnWorkerNode(inLaunch.container()));
                continue;
            }

            if (isStarted(resourceID)) {
                this.LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
                continue;
            }

            this.LOG.info("YARN container consolidation does not recognize TaskManager {}", resourceID);
        }

        return accepted;
    }

    /**
     * Returns this class's own count of container requests that were added to the RM client
     * and have not yet been matched by an allocated container.
     */
    protected int getNumWorkerRequestsPending() {
        return this.numPendingContainerRequests;
    }

    /** Returns the number of containers currently tracked in the "in launch" map. */
    protected int getNumWorkersPendingRegistration() {
        return this.containersInLaunch.size();
    }

    /**
     * Handles containers that YARN allocated to this application.
     *
     * <p>For every received container one pending request (if any is left) is removed from the
     * RM client and the local pending counter is decremented exactly once, so that both views
     * stay in sync; {@code getPendingRequests} verifies that invariant on each call. A container
     * is then either used to launch a TaskManager (while the worker pool is below its designated
     * size) or returned to YARN as excess.
     *
     * @param list the newly allocated containers
     */
    private void containersAllocated(List<Container> list) {
        final int requiredWorkers = getDesignatedWorkerPoolSize();
        final int startedWorkers = getNumberOfStartedTaskManagers();
        final Iterator<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests().iterator();

        for (Container container : list) {
            // One allocation satisfies one request. The counter must be decremented only once
            // here; decrementing it twice makes it diverge from the RM client's request list.
            if (this.numPendingContainerRequests > 0) {
                this.numPendingContainerRequests--;
                this.resourceManagerClient.removeContainerRequest(pendingRequests.next());
            }

            this.LOG.info(
                    "Received new container: {} - Remaining pending container requests: {}",
                    container.getId(),
                    this.numPendingContainerRequests);

            if (startedWorkers + this.containersInLaunch.size() < requiredWorkers) {
                launchTaskManager(container);
            } else {
                this.LOG.info("Returning excess container {}", container.getId());
                returnContainer(container);
            }
        }

        updateProgress();

        // Nothing outstanding anymore: go back to the regular heartbeat interval.
        if (this.numPendingContainerRequests <= 0) {
            this.resourceManagerClient.setHeartbeatInterval(this.yarnHeartbeatIntervalMillis);
        }

        triggerCheckWorkers();
    }

    /** Tracks the container as "in launch" and starts a TaskManager in it; on failure the container is returned. */
    private void launchTaskManager(Container container) {
        final YarnContainerInLaunch inLaunch = new YarnContainerInLaunch(container);
        final ResourceID resourceID = inLaunch.getResourceID();
        this.containersInLaunch.put(resourceID, inLaunch);

        final String message = "Launching TaskManager in container " + inLaunch + " on host " + container.getNodeId().getHost();
        this.LOG.info(message);
        sendInfoMessage(message);

        try {
            // The resource ID is passed to the TaskManager process through its environment.
            this.taskManagerLaunchContext.getEnvironment().put(ENV_FLINK_CONTAINER_ID, resourceID.getResourceIdString());
            this.nodeManagerClient.startContainer(container, this.taskManagerLaunchContext);
        } catch (Throwable launchFailure) {
            this.containersInLaunch.remove(resourceID);
            this.LOG.error("Could not start TaskManager in container " + inLaunch, launchFailure);
            returnContainer(container);
        }
    }

    /** Records the container as "being returned" and releases it at the resource manager. */
    private void returnContainer(Container container) {
        this.containersBeingReturned.put(container.getId(), container);
        this.resourceManagerClient.releaseAssignedContainer(container.getId());
    }

    /**
     * Returns a copy of the container requests that the RM client currently holds for the
     * request priority, any location ("*") and the TaskManager container resource.
     *
     * @return the pending requests; empty if the client reports no matching requests
     * @throws IllegalStateException if the number of requests differs from this class's own
     *     pending-request counter
     */
    private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
        final List<? extends Collection<AMRMClient.ContainerRequest>> matching =
                this.resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, "*", getContainerResource());

        final Collection<AMRMClient.ContainerRequest> pending;
        if (matching.isEmpty()) {
            pending = Collections.emptyList();
        } else {
            // Only the first group of matching requests is considered.
            pending = new ArrayList<>(matching.get(0));
        }

        Preconditions.checkState(
                pending.size() == this.numPendingContainerRequests,
                "The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.",
                new Object[]{Integer.valueOf(pending.size()), Integer.valueOf(this.numPendingContainerRequests)});

        return pending;
    }

    /**
     * Handles containers that YARN reports as completed.
     *
     * <p>Containers that this manager released itself are only logged. Every other completion
     * counts as a failure: the container is dropped from the "in launch" map or, if it was not
     * there, reported through {@code notifyWorkerFailed}. Once the number of failures exceeds
     * the configured maximum (a negative maximum disables the check), a {@link StopCluster}
     * message is sent to this actor and processing of the remaining statuses stops.
     *
     * @param list statuses of the completed containers
     */
    private void containersComplete(List<ContainerStatus> list) {
        for (ContainerStatus status : list) {
            final ResourceID resourceID = new ResourceID(status.getContainerId().toString());

            if (this.containersBeingReturned.remove(status.getContainerId()) != null) {
                // Expected completion: the container was released by this manager.
                this.LOG.info("Container {} completed successfully with diagnostics: {}", resourceID, status.getDiagnostics());
                continue;
            }

            final String exitStatus = describeExitStatus(status.getExitStatus());

            if (this.containersInLaunch.remove(resourceID) != null) {
                this.LOG.info("Container {} failed, with a TaskManager in launch or registration. Exit status: {}", resourceID, exitStatus);
            } else {
                this.LOG.info("Container {} failed. Exit status: {}", resourceID, exitStatus);
                // This message is built by concatenation, so it must not contain a "{}" placeholder.
                notifyWorkerFailed(resourceID, "Container " + resourceID + " failed. Exit status: " + exitStatus);
            }

            this.failedContainersSoFar++;

            final String diagnostics = String.format(
                    "Diagnostics for container %s in state %s : exitStatus=%s diagnostics=%s",
                    resourceID, status.getState(), exitStatus, status.getDiagnostics());
            sendInfoMessage(diagnostics);
            this.LOG.info(diagnostics);
            this.LOG.info("Total number of failed containers so far: " + this.failedContainersSoFar);

            if (this.maxFailedContainers >= 0 && this.failedContainersSoFar > this.maxFailedContainers) {
                final String reason = "Stopping YARN session because the number of failed containers (" + this.failedContainersSoFar + ") exceeded the maximum failed containers (" + this.maxFailedContainers + "). This number is controlled by the '" + YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. By default its the number of requested containers.";
                this.LOG.error(reason);
                self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, reason)), ActorRef.noSender());
                // The cluster is being stopped: skip the progress update and worker check below.
                return;
            }
        }

        updateProgress();
        triggerCheckWorkers();
    }

    /** Renders a container exit status for log messages, spelling out the -103 and -104 memory-limit codes. */
    private static String describeExitStatus(int exitStatus) {
        switch (exitStatus) {
            case -104:
                return "Pmem limit exceeded (-104)";
            case -103:
                return "Vmem limit exceeded (-103)";
            default:
                return String.valueOf(exitStatus);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Derives the resource ID of a container from the string form of its container ID.
     *
     * @param container the YARN container
     * @return a resource ID wrapping {@code container.getId().toString()}
     */
    public static ResourceID extractResourceID(Container container) {
        final String containerIdText = container.getId().toString();
        return new ResourceID(containerIdText);
    }

    /**
     * Reports the application progress to the callback handler as the fraction of the
     * designated worker pool that is already started or in launch. A pool size of zero or
     * less is reported as complete (1.0).
     */
    private void updateProgress() {
        final int required = getDesignatedWorkerPoolSize();

        final float progress;
        if (required <= 0) {
            progress = 1.0f;
        } else {
            final int available = getNumberOfStartedTaskManagers() + this.containersInLaunch.size();
            // Divide in floating point: int / int would truncate the fraction to 0 or 1.
            progress = available / (float) required;
        }

        if (this.resourceManagerCallbackHandler != null) {
            this.resourceManagerCallbackHandler.setCurrentProgress(progress);
        }
    }

    /**
     * Translates a Flink application status into the YARN final application status.
     *
     * @param applicationStatus the Flink status; may be {@code null}
     * @return SUCCEEDED, FAILED or KILLED (for CANCELED); UNDEFINED for {@code null} and for
     *     every other status
     */
    private FinalApplicationStatus getYarnStatus(ApplicationStatus applicationStatus) {
        if (applicationStatus != null) {
            switch (applicationStatus) {
                case SUCCEEDED:
                    return FinalApplicationStatus.SUCCEEDED;
                case FAILED:
                    return FinalApplicationStatus.FAILED;
                case CANCELED:
                    return FinalApplicationStatus.KILLED;
                default:
                    break;
            }
        }
        return FinalApplicationStatus.UNDEFINED;
    }

    /**
     * Creates the actor {@link Props} for the given resource manager class.
     *
     * <p>The heartbeat interval is read from the Flink configuration (seconds, converted to
     * milliseconds) and a warning is logged when it is not below YARN's application master
     * expiry interval. The failed-container limit is read from the configuration with
     * {@code i} as the default.
     *
     * @param cls actor class to instantiate
     * @param configuration Flink configuration
     * @param yarnConfiguration YARN configuration
     * @param leaderRetrievalService passed through to the actor's constructor
     * @param str application master host name
     * @param str2 web interface (tracking) URL
     * @param containeredTaskManagerParameters TaskManager sizing parameters
     * @param containerLaunchContext launch context for TaskManager containers
     * @param i passed through as the last constructor argument and used as the default
     *     failed-container limit
     * @param logger logger used for the messages emitted here
     * @return the actor props with the ten constructor arguments
     */
    public static Props createActorProps(
            Class<? extends YarnFlinkResourceManager> cls,
            Configuration configuration,
            YarnConfiguration yarnConfiguration,
            LeaderRetrievalService leaderRetrievalService,
            String str,
            String str2,
            ContaineredTaskManagerParameters containeredTaskManagerParameters,
            ContainerLaunchContext containerLaunchContext,
            int i,
            Logger logger) {
        final int heartbeatIntervalMillis = configuration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
        final long yarnExpiryIntervalMillis = yarnConfiguration.getLong("yarn.am.liveness-monitor.expiry-interval-ms", 600000L);

        if (heartbeatIntervalMillis >= yarnExpiryIntervalMillis) {
            logger.warn(
                    "The heartbeat interval of the Flink Application master ({}) is greater than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
                    Integer.valueOf(heartbeatIntervalMillis),
                    Long.valueOf(yarnExpiryIntervalMillis));
        }

        final int maxFailedContainers = configuration.getInteger(YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), i);
        if (maxFailedContainers >= 0) {
            logger.info(
                    "YARN application tolerates {} failed TaskManager containers before giving up",
                    Integer.valueOf(maxFailedContainers));
        }

        // Order matches the ten-argument constructor of this class.
        final Object[] constructorArgs = {
            configuration,
            yarnConfiguration,
            leaderRetrievalService,
            str,
            str2,
            containeredTaskManagerParameters,
            containerLaunchContext,
            Integer.valueOf(heartbeatIntervalMillis),
            Integer.valueOf(maxFailedContainers),
            Integer.valueOf(i)
        };
        return Props.create(cls, constructorArgs);
    }
}
