/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.LogManager;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Logger;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Setting;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.ThreadPool;

public class LagDetector {
    private static final Logger logger = LogManager.getLogger(LagDetector.class);
    public static final Setting<TimeValue> CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING = Setting.timeSetting("cluster.follower_lag.timeout", TimeValue.timeValueMillis(90000L), TimeValue.timeValueMillis(1L), Setting.Property.NodeScope);
    private final TimeValue clusterStateApplicationTimeout;
    private final Consumer<DiscoveryNode> onLagDetected;
    private final Supplier<DiscoveryNode> localNodeSupplier;
    private final ThreadPool threadPool;
    private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = ConcurrentCollections.newConcurrentMap();

    public LagDetector(Settings settings, ThreadPool threadPool, Consumer<DiscoveryNode> onLagDetected, Supplier<DiscoveryNode> localNodeSupplier) {
        this.threadPool = threadPool;
        this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
        this.onLagDetected = onLagDetected;
        this.localNodeSupplier = localNodeSupplier;
    }

    public void setTrackedNodes(Iterable<DiscoveryNode> discoveryNodes) {
        HashSet discoveryNodeSet = new HashSet();
        discoveryNodes.forEach(discoveryNodeSet::add);
        discoveryNodeSet.remove(this.localNodeSupplier.get());
        this.appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet);
        discoveryNodeSet.forEach(node -> this.appliedStateTrackersByNode.putIfAbsent((DiscoveryNode)node, new NodeAppliedStateTracker((DiscoveryNode)node)));
    }

    public void clearTrackedNodes() {
        this.appliedStateTrackersByNode.clear();
    }

    public void setAppliedVersion(DiscoveryNode discoveryNode, long appliedVersion) {
        NodeAppliedStateTracker nodeAppliedStateTracker = this.appliedStateTrackersByNode.get(discoveryNode);
        if (nodeAppliedStateTracker == null) {
            logger.trace("node {} applied version {} but this node's version is not being tracked", (Object)discoveryNode, (Object)appliedVersion);
        } else {
            nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
        }
    }

    public void startLagDetector(final long version) {
        final List laggingTrackers = this.appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList());
        if (laggingTrackers.isEmpty()) {
            logger.trace("lag detection for version {} is unnecessary: {}", (Object)version, (Object)this.appliedStateTrackersByNode.values());
        } else {
            logger.debug("starting lag detector for version {}: {}", (Object)version, (Object)laggingTrackers);
            this.threadPool.scheduleUnlessShuttingDown(this.clusterStateApplicationTimeout, "generic", new Runnable(){

                @Override
                public void run() {
                    laggingTrackers.forEach(t -> t.checkForLag(version));
                }

                public String toString() {
                    return "lag detector for version " + version + " on " + laggingTrackers;
                }
            });
        }
    }

    public String toString() {
        return "LagDetector{clusterStateApplicationTimeout=" + this.clusterStateApplicationTimeout + ", appliedStateTrackersByNode=" + this.appliedStateTrackersByNode.values() + '}';
    }

    Set<DiscoveryNode> getTrackedNodes() {
        return Collections.unmodifiableSet(this.appliedStateTrackersByNode.keySet());
    }

    private class NodeAppliedStateTracker {
        private final DiscoveryNode discoveryNode;
        private final AtomicLong appliedVersion = new AtomicLong();

        NodeAppliedStateTracker(DiscoveryNode discoveryNode) {
            this.discoveryNode = discoveryNode;
        }

        void increaseAppliedVersion(long appliedVersion) {
            long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion));
            logger.trace("{} applied version {}, max now {}", (Object)this, (Object)appliedVersion, (Object)maxAppliedVersion);
        }

        boolean appliedVersionLessThan(long version) {
            return this.appliedVersion.get() < version;
        }

        public String toString() {
            return "NodeAppliedStateTracker{discoveryNode=" + this.discoveryNode + ", appliedVersion=" + this.appliedVersion + '}';
        }

        void checkForLag(long version) {
            if (LagDetector.this.appliedStateTrackersByNode.get(this.discoveryNode) != this) {
                logger.trace("{} no longer active when checking version {}", (Object)this, (Object)version);
                return;
            }
            long appliedVersion = this.appliedVersion.get();
            if (version <= appliedVersion) {
                logger.trace("{} satisfied when checking version {}, node applied version {}", (Object)this, (Object)version, (Object)appliedVersion);
                return;
            }
            logger.debug("{}, detected lag at version {}, node has only applied version {}", (Object)this, (Object)version, (Object)appliedVersion);
            LagDetector.this.onLagDetected.accept(this.discoveryNode);
        }
    }
}

