/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster;

import java.util.HashSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNodes;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.inject.Inject;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.lease.Releasable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.settings.Setting;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.KeyedLock;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.threadpool.Scheduler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.threadpool.ThreadPool;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

/**
 * Keeps transport connections open to a tracked set of {@link DiscoveryNode}s.
 *
 * <p>Each tracked node is stored in {@link #nodes} together with a counter of consecutive failed
 * connection attempts. All reads and writes of a node's entry that go together with a
 * connect/disconnect call are performed while holding that node's entry in {@link #nodeLocks}.
 *
 * <p>While the component is started, a {@link ConnectionChecker} is scheduled on the
 * {@code "generic"} thread pool every {@link #reconnectInterval} to re-validate the connection to
 * every tracked node.
 */
public class NodeConnectionsService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(NodeConnectionsService.class);

    /**
     * Delay between two runs of the background {@link ConnectionChecker}
     * ({@code cluster.nodes.reconnect_interval}, node scope, defaults to 10 seconds).
     */
    public static final Setting<TimeValue> CLUSTER_NODE_RECONNECT_INTERVAL_SETTING = Setting.positiveTimeSetting("cluster.nodes.reconnect_interval", TimeValue.timeValueSeconds(10L), Setting.Property.NodeScope);

    private final ThreadPool threadPool;
    private final TransportService transportService;

    /** Tracked nodes mapped to their current count of consecutive connection failures. */
    private final ConcurrentMap<DiscoveryNode, Integer> nodes = ConcurrentCollections.newConcurrentMap();

    /** Per-node locks guarding connect/disconnect calls and the matching update of {@link #nodes}. */
    private final KeyedLock<DiscoveryNode> nodeLocks = new KeyedLock<>();

    private final TimeValue reconnectInterval;

    /** Handle of the most recently scheduled {@link ConnectionChecker} run; {@code null} until started. */
    private volatile Scheduler.Cancellable backgroundCancellable = null;

    /**
     * @param settings         source of {@link #CLUSTER_NODE_RECONNECT_INTERVAL_SETTING}
     * @param threadPool       used to run connection attempts and to schedule the periodic check
     * @param transportService used to query, open and close node connections
     */
    @Inject
    public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.reconnectInterval = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
    }

    /**
     * Starts tracking every node in {@code discoveryNodes} and connects to those that are not
     * connected yet.
     *
     * <p>Connection attempts are submitted to the {@code "management"} executor, one task per
     * node. The calling thread then waits until every node has been processed. If the calling
     * thread is interrupted while waiting, its interrupt flag is restored and the method returns
     * without waiting for the remaining tasks.
     *
     * @param discoveryNodes the nodes to track and connect to
     */
    public void connectToNodes(DiscoveryNodes discoveryNodes) {
        // One count per node: released either immediately (already connected) or by the task's onAfter.
        final CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
        for (final DiscoveryNode node : discoveryNodes) {
            final boolean connected;
            try (Releasable ignored = this.nodeLocks.acquire(node)) {
                // Register the node with a zeroed failure counter unless it is already tracked.
                this.nodes.putIfAbsent(node, 0);
                connected = this.transportService.nodeConnected(node);
            }
            if (connected) {
                latch.countDown();
                continue;
            }
            this.threadPool.executor("management").execute(new AbstractRunnable() {

                @Override
                public void onFailure(Exception e) {
                    logger.warn(() -> new ParameterizedMessage("failed to connect to {}", node), e);
                }

                @Override
                protected void doRun() {
                    try (Releasable ignored = NodeConnectionsService.this.nodeLocks.acquire(node)) {
                        NodeConnectionsService.this.validateAndConnectIfNeeded(node);
                    }
                }

                @Override
                public void onAfter() {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops tracking, and disconnects from, every currently tracked node that is not contained in
     * {@code nodesToKeep}. A failure to disconnect from a node is logged and does not prevent the
     * remaining nodes from being processed.
     *
     * @param nodesToKeep the nodes that must stay tracked and connected
     */
    public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
        // Work on a snapshot so that removing entries below does not interfere with the iteration.
        final HashSet<DiscoveryNode> nodesToDrop = new HashSet<>(this.nodes.keySet());
        for (DiscoveryNode node : nodesToKeep) {
            nodesToDrop.remove(node);
        }
        for (final DiscoveryNode node : nodesToDrop) {
            try (Releasable ignored = this.nodeLocks.acquire(node)) {
                final Integer current = this.nodes.remove(node);
                assert (current != null) : "node " + node + " was removed in event but not in internal nodes";
                try {
                    this.transportService.disconnectFromNode(node);
                } catch (Exception e) {
                    logger.warn(() -> new ParameterizedMessage("failed to disconnect to node [{}]", node), e);
                }
            }
        }
    }

    /**
     * Attempts to connect to {@code node} if this component is neither stopped nor closed and the
     * node is still tracked.
     *
     * <p>On success the node's failure counter is reset to 0. On failure the counter is
     * incremented and a warning is logged whenever the new count modulo 6 equals 1 (that is, on
     * the 1st, 7th, 13th, ... consecutive failure).
     *
     * <p>The caller must hold the lock for {@code node} in {@link #nodeLocks}.
     *
     * @param node the node to validate the connection to
     */
    void validateAndConnectIfNeeded(DiscoveryNode node) {
        assert (this.nodeLocks.isHeldByCurrentThread(node)) : "validateAndConnectIfNeeded must be called under lock";
        if (!this.lifecycle.stoppedOrClosed() && this.nodes.containsKey(node)) {
            try {
                this.transportService.connectToNode(node);
                this.nodes.put(node, 0);
            } catch (Exception e) {
                final Integer previousFailures = this.nodes.get(node);
                assert (previousFailures != null) : node + " didn't have a counter in nodes map";
                final int failureCount = previousFailures + 1;
                // Only log a subset of the failures to avoid flooding the log on every retry.
                if (failureCount % 6 == 1) {
                    logger.warn(() -> new ParameterizedMessage("failed to connect to node {} (tried [{}] times)", node, failureCount), e);
                }
                this.nodes.put(node, failureCount);
            }
        }
    }

    /** Schedules the first run of the background {@link ConnectionChecker}. */
    @Override
    protected void doStart() {
        this.backgroundCancellable = this.threadPool.schedule(new ConnectionChecker(), this.reconnectInterval, "generic");
    }

    /** Cancels the pending {@link ConnectionChecker} run, if one was ever scheduled. */
    @Override
    protected void doStop() {
        if (this.backgroundCancellable != null) {
            this.backgroundCancellable.cancel();
        }
    }

    /** Nothing to release on close. */
    @Override
    protected void doClose() {
    }

    /**
     * Periodic task that re-validates the connection to every tracked node and then reschedules
     * itself for as long as the enclosing service is started.
     */
    class ConnectionChecker extends AbstractRunnable {
        ConnectionChecker() {
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("unexpected error while checking for node reconnects", e);
        }

        @Override
        protected void doRun() {
            for (DiscoveryNode node : NodeConnectionsService.this.nodes.keySet()) {
                try (Releasable ignored = NodeConnectionsService.this.nodeLocks.acquire(node)) {
                    NodeConnectionsService.this.validateAndConnectIfNeeded(node);
                }
            }
        }

        @Override
        public void onAfter() {
            // Reschedule only while started so that the loop ends once the service is stopped.
            if (NodeConnectionsService.this.lifecycle.started()) {
                NodeConnectionsService.this.backgroundCancellable = NodeConnectionsService.this.threadPool.schedule(this, NodeConnectionsService.this.reconnectInterval, "generic");
            }
        }
    }
}

