package org.apache.flink.kubernetes;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesResourceManagerConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.shadded.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/KubernetesResourceManager.class */
/**
 * {@link ActiveResourceManager} that runs TaskManagers as Kubernetes pods.
 *
 * <p>It requests new TaskManager pods through a {@link FlinkKubeClient}, tracks the pods it knows about in
 * {@link #workerNodes}, and reacts to pod watch events ({@link FlinkKubeClient.PodCallbackHandler}).
 * Watch callbacks are re-dispatched with {@code runAsync} before they touch any of the bookkeeping fields.
 * NOTE(review): this presumably keeps all state on the RPC main thread; confirm against ResourceManager.
 */
public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode> implements FlinkKubeClient.PodCallbackHandler {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);

    /** Pod name pattern: {@code <clusterId>-taskmanager-<attemptId>-<podId>}. */
    private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    /** TaskManager pods known to this resource manager, keyed by a ResourceID built from the pod name. */
    private final Map<ResourceID, KubernetesWorkerNode> workerNodes;

    /** CPU cores requested for every TaskManager pod, taken from the task executor process spec. */
    private final double defaultCpus;

    /** Attempt id embedded in new pod names; bumped past every recovered pod's attempt on startup. */
    private long currentMaxAttemptId;

    /** Monotonic per-attempt counter used to make pod names unique. */
    private long currentMaxPodId;

    private final String clusterId;
    private final FlinkKubeClient kubeClient;
    private final ContaineredTaskManagerParameters taskManagerParameters;

    /** Container command used to launch a TaskManager; computed once in the constructor. */
    private final List<String> taskManagerStartCommand;

    private final KubernetesResourceManagerConfiguration configuration;

    /** Pods that have been requested but not yet observed via {@link #onAdded(List)}. */
    private int numPendingPodRequests;

    public KubernetesResourceManager(RpcService rpcService, String str, ResourceID resourceID, Configuration configuration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, ResourceManagerMetricGroup resourceManagerMetricGroup, FlinkKubeClient flinkKubeClient, KubernetesResourceManagerConfiguration kubernetesResourceManagerConfiguration) {
        super(configuration, System.getenv(), rpcService, str, resourceID, highAvailabilityServices, heartbeatServices, slotManager, jobLeaderIdService, clusterInformation, fatalErrorHandler, resourceManagerMetricGroup);
        this.workerNodes = new HashMap<>();
        this.currentMaxAttemptId = 0L;
        this.currentMaxPodId = 0L;
        this.numPendingPodRequests = 0;
        this.clusterId = kubernetesResourceManagerConfiguration.getClusterId();
        this.defaultCpus = this.taskExecutorProcessSpec.getCpuCores().getValue().doubleValue();
        this.kubeClient = flinkKubeClient;
        // Must be initialised before getTaskManagerStartCommand(), which reads it.
        this.taskManagerParameters = ContaineredTaskManagerParameters.create(configuration, this.taskExecutorProcessSpec, this.numSlotsPerTaskManager);
        this.taskManagerStartCommand = getTaskManagerStartCommand();
        this.configuration = kubernetesResourceManagerConfiguration;
    }

    @Override
    protected Configuration loadClientConfiguration() {
        return GlobalConfiguration.loadConfiguration();
    }

    /**
     * Recovers TaskManager pods left over from earlier attempts and starts watching TaskManager pods.
     *
     * @throws ResourceManagerException if recovery fails
     */
    @Override
    protected void initialize() throws ResourceManagerException {
        recoverWorkerNodesFromPreviousAttempts();
        kubeClient.watchPodsAndDoCallback(getTaskManagerLabels(), this);
    }

    /**
     * Closes the Kubernetes client.
     *
     * <p>Any failure is captured and handed to {@code getStopTerminationFutureOrCompletedExceptionally},
     * so it is reported through the returned future rather than thrown.
     */
    @Override
    public CompletableFuture<Void> onStop() {
        Throwable closeFailure = null;
        try {
            kubeClient.close();
        } catch (Throwable t) {
            closeFailure = t;
        }
        return getStopTerminationFutureOrCompletedExceptionally(closeFailure);
    }

    /**
     * Asks the Kubernetes client to stop and clean up the whole cluster identified by {@link #clusterId}.
     *
     * @param applicationStatus final application status (not used by this implementation)
     * @param optionalDiagnostics diagnostics message to log; {@code null} is logged as an empty string
     */
    @Override
    protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String optionalDiagnostics) {
        LOG.info("Stopping kubernetes cluster, clusterId: {}, diagnostics: {}", this.clusterId, optionalDiagnostics == null ? "" : optionalDiagnostics);
        kubeClient.stopAndCleanupCluster(clusterId);
    }

    /**
     * Requests one new TaskManager pod if {@code resourceProfile} matches this manager's worker profile.
     *
     * @param resourceProfile the requested slot profile
     * @return the slot profiles the new worker will offer, or an empty list if the profile does not match
     */
    @Override
    public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
        LOG.info("Starting new worker with resource profile, {}", resourceProfile);
        // Every worker offers identical slots, so checking the first profile is sufficient.
        final ResourceProfile workerSlotProfile = resourceProfilesPerWorker.iterator().next();
        if (!workerSlotProfile.isMatching(resourceProfile)) {
            return Collections.emptyList();
        }
        requestKubernetesPod();
        return resourceProfilesPerWorker;
    }

    /**
     * Looks up the worker node registered for {@code resourceID}.
     *
     * @return the known worker node, or {@code null} if no pod with that id is tracked
     */
    @Override
    public KubernetesWorkerNode workerStarted(ResourceID resourceID) {
        return workerNodes.get(resourceID);
    }

    /**
     * Forgets the given worker and asynchronously deletes its pod.
     *
     * @return always {@code true}; pod deletion failures are only logged
     */
    @Override
    public boolean stopWorker(KubernetesWorkerNode kubernetesWorkerNode) {
        final ResourceID workerId = kubernetesWorkerNode.getResourceID();
        LOG.info("Stopping Worker {}.", workerId);
        workerNodes.remove(workerId);
        internalStopPod(workerId.toString());
        return true;
    }

    /**
     * Handles newly added pods.
     *
     * <p>While there are outstanding pod requests, each added pod consumes one of them and is registered
     * as a worker. Pods seen when no request is pending are only logged.
     */
    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler
    public void onAdded(List<KubernetesPod> list) {
        runAsync(() -> {
            for (KubernetesPod pod : list) {
                if (numPendingPodRequests > 0) {
                    numPendingPodRequests--;
                    final KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
                    workerNodes.putIfAbsent(worker.getResourceID(), worker);
                }
                log.info("Received new TaskManager pod: {} - Remaining pending pod requests: {}", pod.getName(), numPendingPodRequests);
            }
        });
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler
    public void onModified(List<KubernetesPod> list) {
        runAsync(() -> list.forEach(this::removePodIfTerminated));
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler
    public void onDeleted(List<KubernetesPod> list) {
        runAsync(() -> list.forEach(this::removePodIfTerminated));
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.PodCallbackHandler
    public void onError(List<KubernetesPod> list) {
        runAsync(() -> list.forEach(this::removePodIfTerminated));
    }

    /** Exposes the live (mutable) worker map for tests. */
    @VisibleForTesting
    Map<ResourceID, KubernetesWorkerNode> getWorkerNodes() {
        return workerNodes;
    }

    /**
     * Registers every existing TaskManager pod of this cluster as a worker.
     *
     * <p>It then advances {@link #currentMaxAttemptId} one past the highest recovered attempt, so new pod
     * names cannot collide with recovered ones.
     */
    private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
        for (KubernetesPod pod : kubeClient.getPodsWithLabels(getTaskManagerLabels())) {
            final KubernetesWorkerNode worker = new KubernetesWorkerNode(new ResourceID(pod.getName()));
            workerNodes.put(worker.getResourceID(), worker);
            currentMaxAttemptId = Math.max(currentMaxAttemptId, worker.getAttempt());
        }
        currentMaxAttemptId++;
        log.info("Recovered {} pods from previous attempts, current attempt id is {}.", workerNodes.size(), currentMaxAttemptId);
    }

    /**
     * Increments the pending-request counter and asks Kubernetes to create one TaskManager pod.
     *
     * <p>If creation fails, a retry is scheduled after the configured pod creation retry interval. The retry
     * gives back the pending slot and requests again only if still required.
     */
    private void requestKubernetesPod() {
        numPendingPodRequests++;
        log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", defaultMemoryMB, defaultCpus, numPendingPodRequests);

        currentMaxPodId++;
        final String podName = String.format(TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, currentMaxPodId);

        final Map<String, String> env = new HashMap<>();
        env.put(Constants.ENV_FLINK_POD_NAME, podName);
        env.putAll(taskManagerParameters.taskManagerEnv());

        final TaskManagerPodParameter podParameter = new TaskManagerPodParameter(podName, taskManagerStartCommand, defaultMemoryMB, defaultCpus, env);
        log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);

        kubeClient.createTaskManagerPod(podParameter).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                log.error("Could not start TaskManager in pod {}.", podName, throwable);
                // This callback runs on the client's thread; hop back onto the main thread before retrying.
                scheduleRunAsync(this::decreasePendingAndRequestKubernetesPodIfRequired, configuration.getPodCreationRetryInterval());
            }
        });
    }

    /** Main-thread retry path after a failed pod creation. */
    private void decreasePendingAndRequestKubernetesPodIfRequired() {
        validateRunsInMainThread();
        numPendingPodRequests--;
        requestKubernetesPodIfRequired();
    }

    /** Requests one more pod if pending pods cannot cover the slots the framework still requires. */
    private void requestKubernetesPodIfRequired() {
        if (getNumberRequiredTaskManagerSlots() > numPendingPodRequests * numSlotsPerTaskManager) {
            requestKubernetesPod();
        }
    }

    /**
     * Deletes a terminated pod.
     *
     * <p>If the pod was a tracked worker, it is also removed from the worker map and a replacement is
     * requested when slots are still required.
     */
    private void removePodIfTerminated(KubernetesPod kubernetesPod) {
        if (!kubernetesPod.isTerminated()) {
            return;
        }
        internalStopPod(kubernetesPod.getName());
        if (workerNodes.remove(new ResourceID(kubernetesPod.getName())) != null) {
            requestKubernetesPodIfRequired();
        }
    }

    /** Builds the {@code /bin/bash -c <cmd>} container command that launches a TaskManager. */
    private List<String> getTaskManagerStartCommand() {
        final String confDir = flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR);
        final String logDir = flinkConfig.getString(KubernetesConfigOptions.FLINK_LOG_DIR);
        final boolean hasLogback = new File(confDir, Constants.CONFIG_FILE_LOGBACK_NAME).exists();
        final boolean hasLog4j = new File(confDir, Constants.CONFIG_FILE_LOG4J_NAME).exists();
        final String mainClassArgs = "--" + CommandLineOptions.CONFIG_DIR_OPTION.getLongOpt() + " " + confDir + " "
                + BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
        final String command = KubernetesUtils.getTaskManagerStartCommand(
                flinkConfig,
                taskManagerParameters,
                confDir,
                logDir,
                hasLogback,
                hasLog4j,
                KubernetesTaskExecutorRunner.class.getCanonicalName(),
                mainClassArgs);
        return Arrays.asList("/bin/bash", "-c", command);
    }

    /** Label selector identifying this cluster's TaskManager pods. */
    private Map<String, String> getTaskManagerLabels() {
        final Map<String, String> labels = new HashMap<>();
        labels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
        labels.put(Constants.LABEL_APP_KEY, clusterId);
        labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER);
        return labels;
    }

    /**
     * Returns the TaskManager CPU cores from {@code configuration}.
     *
     * <p>The value is resolved by {@code TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption}
     * with {@code KubernetesConfigOptions.TASK_MANAGER_CPU} as the fallback option.
     */
    @Override
    protected double getCpuCores(Configuration configuration) {
        return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
    }

    /** Asynchronously deletes the named pod; failures are logged, not propagated. */
    private void internalStopPod(String podName) {
        kubeClient.stopPod(podName).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                log.error("Could not stop TaskManager in pod {}.", podName, throwable);
            }
        });
    }
}
