package org.apache.flink.yarn;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/* loaded from: input_file:org/apache/flink/yarn/YarnResourceManager.class */
public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {
    /** Environment variables supplied at construction; read when building TaskExecutor launch contexts and when deleting application files. */
    private final Map<String, String> env;
    /** Known worker containers, keyed by a ResourceID built from the container id string. */
    private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
    /** Heartbeat interval (ms) set on the YARN RM client whenever a container request is issued. */
    private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
    /** Environment key under which the container id is placed in the TaskExecutor launch context. */
    private static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
    /** Environment key under which the container's node host is placed in the TaskExecutor launch context. */
    static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
    /** Regular heartbeat interval (ms): the configured heartbeat delay in seconds times 1000. */
    private final int yarnHeartbeatIntervalMillis;
    /** Flink configuration handed to the constructor. */
    private final Configuration flinkConfig;
    /** YARN configuration, freshly instantiated in the constructor. */
    private final YarnConfiguration yarnConfig;

    /** Optional web interface URL; passed to YARN when registering the application master. */
    @Nullable
    private final String webInterfaceUrl;
    /** Number of task slots per TaskManager, read from the Flink configuration. */
    private final int numberOfTaskSlots;
    /** Container memory (MB) used when a requested resource profile reports a negative memory size. */
    private final int defaultTaskManagerMemoryMB;
    /** Virtual cores used when a requested resource profile reports fewer than one CPU core. */
    private final int defaultCpus;
    /** Client for the YARN ResourceManager; assigned in initialize(). */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
    /** Client for the YARN NodeManagers; assigned in initialize(). */
    private NMClient nodeManagerClient;
    /** Count of container requests issued but not yet matched by an allocated container. */
    private int numPendingContainerRequests;
    /** Priority number assigned to each resource profile seen so far (see generatePriority). */
    private final Map<ResourceProfile, Integer> resourcePriorities;

    /*
     * Decompiler rendering of the compiler-generated switch map for ApplicationStatus.
     * It maps ApplicationStatus ordinals to the case labels 1..3 that getYarnStatus switches on:
     * SUCCEEDED -> 1, FAILED -> 2, CANCELED -> 3. Any other constant keeps the array default of 0.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.yarn.YarnResourceManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/yarn/YarnResourceManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per ApplicationStatus constant, indexed by ordinal().
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus = new int[ApplicationStatus.values().length];

        static {
            // Each assignment is guarded separately; a NoSuchFieldError is ignored so the
            // remaining entries are still populated (presumably to tolerate an enum constant
            // that is absent at runtime -- standard javac switch-map shape, confirm if relevant).
            try {
                $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus[ApplicationStatus.SUCCEEDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus[ApplicationStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$clusterframework$ApplicationStatus[ApplicationStatus.CANCELED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Creates the YARN-backed resource manager.
     *
     * <p>Besides forwarding the generic services to the base class, this reads the YARN heartbeat
     * interval, the number of task slots and the default container size from {@code configuration}.
     *
     * @param configuration Flink configuration to read settings from
     * @param map environment variables kept for launch-context creation and file cleanup
     * @param str2 optional web interface URL, may be {@code null}
     */
    public YarnResourceManager(RpcService rpcService, String str, ResourceID resourceID, Configuration configuration, Map<String, String> map, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, @Nullable String str2) {
        super(rpcService, str, resourceID, resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, slotManager, metricRegistry, jobLeaderIdService, clusterInformation, fatalErrorHandler);

        // Plain state first.
        this.resourcePriorities = new HashMap<>();
        this.flinkConfig = configuration;
        this.yarnConfig = new YarnConfiguration();
        this.env = map;
        this.workerNodeMap = new ConcurrentHashMap<>();

        // Heartbeat: configured in seconds, used in milliseconds.
        final int heartbeatIntervalMs = configuration.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
        final long yarnExpiryIntervalMs = this.yarnConfig.getLong("yarn.am.liveness-monitor.expiry-interval-ms", 600000L);
        if (heartbeatIntervalMs >= yarnExpiryIntervalMs) {
            this.log.warn("The heartbeat interval of the Flink Application master ({}) is greater than YARN's expiry interval ({}). The application is likely to be killed by YARN.", heartbeatIntervalMs, yarnExpiryIntervalMs);
        }
        this.yarnHeartbeatIntervalMillis = heartbeatIntervalMs;

        this.numPendingContainerRequests = 0;
        this.webInterfaceUrl = str2;

        // Defaults applied when a resource profile does not specify usable values.
        this.numberOfTaskSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
        this.defaultTaskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
        this.defaultCpus = configuration.getInteger(YarnConfigOptions.VCORES, this.numberOfTaskSlots);
    }

    /**
     * Creates, initializes and starts the asynchronous YARN ResourceManager client, registers this
     * application master with YARN and records the containers reported from previous attempts.
     *
     * <p>If anything fails after the client has been started, the client is stopped again before
     * the exception is propagated, so that no running client is left behind.
     *
     * @param yarnConfiguration YARN configuration used to initialize the client
     * @param i heartbeat interval in milliseconds for the client
     * @param str optional web interface URL handed to YARN on registration; its trailing
     *     {@code :port} (if any) is used as the registered port
     * @return the started and registered client
     * @throws Exception if the address cannot be parsed or the registration fails
     */
    protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(YarnConfiguration yarnConfiguration, int i, @Nullable String str) throws Exception {
        final AMRMClientAsync<AMRMClient.ContainerRequest> client = AMRMClientAsync.createAMRMClientAsync(i, this);
        client.init(yarnConfiguration);
        client.start();
        try {
            final Tuple2<String, Integer> rpcHostPort = parseHostPort(getAddress());
            final int restPort = parseRestPort(str);
            final RegisterApplicationMasterResponse response = client.registerApplicationMaster(rpcHostPort.f0, restPort, str);
            getContainersFromPreviousAttempts(response);
            return client;
        } catch (Exception registrationFailure) {
            // The client is already running; stop it so a failed start does not leak it.
            try {
                client.stop();
            } catch (Exception stopFailure) {
                registrationFailure.addSuppressed(stopFailure);
            }
            throw registrationFailure;
        }
    }

    /**
     * Extracts the port following the last ':' of the given URL.
     *
     * @return the parsed port, or -1 if the URL is null, has no ':' or the text after the last ':'
     *     (up to an optional '/') is not an integer
     */
    private int parseRestPort(@Nullable String webInterfaceUrl) {
        if (webInterfaceUrl == null) {
            return -1;
        }
        final int lastColon = webInterfaceUrl.lastIndexOf(':');
        if (lastColon == -1) {
            return -1;
        }
        String candidate = webInterfaceUrl.substring(lastColon + 1);
        // Tolerate a trailing path such as "host:8081/".
        final int slash = candidate.indexOf('/');
        if (slash != -1) {
            candidate = candidate.substring(0, slash);
        }
        try {
            return Integer.parseInt(candidate);
        } catch (NumberFormatException e) {
            // E.g. "http://host" where the last ':' belongs to the scheme: register without a port
            // instead of failing the whole resource manager start.
            this.log.warn("Could not extract a port from the web interface URL {}; registering with port -1.", webInterfaceUrl);
            return -1;
        }
    }

    /**
     * Registers every container that YARN reports from previous application attempts as a known
     * worker node.
     *
     * @param registerApplicationMasterResponse the response received when registering with YARN
     */
    private void getContainersFromPreviousAttempts(RegisterApplicationMasterResponse registerApplicationMasterResponse) {
        final RegisterApplicationMasterResponseReflector reflector = new RegisterApplicationMasterResponseReflector(this.log);
        final List<Container> recovered = reflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);

        this.log.info("Recovered {} containers from previous attempts ({}).", recovered.size(), recovered);

        for (Container previous : recovered) {
            final ResourceID workerId = new ResourceID(previous.getId().toString());
            this.workerNodeMap.put(workerId, new YarnWorkerNode(previous));
        }
    }

    /**
     * Creates, initializes and starts the YARN NodeManager client.
     *
     * @param yarnConfiguration YARN configuration used to initialize the client
     * @return the started client, configured to clean up running containers when it is stopped
     */
    protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
        final NMClient nmClient = NMClient.createNMClient();
        nmClient.init(yarnConfiguration);
        nmClient.start();
        // Ask the client to clean up containers that are still running once it is stopped.
        nmClient.cleanupRunningContainersOnStop(true);
        return nmClient;
    }

    /**
     * Starts the YARN ResourceManager client followed by the NodeManager client.
     *
     * @throws ResourceManagerException if either client cannot be created or started
     */
    protected void initialize() throws ResourceManagerException {
        try {
            // Order matters: the RM client is assigned before the NM client is attempted.
            resourceManagerClient = createAndStartResourceManagerClient(yarnConfig, yarnHeartbeatIntervalMillis, webInterfaceUrl);
            nodeManagerClient = createAndStartNodeManagerClient(yarnConfig);
        } catch (Exception startupFailure) {
            throw new ResourceManagerException("Could not start resource manager client.", startupFailure);
        }
    }

    /**
     * Stops the YARN clients and then runs the base-class shutdown.
     *
     * <p>Both clients are stopped on a best-effort basis; a failure of the first does not prevent
     * stopping the second. The base-class {@code postStop()} is always invoked, even when a client
     * failed to stop, so that the parent's shutdown is never skipped.
     *
     * @return the base-class termination future, or an exceptionally completed future wrapping the
     *     first client failure (later failures are attached as suppressed)
     */
    public CompletableFuture<Void> postStop() {
        Throwable firstFailure = null;

        if (this.resourceManagerClient != null) {
            try {
                this.resourceManagerClient.stop();
            } catch (Throwable t) {
                firstFailure = t;
            }
        }

        if (this.nodeManagerClient != null) {
            try {
                this.nodeManagerClient.stop();
            } catch (Throwable t) {
                firstFailure = ExceptionUtils.firstOrSuppressed(t, firstFailure);
            }
        }

        // Run the parent's shutdown unconditionally; previously it was skipped on client failure.
        final CompletableFuture<Void> baseTermination = super.postStop();

        if (firstFailure != null) {
            return FutureUtils.completedExceptionally(new FlinkException("Error while shutting down YARN resource manager", firstFailure));
        }
        return baseTermination;
    }

    /**
     * Unregisters the application master from YARN with the given final status and then deletes
     * the application files.
     *
     * <p>A failure to unregister is logged and does not prevent the file cleanup.
     *
     * @param applicationStatus final Flink status, translated to the YARN equivalent
     * @param str optional diagnostics message forwarded to YARN
     */
    protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) {
        final FinalApplicationStatus finalStatus = getYarnStatus(applicationStatus);
        this.log.info("Unregister application from the YARN Resource Manager with final status {}.", finalStatus);

        try {
            // The empty string is passed as the third argument, exactly as before.
            this.resourceManagerClient.unregisterApplicationMaster(finalStatus, str, "");
        } catch (Throwable unregisterFailure) {
            this.log.error("Could not unregister the application master.", unregisterFailure);
        }

        Utils.deleteApplicationFiles(this.env);
    }

    /**
     * Requests a new YARN container sized according to the given resource profile.
     *
     * <p>A negative memory size falls back to the default TaskManager memory; fewer than one CPU
     * core falls back to the default number of virtual cores. Fractional cores are truncated.
     *
     * @param resourceProfile the resources the new worker should provide
     */
    public void startNewWorker(ResourceProfile resourceProfile) {
        final int memoryMb;
        if (resourceProfile.getMemoryInMB() < 0) {
            memoryMb = this.defaultTaskManagerMemoryMB;
        } else {
            memoryMb = resourceProfile.getMemoryInMB();
        }

        final int vcores;
        if (resourceProfile.getCpuCores() < 1.0d) {
            vcores = this.defaultCpus;
        } else {
            vcores = (int) resourceProfile.getCpuCores();
        }

        final Resource capability = Resource.newInstance(memoryMb, vcores);
        final Priority priority = Priority.newInstance(generatePriority(resourceProfile));
        requestYarnContainer(capability, priority);
    }

    /**
     * Stops the container of the given worker, releases it at the YARN ResourceManager and forgets
     * the worker.
     *
     * <p>An error from the NodeManager while stopping the container is only logged; the container
     * is released and removed regardless.
     *
     * @param yarnWorkerNode the worker to stop
     * @return always {@code true}
     */
    public boolean stopWorker(YarnWorkerNode yarnWorkerNode) {
        final Container target = yarnWorkerNode.getContainer();
        this.log.info("Stopping container {}.", target.getId());

        try {
            this.nodeManagerClient.stopContainer(target.getId(), target.getNodeId());
        } catch (Exception stopFailure) {
            this.log.warn("Error while calling YARN Node Manager to stop container", stopFailure);
        }

        this.resourceManagerClient.releaseAssignedContainer(target.getId());
        this.workerNodeMap.remove(yarnWorkerNode.getResourceID());
        return true;
    }

    /**
     * Looks up the worker node registered under the given resource id.
     *
     * @param resourceID id of the worker that has started
     * @return the known worker node, or {@code null} if no container with that id is tracked
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: workerStarted, reason: merged with bridge method [inline-methods] */
    public YarnWorkerNode m45workerStarted(ResourceID resourceID) {
        final YarnWorkerNode knownWorker = this.workerNodeMap.get(resourceID);
        return knownWorker;
    }

    /**
     * Reports the application progress.
     *
     * @return always {@code 1.0f}; no actual progress is computed
     */
    public float getProgress() {
        return 1.0f;
    }

    /**
     * Handles containers reported as completed.
     *
     * <p>The work is handed to {@code runAsync}. For every completed container that is still
     * tracked as a worker, the worker is removed, a replacement container is requested if pending
     * slot requests require it, and the TaskManager connection is closed with the container's
     * diagnostics as the cause. Untracked containers are ignored.
     *
     * @param list statuses of the completed containers
     */
    public void onContainersCompleted(List<ContainerStatus> list) {
        runAsync(() -> {
            for (ContainerStatus status : list) {
                final ResourceID workerId = new ResourceID(status.getContainerId().toString());
                final YarnWorkerNode finishedWorker = this.workerNodeMap.remove(workerId);
                if (finishedWorker == null) {
                    // Not one of our tracked workers; nothing to do.
                    continue;
                }
                internalRequestYarnContainer(finishedWorker.getContainer().getResource(), finishedWorker.getContainer().getPriority());
                closeTaskManagerConnection(workerId, new Exception(status.getDiagnostics()));
            }
        });
    }

    /**
     * Handles containers allocated by YARN.
     *
     * <p>The work is handed to {@code runAsync}. While container requests are pending, each
     * allocated container is registered as a worker and a TaskExecutor is launched in it; if the
     * launch fails the container is released and a new one is requested. Containers arriving when
     * nothing is pending are returned to YARN. Once no request is pending any more, the heartbeat
     * interval is set back to the regular value.
     *
     * @param list the newly allocated containers
     */
    public void onContainersAllocated(List<Container> list) {
        runAsync(() -> {
            for (Container allocated : list) {
                this.log.info("Received new container: {} - Remaining pending container requests: {}", allocated.getId(), this.numPendingContainerRequests);

                if (this.numPendingContainerRequests <= 0) {
                    // Nobody asked for this one: hand it straight back.
                    this.log.info("Returning excess container {}.", allocated.getId());
                    this.resourceManagerClient.releaseAssignedContainer(allocated.getId());
                    continue;
                }

                this.numPendingContainerRequests--;
                final String containerIdText = allocated.getId().toString();
                final ResourceID workerId = new ResourceID(containerIdText);
                this.workerNodeMap.put(workerId, new YarnWorkerNode(allocated));

                try {
                    final ContainerLaunchContext launchContext = createTaskExecutorLaunchContext(allocated.getResource(), containerIdText, allocated.getNodeId().getHost());
                    this.nodeManagerClient.startContainer(allocated, launchContext);
                } catch (Throwable launchFailure) {
                    this.log.error("Could not start TaskManager in container {}.", allocated.getId(), launchFailure);
                    // Undo the bookkeeping, give the container back and ask for a fresh one.
                    this.workerNodeMap.remove(workerId);
                    this.resourceManagerClient.releaseAssignedContainer(allocated.getId());
                    requestYarnContainer(allocated.getResource(), allocated.getPriority());
                }
            }

            if (this.numPendingContainerRequests <= 0) {
                this.resourceManagerClient.setHeartbeatInterval(this.yarnHeartbeatIntervalMillis);
            }
        });
    }

    /** Reacts to a shutdown request by delegating to {@code shutDown()}. */
    public void onShutdownRequest() {
        shutDown();
    }

    /**
     * Receives node reports; intentionally a no-op, the reports are ignored.
     *
     * @param list the updated node reports (unused)
     */
    public void onNodesUpdated(List<NodeReport> list) {
    }

    /**
     * Forwards an error reported to this handler to {@code onFatalError}.
     *
     * @param th the reported error
     */
    public void onError(Throwable th) {
        onFatalError(th);
    }

    /**
     * Translates a Flink application status into the corresponding YARN final status.
     *
     * @param applicationStatus the Flink status, may be {@code null}
     * @return SUCCEEDED, FAILED or KILLED (for CANCELED); UNDEFINED for {@code null} or any other
     *     status
     */
    private FinalApplicationStatus getYarnStatus(ApplicationStatus applicationStatus) {
        if (applicationStatus == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        // Switch on the enum directly instead of going through the ordinal-indexed lookup table.
        switch (applicationStatus) {
            case SUCCEEDED:
                return FinalApplicationStatus.SUCCEEDED;
            case FAILED:
                return FinalApplicationStatus.FAILED;
            case CANCELED:
                return FinalApplicationStatus.KILLED;
            default:
                return FinalApplicationStatus.UNDEFINED;
        }
    }

    /**
     * Extracts host and port from an address of the form {@code ...@host:port/...}.
     *
     * <p>The host is the text between the first '@' and the following ':'; the port is the text
     * after that ':' up to the next '/'. No validation is performed: an address without '@' or ':'
     * fails with an array index error, a non-numeric port with a NumberFormatException.
     *
     * @param str the address to parse
     * @return a tuple of host and port
     */
    private static Tuple2<String, Integer> parseHostPort(String str) {
        final String afterAt = str.split("@")[1];
        final String[] hostAndRemainder = afterAt.split(":");
        final String host = hostAndRemainder[0];
        final String portText = hostAndRemainder[1].split("/")[0];
        return new Tuple2<>(host, Integer.valueOf(portText));
    }

    /**
     * Issues a container request to YARN and switches to the fast heartbeat interval.
     *
     * <p>The request carries no node or rack constraints. The pending-request counter is
     * incremented after the request has been added.
     *
     * @param resource the container size to request
     * @param priority the request priority
     */
    private void requestYarnContainer(Resource resource, Priority priority) {
        final AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(resource, (String[]) null, (String[]) null, priority);
        this.resourceManagerClient.addContainerRequest(request);

        // Poll YARN more frequently while a request is outstanding.
        this.resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

        this.numPendingContainerRequests += 1;
        this.log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", resource, this.numPendingContainerRequests);
    }

    /**
     * Builds the launch context for a TaskExecutor running in the given container.
     *
     * <p>TaskManager parameters are derived from the Flink configuration, the container memory and
     * the number of task slots. The container id and node host are added to the launch
     * environment under {@code _FLINK_CONTAINER_ID} and {@code _FLINK_NODE_ID}.
     *
     * @param resource the resources of the allocated container
     * @param str the container id
     * @param str2 the host of the node the container runs on
     * @return the launch context to start the TaskExecutor with
     * @throws Exception if the launch context cannot be created
     */
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String str, String str2) throws Exception {
        final String workingDirectory = this.env.get(ApplicationConstants.Environment.PWD.key());
        final ContaineredTaskManagerParameters tmParameters = ContaineredTaskManagerParameters.create(this.flinkConfig, resource.getMemory(), this.numberOfTaskSlots);

        this.log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, JVM direct memory limit {} MB", new Object[]{str, Long.valueOf(tmParameters.taskManagerTotalMemoryMB()), Long.valueOf(tmParameters.taskManagerHeapSizeMB()), Long.valueOf(tmParameters.taskManagerDirectMemoryLimitMB())});
        this.log.debug("TaskManager configuration: {}", this.flinkConfig);

        // The Flink configuration is deliberately passed twice, matching the helper's signature.
        final ContainerLaunchContext launchContext = Utils.createTaskExecutorContext(this.flinkConfig, this.yarnConfig, this.env, tmParameters, this.flinkConfig, workingDirectory, YarnTaskExecutorRunner.class, this.log);

        launchContext.getEnvironment().put(ENV_FLINK_CONTAINER_ID, str);
        launchContext.getEnvironment().put(ENV_FLINK_NODE_ID, str2);
        return launchContext;
    }

    /**
     * Returns the priority number associated with the given resource profile.
     *
     * <p>A profile seen before keeps its number; a new profile is assigned the current number of
     * known profiles, so numbers are handed out as 0, 1, 2, ... in order of first use.
     *
     * @param resourceProfile the profile to look up
     * @return the priority number for the profile
     */
    private int generatePriority(ResourceProfile resourceProfile) {
        final Integer assigned = this.resourcePriorities.get(resourceProfile);
        if (assigned != null) {
            return assigned;
        }
        final int nextPriority = this.resourcePriorities.size();
        this.resourcePriorities.put(resourceProfile, nextPriority);
        return nextPriority;
    }

    /**
     * Requests a new container only if the pending slot requests exceed the slots that the
     * already pending container requests will provide.
     *
     * @param resource the container size to request
     * @param priority the request priority
     */
    private void internalRequestYarnContainer(Resource resource, Priority priority) {
        final boolean pendingContainersSuffice = getNumberPendingSlotRequests() <= this.numPendingContainerRequests * this.numberOfTaskSlots;
        if (pendingContainersSuffice) {
            return;
        }
        requestYarnContainer(resource, priority);
    }
}
