package org.apache.flink.kubernetes.highavailability;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.LeaderElectionException;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.class */
/**
 * {@link LeaderElectionDriver} that performs leader election through a Kubernetes ConfigMap.
 *
 * <p>On construction the driver starts a {@link KubernetesLeaderElector} and opens a watch on the
 * ConfigMap named by the {@link KubernetesLeaderElectionConfiguration}. Elector callbacks and watch
 * events are forwarded to the supplied {@link LeaderElectionEventHandler}; failures that the driver
 * cannot handle itself are reported to the supplied {@link FatalErrorHandler}.
 */
public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);

    /** Guards every read and write of {@link #kubernetesWatch}. */
    private final Object watchLock = new Object();

    /** Counted down by the watch's {@code onAdded} callback; the constructor waits on it. */
    private final CountDownLatch configMapLatch = new CountDownLatch(1);

    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final String lockIdentity;
    private final KubernetesLeaderElector leaderElector;

    /** Labels that are put on the ConfigMap whenever leader information is written. */
    private final Map<String, String> configMapLabels;

    private final LeaderElectionEventHandler leaderElectionEventHandler;
    private final FatalErrorHandler fatalErrorHandler;

    /** {@code true} from construction until {@link #close()} (or a failed construction). */
    private volatile boolean running;

    @GuardedBy("watchLock")
    private KubernetesWatch kubernetesWatch;

    /**
     * Creates the driver, starts the leader elector, opens the ConfigMap watch and then blocks
     * until the watch has reported an added ConfigMap.
     *
     * @param flinkKubeClient client used for the elector, the watch and ConfigMap reads/updates
     * @param kubernetesLeaderElectionConfiguration supplies the ConfigMap name, lock identity and
     *     cluster id
     * @param leaderElectionEventHandler receives grant/revoke/leader-information-change events
     * @param fatalErrorHandler receives errors the driver cannot handle itself
     * @throws NullPointerException presumably, if any argument is {@code null} (raised by
     *     {@code Preconditions.checkNotNull})
     * @throws FlinkRuntimeException if the calling thread is interrupted while waiting for the
     *     ConfigMap; the thread's interrupt flag is restored and the elector and watch are released
     */
    public KubernetesLeaderElectionDriver(FlinkKubeClient flinkKubeClient, KubernetesLeaderElectionConfiguration kubernetesLeaderElectionConfiguration, LeaderElectionEventHandler leaderElectionEventHandler, FatalErrorHandler fatalErrorHandler) {
        this.kubeClient = (FlinkKubeClient) Preconditions.checkNotNull(flinkKubeClient, "Kubernetes client");
        Preconditions.checkNotNull(kubernetesLeaderElectionConfiguration, "Leader election configuration");
        this.leaderElectionEventHandler = (LeaderElectionEventHandler) Preconditions.checkNotNull(leaderElectionEventHandler, "LeaderElectionEventHandler");
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.configMapName = kubernetesLeaderElectionConfiguration.getConfigMapName();
        this.lockIdentity = kubernetesLeaderElectionConfiguration.getLockIdentity();
        this.leaderElector = flinkKubeClient.createLeaderElector(kubernetesLeaderElectionConfiguration, new LeaderCallbackHandlerImpl());
        this.configMapLabels = KubernetesUtils.getConfigMapLabels(kubernetesLeaderElectionConfiguration.getClusterId(), Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
        this.running = true;
        this.leaderElector.run();
        // The watch may invoke handleError() on another thread as soon as it exists, and
        // handleError() replaces kubernetesWatch under watchLock. Publishing the initial watch
        // under the same lock keeps a replacement from being overwritten (and leaked).
        synchronized (this.watchLock) {
            this.kubernetesWatch = flinkKubeClient.watchConfigMaps(this.configMapName, new ConfigMapCallbackHandlerImpl());
        }
        try {
            this.configMapLatch.await();
        } catch (InterruptedException e) {
            final FlinkRuntimeException failure = new FlinkRuntimeException("Interrupted while waiting for leader ConfigMap to be created.", e);
            // This instance is never handed to the caller, so nobody else can close it:
            // release the elector and the watch here instead of leaking them.
            this.running = false;
            try {
                stopElectorAndCloseWatch();
            } catch (RuntimeException cleanupFailure) {
                failure.addSuppressed(cleanupFailure);
            }
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw failure;
        }
    }

    /**
     * Stops the leader elector and closes the ConfigMap watch. Calls after the first one return
     * immediately.
     */
    public void close() {
        if (!this.running) {
            return;
        }
        this.running = false;
        LOG.info("Closing {}.", this);
        stopElectorAndCloseWatch();
    }

    /** Stops the elector, then closes the current watch (if any) while holding {@code watchLock}. */
    private void stopElectorAndCloseWatch() {
        this.leaderElector.stop();
        synchronized (this.watchLock) {
            if (this.kubernetesWatch != null) {
                this.kubernetesWatch.close();
            }
        }
    }

    /**
     * Stores the given leader address and session id in the ConfigMap, provided
     * {@link KubernetesLeaderElector#hasLeadership} reports leadership for this driver's lock
     * identity; otherwise the ConfigMap is left unchanged. A {@code null} address or session id
     * removes the corresponding key. Blocks until the update has completed.
     *
     * <p>Any failure of the update is reported to the {@link FatalErrorHandler} rather than thrown.
     * If the failure is an interrupt, the thread's interrupt flag is restored first.
     *
     * @param leaderInformation leader address and session id to publish
     */
    public void writeLeaderInformation(LeaderInformation leaderInformation) {
        assert this.running;
        final UUID leaderSessionID = leaderInformation.getLeaderSessionID();
        final String leaderAddress = leaderInformation.getLeaderAddress();
        try {
            this.kubeClient.checkAndUpdateConfigMap(this.configMapName, configMap -> {
                if (!KubernetesLeaderElector.hasLeadership(configMap, this.lockIdentity)) {
                    // Not the leader: request no update.
                    return Optional.empty();
                }
                if (leaderAddress == null) {
                    configMap.getData().remove(Constants.LEADER_ADDRESS_KEY);
                } else {
                    configMap.getData().put(Constants.LEADER_ADDRESS_KEY, leaderAddress);
                }
                if (leaderSessionID == null) {
                    configMap.getData().remove(Constants.LEADER_SESSION_ID_KEY);
                } else {
                    configMap.getData().put(Constants.LEADER_SESSION_ID_KEY, leaderSessionID.toString());
                }
                configMap.getLabels().putAll(this.configMapLabels);
                return Optional.of(configMap);
            }).get();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully wrote leader information: Leader={}, session ID={}.", leaderAddress, leaderSessionID);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise silently clear the interrupt status.
                Thread.currentThread().interrupt();
            }
            this.fatalErrorHandler.onFatalError(new KubernetesException("Could not write leader information since ConfigMap " + this.configMapName + " does not exist.", e));
        }
    }

    /**
     * Fetches the ConfigMap and asks {@link KubernetesLeaderElector#hasLeadership} whether this
     * driver's lock identity holds leadership.
     *
     * @return the result of that check; {@code false} if the ConfigMap does not exist, in which
     *     case the {@link FatalErrorHandler} is notified as well
     */
    public boolean hasLeadership() {
        assert this.running;
        final Optional<KubernetesConfigMap> configMap = this.kubeClient.getConfigMap(this.configMapName);
        if (!configMap.isPresent()) {
            this.fatalErrorHandler.onFatalError(new KubernetesException("ConfigMap " + this.configMapName + " does not exist.", null));
            return false;
        }
        return KubernetesLeaderElector.hasLeadership(configMap.get(), this.lockIdentity);
    }

    @Override
    public String toString() {
        return "KubernetesLeaderElectionDriver{configMapName='" + this.configMapName + "'}";
    }

    /** Forwards the elector's leadership callbacks to the {@link LeaderElectionEventHandler}. */
    private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCallbackHandler {

        @Override // org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector.LeaderCallbackHandler
        public void isLeader() {
            leaderElectionEventHandler.onGrantLeadership();
        }

        @Override // org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector.LeaderCallbackHandler
        public void notLeader() {
            leaderElectionEventHandler.onRevokeLeadership();
            // Run the elector again, the same call the constructor uses to start the election.
            leaderElector.run();
        }
    }

    /** Reacts to events of the watch on the leader ConfigMap. */
    private class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {

        /** Releases the constructor, which waits for the first added-event. */
        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onAdded(List<KubernetesConfigMap> list) {
            configMapLatch.countDown();
        }

        /** Forwards the ConfigMap's leader information if this lock identity holds leadership. */
        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onModified(List<KubernetesConfigMap> list) {
            final KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(list, configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                leaderElectionEventHandler.onLeaderInformationChange(KubernetesUtils.getLeaderInformationFromConfigMap(configMap));
            }
        }

        /** Reports a fatal error if the deleted ConfigMap showed this lock identity as leader. */
        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onDeleted(List<KubernetesConfigMap> list) {
            final KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(list, configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                fatalErrorHandler.onFatalError(new LeaderElectionException("ConfigMap " + configMapName + " is deleted externally"));
            }
        }

        /** Reports a fatal error if the affected ConfigMap showed this lock identity as leader. */
        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void onError(List<KubernetesConfigMap> list) {
            final KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(list, configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, lockIdentity)) {
                fatalErrorHandler.onFatalError(new LeaderElectionException("Error while watching the ConfigMap " + configMapName));
            }
        }

        /**
         * Replaces the watch with a fresh one on {@link KubernetesTooOldResourceVersionException}
         * (only while the driver is running); every other error is reported as fatal.
         */
        @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient.WatchCallbackHandler
        public void handleError(Throwable throwable) {
            if (throwable instanceof KubernetesTooOldResourceVersionException) {
                synchronized (watchLock) {
                    if (running) {
                        if (kubernetesWatch != null) {
                            kubernetesWatch.close();
                        }
                        LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
                        kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
                    }
                }
            } else {
                fatalErrorHandler.onFatalError(new LeaderElectionException("Error while watching the ConfigMap " + configMapName, throwable));
            }
        }
    }
}
