/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.util.List;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalDriver;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesLeaderRetrievalDriver
implements LeaderRetrievalDriver {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderRetrievalDriver.class);
    private final Object watchLock = new Object();
    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
    private final FatalErrorHandler fatalErrorHandler;
    private volatile boolean running;
    @GuardedBy(value="watchLock")
    private KubernetesWatch kubernetesWatch;

    public KubernetesLeaderRetrievalDriver(FlinkKubeClient kubeClient, String configMapName, LeaderRetrievalEventHandler leaderRetrievalEventHandler, FatalErrorHandler fatalErrorHandler) {
        this.kubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)kubeClient, (String)"Kubernetes client");
        this.configMapName = (String)Preconditions.checkNotNull((Object)configMapName, (String)"ConfigMap name");
        this.leaderRetrievalEventHandler = (LeaderRetrievalEventHandler)Preconditions.checkNotNull((Object)leaderRetrievalEventHandler, (String)"LeaderRetrievalEventHandler");
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        this.kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
        this.running = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        if (!this.running) {
            return;
        }
        this.running = false;
        LOG.info("Stopping {}.", (Object)this);
        Object object = this.watchLock;
        synchronized (object) {
            if (this.kubernetesWatch != null) {
                this.kubernetesWatch.close();
            }
        }
    }

    public String toString() {
        return "KubernetesLeaderRetrievalDriver{configMapName='" + this.configMapName + "'}";
    }

    private class ConfigMapCallbackHandlerImpl
    implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
        private ConfigMapCallbackHandlerImpl() {
        }

        @Override
        public void onAdded(List<KubernetesConfigMap> configMaps) {
        }

        @Override
        public void onModified(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, KubernetesLeaderRetrievalDriver.this.configMapName);
            KubernetesLeaderRetrievalDriver.this.leaderRetrievalEventHandler.notifyLeaderAddress(KubernetesUtils.getLeaderInformationFromConfigMap(configMap));
        }

        @Override
        public void onDeleted(List<KubernetesConfigMap> configMaps) {
        }

        @Override
        public void onError(List<KubernetesConfigMap> configMaps) {
            KubernetesLeaderRetrievalDriver.this.fatalErrorHandler.onFatalError((Throwable)new LeaderRetrievalException("Error while watching the ConfigMap " + KubernetesLeaderRetrievalDriver.this.configMapName));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleError(Throwable throwable) {
            if (throwable instanceof KubernetesTooOldResourceVersionException) {
                Object object = KubernetesLeaderRetrievalDriver.this.watchLock;
                synchronized (object) {
                    if (KubernetesLeaderRetrievalDriver.this.running) {
                        if (KubernetesLeaderRetrievalDriver.this.kubernetesWatch != null) {
                            KubernetesLeaderRetrievalDriver.this.kubernetesWatch.close();
                        }
                        LOG.info("Creating a new watch on ConfigMap {}.", (Object)KubernetesLeaderRetrievalDriver.this.configMapName);
                        KubernetesLeaderRetrievalDriver.this.kubernetesWatch = KubernetesLeaderRetrievalDriver.this.kubeClient.watchConfigMaps(KubernetesLeaderRetrievalDriver.this.configMapName, new ConfigMapCallbackHandlerImpl());
                    }
                }
            } else {
                KubernetesLeaderRetrievalDriver.this.fatalErrorHandler.onFatalError((Throwable)new LeaderRetrievalException("Error while watching the ConfigMap " + KubernetesLeaderRetrievalDriver.this.configMapName, throwable));
            }
        }
    }
}

