package org.apache.flink.kubernetes.highavailability;

import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.AbstractHaServices;
import org.apache.flink.runtime.highavailability.FileSystemJobResultStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionHaServices.class */
/**
 * High-availability services that rely on Kubernetes ConfigMaps, built on top of
 * {@link AbstractHaServices}.
 *
 * <p>The instance owns three resources which are released in {@link #internalClose()}: the
 * {@link FlinkKubeClient}, a shared ConfigMap watcher and the executor that handles watch events.
 * The ConfigMap names used by this class are derived from the cluster id (see
 * {@link #getClusterConfigMap(String)} and {@link #getJobSpecificConfigMap(JobID)}).
 */
public class KubernetesLeaderElectionHaServices extends AbstractHaServices {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionHaServices.class);

    /** Cluster id; prefix of every ConfigMap name produced by this class. */
    private final String clusterId;

    /** Client used for ConfigMap deletion and handed to the stores/factories created here. */
    private final FlinkKubeClient kubeClient;

    /** Watcher shared between the leader election driver and all leader retrieval drivers. */
    private final KubernetesConfigMapSharedWatcher configMapSharedWatcher;

    /** Executor passed to the election/retrieval driver factories for handling watch events. */
    private final ExecutorService watchExecutorService;

    /** Identity used for the leader election configuration and the job graph store. */
    private final String lockIdentity;

    /**
     * Creates the HA services with freshly created helper resources.
     *
     * <p>The shared watcher is created for the ConfigMap labels of the configured cluster id and
     * the high-availability ConfigMap type, watch events are handled by a cached thread pool whose
     * threads are created by an {@link ExecutorThreadFactory} named "config-map-watch-handler",
     * and the lock identity is a random {@link UUID}.
     *
     * <p>NOTE: the decompiler reported that the access modifier of this constructor was changed
     * from package-private; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param flinkKubeClient client used to talk to Kubernetes
     * @param executor executor forwarded to {@link AbstractHaServices}
     * @param configuration configuration from which the cluster id is read
     * @param blobStoreService blob store forwarded to {@link AbstractHaServices}
     * @throws Exception if the delegated constructor fails
     */
    public KubernetesLeaderElectionHaServices(
            FlinkKubeClient flinkKubeClient,
            Executor executor,
            Configuration configuration,
            BlobStoreService blobStoreService)
            throws Exception {
        this(
                flinkKubeClient,
                flinkKubeClient.createConfigMapSharedWatcher(
                        KubernetesUtils.getConfigMapLabels(
                                configuration.get(KubernetesConfigOptions.CLUSTER_ID),
                                Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)),
                Executors.newCachedThreadPool(new ExecutorThreadFactory("config-map-watch-handler")),
                executor,
                configuration.get(KubernetesConfigOptions.CLUSTER_ID),
                UUID.randomUUID().toString(),
                configuration,
                blobStoreService);
    }

    /**
     * Wires all collaborators explicitly; every reference stored in a field must be non-null.
     *
     * @param flinkKubeClient client used to talk to Kubernetes
     * @param configMapSharedWatcher watcher shared by election and retrieval drivers
     * @param watchExecutorService executor handling watch events
     * @param ioExecutor executor forwarded to {@link AbstractHaServices}
     * @param clusterId id of the cluster, used to derive ConfigMap names
     * @param lockIdentity identity used for leader election and the job graph store
     * @param configuration Flink configuration
     * @param blobStoreService blob store forwarded to {@link AbstractHaServices}
     * @throws Exception if the super constructor or the job result store creation fails
     */
    private KubernetesLeaderElectionHaServices(
            FlinkKubeClient flinkKubeClient,
            KubernetesConfigMapSharedWatcher configMapSharedWatcher,
            ExecutorService watchExecutorService,
            Executor ioExecutor,
            String clusterId,
            String lockIdentity,
            Configuration configuration,
            BlobStoreService blobStoreService)
            throws Exception {
        super(
                configuration,
                createDriverFactory(
                        flinkKubeClient,
                        configMapSharedWatcher,
                        watchExecutorService,
                        clusterId,
                        lockIdentity,
                        configuration),
                ioExecutor,
                blobStoreService,
                FileSystemJobResultStore.fromConfiguration(configuration));
        this.kubeClient = Preconditions.checkNotNull(flinkKubeClient);
        this.clusterId = Preconditions.checkNotNull(clusterId);
        this.configMapSharedWatcher = Preconditions.checkNotNull(configMapSharedWatcher);
        this.watchExecutorService = Preconditions.checkNotNull(watchExecutorService);
        this.lockIdentity = Preconditions.checkNotNull(lockIdentity);
    }

    /**
     * Builds the leader election driver factory that is passed to the super constructor.
     *
     * <p>The election configuration targets the cluster ConfigMap derived from {@code clusterId}
     * and uses {@code lockIdentity} as its identity.
     */
    private static LeaderElectionDriverFactory createDriverFactory(
            FlinkKubeClient flinkKubeClient,
            KubernetesConfigMapSharedWatcher configMapSharedWatcher,
            Executor watchExecutor,
            String clusterId,
            String lockIdentity,
            Configuration configuration) {
        final KubernetesLeaderElectionConfiguration electionConfiguration =
                new KubernetesLeaderElectionConfiguration(
                        getClusterConfigMap(clusterId), lockIdentity, configuration);
        return new KubernetesLeaderElectionDriverFactory(
                flinkKubeClient, electionConfiguration, configMapSharedWatcher, watchExecutor);
    }

    /**
     * Creates a leader retrieval service that watches the cluster ConfigMap through the shared
     * watcher.
     *
     * @param componentId identifier handed to the retrieval driver factory; presumably one of the
     *     values returned by the {@code getLeaderPathFor*} methods (confirm against the caller in
     *     {@link AbstractHaServices})
     * @return a new {@link DefaultLeaderRetrievalService}
     */
    protected LeaderRetrievalService createLeaderRetrievalService(String componentId) {
        return new DefaultLeaderRetrievalService(
                new KubernetesLeaderRetrievalDriverFactory(
                        this.configMapSharedWatcher,
                        this.watchExecutorService,
                        getClusterConfigMap(),
                        componentId));
    }

    /**
     * Creates the checkpoint recovery factory, which resolves the ConfigMap of a job through
     * {@link #getJobSpecificConfigMap(JobID)}.
     */
    protected CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
        return KubernetesCheckpointRecoveryFactory.withoutLeadershipValidation(
                this.kubeClient,
                this.configuration,
                this.ioExecutor,
                this.clusterId,
                this::getJobSpecificConfigMap);
    }

    /** Returns {@code <clusterId><separator><jobId><separator>config-map}. */
    private String getJobSpecificConfigMap(JobID jobID) {
        return this.clusterId
                + Constants.NAME_SEPARATOR
                + jobID.toString()
                + Constants.NAME_SEPARATOR
                + "config-map";
    }

    /**
     * Creates the job graph store on top of the cluster ConfigMap, using this instance's lock
     * identity.
     *
     * @throws Exception if {@link KubernetesUtils#createJobGraphStore} fails
     */
    protected JobGraphStore createJobGraphStore() throws Exception {
        return KubernetesUtils.createJobGraphStore(
                this.configuration, this.kubeClient, getClusterConfigMap(), this.lockIdentity);
    }

    /** Returns the cluster ConfigMap name for this instance's cluster id. */
    private String getClusterConfigMap() {
        return getClusterConfigMap(this.clusterId);
    }

    /** Returns {@code <clusterId><separator>cluster-config-map}. */
    private static String getClusterConfigMap(String clusterId) {
        return clusterId + Constants.NAME_SEPARATOR + "cluster-config-map";
    }

    /**
     * Releases the watcher, the Kubernetes client and the watch executor.
     *
     * <p>Every step is attempted even if an earlier one failed. The first failure is rethrown at
     * the end and later failures are attached to it as suppressed exceptions, so no error is
     * silently dropped.
     *
     * @throws Exception the first failure encountered while closing, if any
     */
    public void internalClose() throws Exception {
        Exception failure = null;
        try {
            closeK8sServices();
        } catch (Exception e) {
            failure = e;
        }

        try {
            this.kubeClient.close();
        } catch (Exception e) {
            // Keep going: the executor below must still be shut down.
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, this.watchExecutorService);
        ExceptionUtils.tryRethrowException(failure);
    }

    /**
     * Closes the shared watcher and stops the watch executor immediately.
     *
     * <p>The executor is stopped in a {@code finally} block so that it is shut down even when
     * closing the watcher throws.
     */
    private void closeK8sServices() {
        try {
            this.configMapSharedWatcher.close();
        } finally {
            final int pendingEvents = this.watchExecutorService.shutdownNow().size();
            if (pendingEvents != 0) {
                LOG.debug("The k8s HA services were closed with {} event(s) still not being processed. No further action necessary.", pendingEvents);
            }
        }
    }

    /**
     * Closes the watcher/executor and then deletes all ConfigMaps carrying the high-availability
     * labels of this cluster, blocking until the deletion future completes.
     *
     * <p>The deletion is attempted even if closing failed. The first failure is rethrown and a
     * later one is attached to it as a suppressed exception.
     *
     * @throws Exception the first failure encountered, if any
     */
    public void internalCleanup() throws Exception {
        Exception failure = null;
        try {
            closeK8sServices();
        } catch (Exception e) {
            failure = e;
        }

        try {
            this.kubeClient
                    .deleteConfigMapsByLabels(
                            KubernetesUtils.getConfigMapLabels(
                                    this.clusterId,
                                    Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
                    .get();
        } catch (Exception e) {
            if (failure != null && e instanceof InterruptedException) {
                // The interruption is about to be demoted to a suppressed exception; restore the
                // flag so the interrupt stays observable to the caller.
                Thread.currentThread().interrupt();
            }
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        ExceptionUtils.tryRethrowException(failure);
    }

    /**
     * Deletes the ConfigMap belonging to the given job and waits for the deletion to complete.
     *
     * @param jobID job whose ConfigMap is removed
     * @throws Exception if waiting on the deletion future fails
     */
    public void internalCleanupJobData(JobID jobID) throws Exception {
        this.kubeClient.deleteConfigMap(getJobSpecificConfigMap(jobID)).get();
    }

    /** Returns the fixed leader path name {@code "resourcemanager"}. */
    protected String getLeaderPathForResourceManager() {
        return "resourcemanager";
    }

    /** Returns the fixed leader path name {@code "dispatcher"}. */
    protected String getLeaderPathForDispatcher() {
        return "dispatcher";
    }

    /** Returns the string form of the given job id as the leader path name. */
    protected String getLeaderPathForJobManager(JobID jobID) {
        return jobID.toString();
    }

    /** Returns the fixed leader path name {@code "restserver"}. */
    protected String getLeaderPathForRestServer() {
        return "restserver";
    }
}
