package org.apache.flink.yarn;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.WorkerSpecContainerResourceAdapter;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/* loaded from: input_file:org/apache/flink/yarn/YarnResourceManager.class */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
    /** Priority attached to every container request built by this class and used when looking up matching requests. */
    private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);
    /** Worker nodes keyed by a ResourceID built from the YARN container id string. */
    private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
    /** Environment variable name under which the container's resource id string is put into the launch context. */
    static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
    /** Environment variable name under which the container's node host is put into the launch context. */
    static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
    /** Message of the exception raised in onShutdownRequest. NOTE(review): "MASSAGE" looks like a typo for "MESSAGE", but the identifier is package-visible, so it is left as is. */
    static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
    /** Regular heartbeat interval in milliseconds (configured in seconds, multiplied by 1000 in the constructor). */
    private final int yarnHeartbeatIntervalMillis;
    /** YARN configuration created in the constructor and handed to both YARN clients. */
    private final YarnConfiguration yarnConfig;

    /** Web interface URL passed to registerApplicationMaster; may be null. */
    @Nullable
    private final String webInterfaceUrl;
    /** Heartbeat interval in milliseconds that is set on the RM client whenever a new container is requested. */
    private final int containerRequestHeartbeatIntervalMillis;
    /** Client towards the YARN ResourceManager; assigned in initialize(). */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
    /** Client towards the YARN NodeManagers; assigned in initialize(). */
    private NMClientAsync nodeManagerClient;
    /** Translates between worker resource specs and YARN container resources. */
    private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter;
    /** Helper used to read previous-attempt containers and scheduler resource types from the registration response. */
    private final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector;
    /** Container matching strategy; initialised from configuration and possibly overridden after registration. */
    private WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.yarn.YarnResourceManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/yarn/YarnResourceManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /**
         * Decompiler-emitted lookup table indexed by {@code ApplicationStatus.ordinal()}.
         * SUCCEEDED maps to 1, FAILED to 2 and CANCELED to 3; every other slot keeps the default 0.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus = new int[ApplicationStatus.values().length];

        static {
            final int[] switchMap = $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus;
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unset.
            try {
                switchMap[ApplicationStatus.SUCCEEDED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // deliberately ignored: slot stays 0
            }
            try {
                switchMap[ApplicationStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // deliberately ignored: slot stays 0
            }
            try {
                switchMap[ApplicationStatus.CANCELED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // deliberately ignored: slot stays 0
            }
        }
    }

    /**
     * Creates the resource manager and derives all YARN related settings from the given configuration.
     *
     * <p>No YARN client is created here; that happens in {@code initialize()}.
     *
     * @param configuration Flink configuration supplying the heartbeat intervals and the vcore matching flag
     * @param map environment variables forwarded to the super class
     * @param str web interface URL, may be {@code null}
     */
    public YarnResourceManager(RpcService rpcService, ResourceID resourceID, Configuration configuration, Map<String, String> map, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory resourceManagerPartitionTrackerFactory, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, @Nullable String str, ResourceManagerMetricGroup resourceManagerMetricGroup) {
        super(configuration, map, rpcService, resourceID, highAvailabilityServices, heartbeatServices, slotManager, resourceManagerPartitionTrackerFactory, jobLeaderIdService, clusterInformation, fatalErrorHandler, resourceManagerMetricGroup);
        this.yarnConfig = new YarnConfiguration();
        this.workerNodeMap = new ConcurrentHashMap<>();

        // The heartbeat delay is configured in seconds; the YARN client expects milliseconds.
        final int heartbeatMillis = configuration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
        final long expiryMillis = this.yarnConfig.getLong("yarn.am.liveness-monitor.expiry-interval-ms", 600000L);
        if (heartbeatMillis >= expiryMillis) {
            this.log.warn("The heartbeat interval of the Flink Application master ({}) is greater than YARN's expiry interval ({}). The application is likely to be killed by YARN.", heartbeatMillis, expiryMillis);
        }
        this.yarnHeartbeatIntervalMillis = heartbeatMillis;
        this.containerRequestHeartbeatIntervalMillis = configuration.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
        this.webInterfaceUrl = str;
        this.workerSpecContainerResourceAdapter = Utils.createWorkerSpecContainerResourceAdapter(configuration, this.yarnConfig);
        this.registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(this.log);

        // Initial strategy from configuration; updateMatchingStrategy may replace it after registration.
        final boolean matchVcores = configuration.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES);
        if (matchVcores) {
            this.matchingStrategy = WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE;
        } else {
            this.matchingStrategy = WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
        }
    }

    /**
     * Creates, initialises and starts the asynchronous RM client, then registers this application master.
     *
     * <p>The registration response is used to recover containers of previous attempts and to
     * refresh the container matching strategy.
     *
     * @param yarnConfiguration configuration used to initialise the client
     * @param i heartbeat interval in milliseconds handed to the client factory
     * @param str web interface URL, may be {@code null}; the text after its last ':' is parsed as the port
     * @return the started and registered client
     * @throws Exception if registration fails (a non-numeric port suffix yields a NumberFormatException)
     */
    protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(YarnConfiguration yarnConfiguration, int i, @Nullable String str) throws Exception {
        final AMRMClientAsync<AMRMClient.ContainerRequest> client = AMRMClientAsync.createAMRMClientAsync(i, this);
        client.init(yarnConfiguration);
        client.start();

        final Tuple2<String, Integer> hostAndPort = parseHostPort(getAddress());

        // -1 is reported when there is no URL or the URL carries no ':' separator.
        int restPort = -1;
        if (str != null) {
            final int separator = str.lastIndexOf(':');
            if (separator != -1) {
                restPort = Integer.parseInt(str.substring(separator + 1));
            }
        }

        final RegisterApplicationMasterResponse response = client.registerApplicationMaster(hostAndPort.f0, restPort, str);
        getContainersFromPreviousAttempts(response);
        updateMatchingStrategy(response);
        return client;
    }

    /**
     * Registers every container reported from previous attempts in {@code workerNodeMap}.
     *
     * @param registerApplicationMasterResponse response returned when registering the application master
     */
    private void getContainersFromPreviousAttempts(RegisterApplicationMasterResponse registerApplicationMasterResponse) {
        final List<Container> recovered = this.registerApplicationMasterResponseReflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        this.log.info("Recovered {} containers from previous attempts ({}).", recovered.size(), recovered);
        recovered.forEach(previous -> {
            final ResourceID id = new ResourceID(previous.getId().toString());
            this.workerNodeMap.put(id, new YarnWorkerNode(previous));
        });
    }

    /**
     * Overrides the matching strategy when the registration response exposes scheduler resource types.
     *
     * <p>If the types are present, vcores are matched only when "CPU" is among them. If they are
     * absent, the strategy chosen in the constructor is kept.
     *
     * @param registerApplicationMasterResponse response returned when registering the application master
     */
    private void updateMatchingStrategy(RegisterApplicationMasterResponse registerApplicationMasterResponse) {
        final Optional<Set<String>> reportedTypes = this.registerApplicationMasterResponseReflector.getSchedulerResourceTypeNames(registerApplicationMasterResponse);
        if (!reportedTypes.isPresent()) {
            this.log.info("Register application master response does not contain scheduler resource types, use '{}'.", YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES.key());
        } else {
            final Set<String> typeNames = reportedTypes.get();
            this.log.info("Register application master response contains scheduler resource types: {}.", typeNames);
            if (typeNames.contains("CPU")) {
                this.matchingStrategy = WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE;
            } else {
                this.matchingStrategy = WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
            }
        }
        this.log.info("Container matching strategy: {}.", this.matchingStrategy);
    }

    /**
     * Creates the asynchronous NodeManager client with this instance as callback handler,
     * initialises it with the given configuration and starts it.
     *
     * @param yarnConfiguration configuration used to initialise the client
     * @return the started client
     */
    protected NMClientAsync createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
        final NMClientAsync client = NMClientAsync.createNMClientAsync(this);
        client.init(yarnConfiguration);
        client.start();
        return client;
    }

    /**
     * Loads the Flink configuration from the directory named by the PWD environment entry.
     *
     * @return the loaded configuration
     */
    protected Configuration loadClientConfiguration() {
        final String workingDirectory = (String) this.env.get(ApplicationConstants.Environment.PWD.key());
        return GlobalConfiguration.loadConfiguration(workingDirectory);
    }

    /**
     * Starts the ResourceManager client first and the NodeManager client second.
     *
     * @throws ResourceManagerException wrapping any exception thrown while starting either client
     */
    protected void initialize() throws ResourceManagerException {
        try {
            this.resourceManagerClient =
                    createAndStartResourceManagerClient(this.yarnConfig, this.yarnHeartbeatIntervalMillis, this.webInterfaceUrl);
            this.nodeManagerClient = createAndStartNodeManagerClient(this.yarnConfig);
        } catch (Exception startFailure) {
            throw new ResourceManagerException("Could not start resource manager client.", startFailure);
        }
    }

    /**
     * Stops both YARN clients, if they were created, and collects any failure on the way.
     *
     * <p>A failure of the first client does not prevent stopping the second; both failures are
     * combined through {@code ExceptionUtils.firstOrSuppressed}.
     *
     * @return the future produced by {@code getStopTerminationFutureOrCompletedExceptionally}
     *     for the collected failure (or {@code null} when nothing failed)
     */
    public CompletableFuture<Void> onStop() {
        Throwable collectedFailure = null;

        if (this.resourceManagerClient != null) {
            try {
                this.resourceManagerClient.stop();
            } catch (Throwable rmStopFailure) {
                collectedFailure = rmStopFailure;
            }
        }

        if (this.nodeManagerClient != null) {
            try {
                this.nodeManagerClient.stop();
            } catch (Throwable nmStopFailure) {
                collectedFailure = ExceptionUtils.firstOrSuppressed(nmStopFailure, collectedFailure);
            }
        }

        return getStopTerminationFutureOrCompletedExceptionally(collectedFailure);
    }

    /**
     * Unregisters the application master from YARN and deletes the application files.
     *
     * <p>Unregistration is best effort: any failure is logged and the application files are deleted anyway.
     *
     * @param applicationStatus final Flink status, translated through {@code getYarnStatus}
     * @param str optional diagnostics message passed on to YARN, may be {@code null}
     */
    protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) {
        final FinalApplicationStatus finalStatus = getYarnStatus(applicationStatus);
        this.log.info("Unregister application from the YARN Resource Manager with final status {}.", finalStatus);
        try {
            // An empty tracking URL is sent when no history server URL is available.
            final String trackingUrl = HistoryServerUtils.getHistoryServerURL(this.flinkConfig)
                    .map(Object::toString)
                    .orElse("");
            this.resourceManagerClient.unregisterApplicationMaster(finalStatus, str, trackingUrl);
        } catch (Throwable unregisterFailure) {
            this.log.error("Could not unregister the application master.", unregisterFailure);
        }
        Utils.deleteApplicationFiles(this.env);
    }

    /**
     * Requests a YARN container for the given worker resource spec.
     *
     * @param workerResourceSpec resource spec of the worker to start
     * @return {@code true} if a container request was issued, {@code false} otherwise
     */
    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
        final boolean requested = requestYarnContainer(workerResourceSpec);
        return requested;
    }

    /**
     * Asks the resource adapter for the YARN container resource matching the given worker spec.
     *
     * @param workerResourceSpec resource spec of the worker
     * @return the container resource, or an empty Optional when the adapter cannot compute one
     */
    @VisibleForTesting
    Optional<Resource> getContainerResource(WorkerResourceSpec workerResourceSpec) {
        final Optional<Resource> computed =
                this.workerSpecContainerResourceAdapter.tryComputeContainerResource(workerResourceSpec);
        return computed;
    }

    /**
     * Stops the container of the given worker, releases it at the ResourceManager and forgets the worker.
     *
     * @param yarnWorkerNode worker to stop
     * @return always {@code true}
     */
    public boolean stopWorker(YarnWorkerNode yarnWorkerNode) {
        final Container workerContainer = yarnWorkerNode.getContainer();
        final ContainerId workerContainerId = workerContainer.getId();

        this.log.info("Stopping container {}.", workerContainerId);
        this.nodeManagerClient.stopContainerAsync(workerContainerId, workerContainer.getNodeId());
        this.resourceManagerClient.releaseAssignedContainer(workerContainerId);
        this.workerNodeMap.remove(yarnWorkerNode.getResourceID());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: workerStarted, reason: merged with bridge method [inline-methods] */
    /**
     * Looks up the worker node tracked for the given resource id.
     *
     * @param resourceID id of the worker
     * @return the tracked worker node, or {@code null} when none is tracked under that id
     */
    public YarnWorkerNode m13workerStarted(ResourceID resourceID) {
        final YarnWorkerNode tracked = this.workerNodeMap.get(resourceID);
        return tracked;
    }

    /**
     * Progress callback of the RM client handler.
     *
     * @return always {@code 1.0f}
     */
    public float getProgress() {
        final float fixedProgress = 1.0f;
        return fixedProgress;
    }

    /**
     * Handles completed containers; the work is handed to {@code runAsync}.
     *
     * <p>For each completed container the worker is removed from the tracking map, the super class
     * is notified that the worker stopped, replacement containers are requested if the worker was
     * still tracked, and the task manager connection is closed with the container diagnostics as cause.
     *
     * @param list statuses of the completed containers
     */
    public void onContainersCompleted(List<ContainerStatus> list) {
        runAsync(() -> {
            this.log.debug("YARN ResourceManager reported the following containers completed: {}.", list);
            for (ContainerStatus completed : list) {
                final ResourceID workerId = new ResourceID(completed.getContainerId().toString());
                final YarnWorkerNode trackedNode = this.workerNodeMap.remove(workerId);
                notifyAllocatedWorkerStopped(workerId);
                // A still-tracked node was not removed via stopWorker, so check whether a replacement is needed.
                if (trackedNode != null) {
                    requestYarnContainerIfRequired();
                }
                closeTaskManagerConnection(workerId, new Exception(completed.getDiagnostics()));
            }
        });
    }

    /**
     * Handles newly allocated containers; the work is handed to {@code runAsync}.
     *
     * <p>Containers are grouped by their resource and each group is processed separately. Once no
     * requested-but-unallocated worker remains, the regular heartbeat interval is restored.
     *
     * @param list containers allocated by YARN
     */
    public void onContainersAllocated(List<Container> list) {
        runAsync(() -> {
            this.log.info("Received {} containers.", list.size());
            groupContainerByResource(list).forEach(this::onContainersOfResourceAllocated);

            // requestYarnContainer switches to the container-request heartbeat interval; switch back when done.
            if (getNumRequestedNotAllocatedWorkers() <= 0) {
                this.resourceManagerClient.setHeartbeatInterval(this.yarnHeartbeatIntervalMillis);
            }
        });
    }

    /**
     * Groups the given containers by the resource they report.
     *
     * @param list containers to group
     * @return map from resource to the containers having that resource
     */
    private Map<Resource, List<Container>> groupContainerByResource(List<Container> list) {
        return list.stream().collect(Collectors.groupingBy(Container::getResource));
    }

    /**
     * Matches allocated containers of one resource against the pending worker requests for that resource.
     *
     * <p>Each container is paired with one pending worker spec and one pending container request
     * until either runs out. Paired containers get a task executor started and their request is
     * removed; all remaining containers are returned as excess.
     *
     * @param resource resource shared by all containers in {@code list}
     * @param list allocated containers having that resource
     */
    private void onContainersOfResourceAllocated(Resource resource, List<Container> list) {
        // One list entry per requested-but-not-allocated worker of every spec that maps to this resource.
        final List<WorkerResourceSpec> pendingSpecs = this.workerSpecContainerResourceAdapter
                .getWorkerSpecs(resource, this.matchingStrategy)
                .stream()
                .flatMap(spec -> Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
                .collect(Collectors.toList());
        final int pendingBefore = pendingSpecs.size();
        this.log.info("Received {} containers with resource {}, {} pending container requests.", list.size(), resource, pendingBefore);

        final Iterator<Container> containers = list.iterator();
        final Iterator<WorkerResourceSpec> specs = pendingSpecs.iterator();
        final Iterator<AMRMClient.ContainerRequest> requests =
                getPendingRequestsAndCheckConsistency(resource, pendingSpecs.size()).iterator();

        int accepted = 0;
        while (containers.hasNext() && specs.hasNext()) {
            final WorkerResourceSpec spec = specs.next();
            final Container container = containers.next();
            final AMRMClient.ContainerRequest request = requests.next();
            final ResourceID workerId = getContainerResourceId(container);

            notifyNewWorkerAllocated(spec, workerId);
            startTaskExecutorInContainer(container, spec, workerId);
            removeContainerRequest(request, spec);
            accepted++;
        }
        final int pendingAfter = pendingBefore - accepted;

        // Whatever is left over has no matching pending request and is handed back to YARN.
        int excess = 0;
        while (containers.hasNext()) {
            returnExcessContainer(containers.next());
            excess++;
        }
        this.log.info("Accepted {} requested containers, returned {} excess containers, {} pending container requests of resource {}.", accepted, excess, pendingAfter, resource);
    }

    /**
     * Builds the resource id of a container from the string form of its container id.
     *
     * @param container container to identify
     * @return resource id wrapping {@code container.getId().toString()}
     */
    private static ResourceID getContainerResourceId(Container container) {
        final String containerIdText = container.getId().toString();
        return new ResourceID(containerIdText);
    }

    /**
     * Tracks the container as a worker node and asks the NodeManager to start a task executor in it.
     *
     * <p>Any failure while building the launch context or issuing the start call releases the
     * container and requests a replacement if one is required.
     *
     * @param container container to start the task executor in
     * @param workerResourceSpec resource spec the container was allocated for
     * @param resourceID resource id under which the worker is tracked
     */
    private void startTaskExecutorInContainer(Container container, WorkerResourceSpec workerResourceSpec, ResourceID resourceID) {
        this.workerNodeMap.put(resourceID, new YarnWorkerNode(container));
        try {
            final String containerIdText = resourceID.toString();
            final String host = container.getNodeId().getHost();
            final TaskExecutorProcessSpec processSpec =
                    TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, workerResourceSpec);
            final ContainerLaunchContext launchContext = createTaskExecutorLaunchContext(containerIdText, host, processSpec);
            this.nodeManagerClient.startContainerAsync(container, launchContext);
        } catch (Throwable startFailure) {
            releaseFailedContainerAndRequestNewContainerIfRequired(container.getId(), startFailure);
        }
    }

    /**
     * Cleans up after a container whose task executor could not be started.
     *
     * <p>Must be called from the main thread (checked via {@code validateRunsInMainThread}). The
     * worker is untracked, the container is released, the super class is notified and replacement
     * containers are requested if required.
     *
     * @param containerId id of the failed container
     * @param th cause of the failure, logged as error
     */
    private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId containerId, Throwable th) {
        validateRunsInMainThread();
        this.log.error("Could not start TaskManager in container {}.", containerId, th);

        final ResourceID failedWorkerId = new ResourceID(containerId.toString());
        this.workerNodeMap.remove(failedWorkerId);
        this.resourceManagerClient.releaseAssignedContainer(containerId);
        notifyAllocatedWorkerStopped(failedWorkerId);

        requestYarnContainerIfRequired();
    }

    /**
     * Logs and releases a container for which no pending request exists.
     *
     * @param container excess container to hand back to YARN
     */
    private void returnExcessContainer(Container container) {
        final ContainerId excessId = container.getId();
        this.log.info("Returning excess container {}.", excessId);
        this.resourceManagerClient.releaseAssignedContainer(excessId);
    }

    /**
     * Logs and removes a pending container request from the RM client.
     *
     * @param containerRequest request to remove
     * @param workerResourceSpec not used by this method
     */
    private void removeContainerRequest(AMRMClient.ContainerRequest containerRequest, WorkerResourceSpec workerResourceSpec) {
        final AMRMClient.ContainerRequest satisfied = containerRequest;
        this.log.info("Removing container request {}.", satisfied);
        this.resourceManagerClient.removeContainerRequest(satisfied);
    }

    /**
     * Fetches the pending container requests matching the given resource and verifies their count.
     *
     * <p>Matching requests are looked up for every resource the adapter reports as equivalent;
     * only the first returned collection is copied and used.
     *
     * @param resource resource of the allocated containers
     * @param i number of pending requests this resource manager expects
     * @return copy of the first matching request collection, or an empty list when there is none
     * @throws IllegalStateException if the number of returned requests differs from {@code i}
     */
    private Collection<AMRMClient.ContainerRequest> getPendingRequestsAndCheckConsistency(Resource resource, int i) {
        final Collection<Resource> equivalentResources =
                this.workerSpecContainerResourceAdapter.getEquivalentContainerResource(resource, this.matchingStrategy);
        final List<? extends Collection<AMRMClient.ContainerRequest>> matchingGroups = equivalentResources.stream()
                .flatMap(equivalent -> this.resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, "*", equivalent).stream())
                .collect(Collectors.toList());

        final Collection<AMRMClient.ContainerRequest> pendingRequests;
        if (matchingGroups.isEmpty()) {
            pendingRequests = Collections.emptyList();
        } else {
            pendingRequests = new ArrayList<>(matchingGroups.get(0));
        }

        Preconditions.checkState(pendingRequests.size() == i, "The RMClient's and YarnResourceManagers internal state about the number of pending container requests for resource %s has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", resource, pendingRequests.size(), i);
        return pendingRequests;
    }

    /**
     * RM client callback for a shutdown request; reported as a fatal error carrying
     * {@code ERROR_MASSAGE_ON_SHUTDOWN_REQUEST} as message.
     */
    public void onShutdownRequest() {
        final ResourceManagerException shutdownRequested =
                new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST);
        onFatalError(shutdownRequested);
    }

    /**
     * RM client callback for node updates; intentionally a no-op.
     *
     * @param list updated node reports (ignored)
     */
    public void onNodesUpdated(List<NodeReport> list) {
    }

    /**
     * Client error callback; every error is forwarded to {@code onFatalError}.
     *
     * @param th error reported by the client
     */
    public void onError(Throwable th) {
        onFatalError(th);
    }

    /**
     * NM client callback for a successful container start; only logs at debug level.
     *
     * @param containerId id of the started container
     * @param map service responses supplied with the callback (ignored)
     */
    public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
        this.log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId);
    }

    /**
     * NM client callback for a received container status; intentionally a no-op.
     *
     * @param containerId id of the container (ignored)
     * @param containerStatus reported status (ignored)
     */
    public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
    }

    /**
     * NM client callback for a successful container stop; only logs at debug level.
     *
     * @param containerId id of the stopped container
     */
    public void onContainerStopped(ContainerId containerId) {
        this.log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId);
    }

    /**
     * NM client callback for a failed container start.
     *
     * <p>The clean-up is handed to {@code runAsync}; the clean-up method itself validates that it
     * runs in the main thread.
     *
     * @param containerId id of the container that failed to start
     * @param th cause of the failure
     */
    public void onStartContainerError(ContainerId containerId, Throwable th) {
        runAsync(() -> releaseFailedContainerAndRequestNewContainerIfRequired(containerId, th));
    }

    /**
     * NM client callback for a failed container status query; intentionally a no-op.
     *
     * @param containerId id of the container (ignored)
     * @param th cause of the failure (ignored)
     */
    public void onGetContainerStatusError(ContainerId containerId, Throwable th) {
    }

    /**
     * NM client callback for a failed container stop; the failure is only logged as a warning.
     *
     * @param containerId id of the container that failed to stop
     * @param th cause of the failure
     */
    public void onStopContainerError(ContainerId containerId, Throwable th) {
        this.log.warn("Error while calling YARN Node Manager to stop container {}.", containerId, th);
    }

    /**
     * Translates a Flink application status into the YARN final application status.
     *
     * <p>SUCCEEDED and FAILED map to their namesakes, CANCELED maps to KILLED, and everything
     * else (including {@code null}) maps to UNDEFINED.
     *
     * @param applicationStatus Flink status, may be {@code null}
     * @return the corresponding YARN status, never {@code null}
     */
    private FinalApplicationStatus getYarnStatus(ApplicationStatus applicationStatus) {
        if (applicationStatus == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        // Switch on the enum itself rather than on a hand-maintained ordinal lookup table.
        switch (applicationStatus) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default:
                return FinalApplicationStatus.UNDEFINED;
        }
    }

    /**
     * Extracts host and port from an address string.
     *
     * <p>The part after the first '@' is split at ':'; the first piece is the host and the second
     * piece, cut at its first '/', is parsed as the port.
     * NOTE(review): presumably the input is an RPC address of the form
     * {@code scheme://name@host:port/path} — confirm against {@code getAddress()}.
     *
     * @param str address to parse; must contain '@' followed by {@code host:port}
     * @return tuple of host and port
     */
    private static Tuple2<String, Integer> parseHostPort(String str) {
        final String afterAt = str.split("@")[1];
        final String[] hostAndRest = afterAt.split(":");
        final String host = hostAndRest[0];
        final String portText = hostAndRest[1].split("/")[0];
        return new Tuple2<>(host, Integer.valueOf(portText));
    }

    /**
     * Requests containers until, for every required worker resource spec, the number of
     * requested-but-not-registered workers reaches the required count.
     *
     * @throws IllegalStateException if a container request cannot be issued for a required spec
     */
    private void requestYarnContainerIfRequired() {
        for (Map.Entry<?, ?> required : getRequiredResources().entrySet()) {
            final WorkerResourceSpec workerResourceSpec = (WorkerResourceSpec) required.getKey();
            while (((Integer) required.getValue()).intValue() > getNumRequestedNotRegisteredWorkersFor(workerResourceSpec)) {
                final boolean requested = requestYarnContainer(workerResourceSpec);
                // Preconditions templates substitute %s, not the SLF4J-style {} placeholder.
                Preconditions.checkState(requested, "Cannot request container for worker resource spec %s.", workerResourceSpec);
            }
        }
    }

    /**
     * Issues one container request for the given worker resource spec.
     *
     * <p>On success the RM client is switched to the container-request heartbeat interval and the
     * super class is notified about the newly requested worker.
     *
     * @param workerResourceSpec resource spec to request a container for
     * @return {@code false} when no container resource can be computed for the spec, {@code true} otherwise
     */
    private boolean requestYarnContainer(WorkerResourceSpec workerResourceSpec) {
        final Optional<Resource> resourceForSpec = getContainerResource(workerResourceSpec);
        if (resourceForSpec.isPresent()) {
            this.resourceManagerClient.addContainerRequest(getContainerRequest(resourceForSpec.get()));
            this.resourceManagerClient.setHeartbeatInterval(this.containerRequestHeartbeatIntervalMillis);
            final int pendingOfSpec = notifyNewWorkerRequested(workerResourceSpec).getNumNotAllocated();
            this.log.info("Requesting new TaskExecutor container with resource {}. Number pending workers of this resource is {}.", workerResourceSpec, pendingOfSpec);
            return true;
        }
        return false;
    }

    /**
     * Builds a container request for the given resource with {@code RM_REQUEST_PRIORITY} and
     * {@code null} for both the node and the rack constraints.
     *
     * @param resource resource the requested container must provide
     * @return the new container request
     */
    @VisibleForTesting
    @Nonnull
    AMRMClient.ContainerRequest getContainerRequest(Resource resource) {
        final String[] nodes = null;
        final String[] racks = null;
        return new AMRMClient.ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY);
    }

    /**
     * Builds the launch context used to start a task executor in a container.
     *
     * <p>The context is created by {@code Utils.createTaskExecutorContext}; afterwards the
     * container id and node id environment variables are added to it.
     *
     * @param str container id string, stored under {@code ENV_FLINK_CONTAINER_ID}
     * @param str2 host of the container's node, stored under {@code ENV_FLINK_NODE_ID}
     * @param taskExecutorProcessSpec process spec the task executor is started with
     * @return the launch context
     * @throws Exception if building the launch context fails
     */
    private ContainerLaunchContext createTaskExecutorLaunchContext(String str, String str2, TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception {
        final String workingDirectory = (String) this.env.get(ApplicationConstants.Environment.PWD.key());
        final ContaineredTaskManagerParameters taskManagerParameters =
                ContaineredTaskManagerParameters.create(this.flinkConfig, taskExecutorProcessSpec);
        this.log.info("TaskExecutor {} will be started on {} with {}.", str, str2, taskExecutorProcessSpec);

        // The dynamic properties are computed against a clone of the Flink configuration.
        final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(this.flinkConfig);
        final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(this.flinkClientConfig, taskManagerConfig);
        this.log.debug("TaskManager configuration: {}", taskManagerConfig);

        final ContainerLaunchContext launchContext = Utils.createTaskExecutorContext(
                this.flinkConfig,
                this.yarnConfig,
                this.env,
                taskManagerParameters,
                dynamicProperties,
                workingDirectory,
                YarnTaskExecutorRunner.class,
                this.log);
        launchContext.getEnvironment().put(ENV_FLINK_CONTAINER_ID, str);
        launchContext.getEnvironment().put(ENV_FLINK_NODE_ID, str2);
        return launchContext;
    }
}
