/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.discovery.zen;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.LogManager;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Logger;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterName;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterState;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.io.stream.StreamInput;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.io.stream.StreamOutput;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.discovery.zen.FaultDetection;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.tasks.Task;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.threadpool.ThreadPool;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.ConnectTransportException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportChannel;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportRequestHandler;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportRequestOptions;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportResponseHandler;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportService;

public class NodesFaultDetection
extends FaultDetection {
    private static final Logger logger = LogManager.getLogger(NodesFaultDetection.class);
    public static final String PING_ACTION_NAME = "internal:discovery/zen/fd/ping";
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList();
    private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = ConcurrentCollections.newConcurrentMap();
    private final Supplier<ClusterState> clusterStateSupplier;
    private volatile DiscoveryNode localNode;

    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, ClusterName clusterName) {
        super(settings, threadPool, transportService, clusterName);
        this.clusterStateSupplier = clusterStateSupplier;
        logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", (Object)this.pingInterval, (Object)this.pingRetryTimeout, (Object)this.pingRetryCount);
        transportService.registerRequestHandler(PING_ACTION_NAME, PingRequest::new, "same", false, false, new PingRequestHandler());
    }

    public void setLocalNode(DiscoveryNode localNode) {
        this.localNode = localNode;
    }

    public void addListener(Listener listener) {
        this.listeners.add(listener);
    }

    public void removeListener(Listener listener) {
        this.listeners.remove(listener);
    }

    public Set<DiscoveryNode> getNodes() {
        return Collections.unmodifiableSet(this.nodesFD.keySet());
    }

    public void updateNodesAndPing(ClusterState clusterState) {
        for (DiscoveryNode monitoredNode : this.nodesFD.keySet()) {
            if (clusterState.nodes().nodeExists(monitoredNode)) continue;
            this.nodesFD.remove(monitoredNode);
        }
        for (DiscoveryNode node : clusterState.nodes()) {
            if (node.equals(this.localNode) || this.nodesFD.containsKey(node)) continue;
            NodeFD fd = new NodeFD(node);
            this.nodesFD.put(node, fd);
            this.threadPool.schedule(fd, TimeValue.timeValueMillis(0L), "same");
        }
    }

    public NodesFaultDetection stop() {
        this.nodesFD.clear();
        return this;
    }

    @Override
    public void close() {
        super.close();
        this.stop();
    }

    @Override
    protected void handleTransportDisconnect(DiscoveryNode node) {
        NodeFD nodeFD = (NodeFD)this.nodesFD.remove(node);
        if (nodeFD == null) {
            return;
        }
        if (this.connectOnNetworkDisconnect) {
            NodeFD fd = new NodeFD(node);
            try {
                this.transportService.connectToNode(node);
                this.nodesFD.put(node, fd);
                this.threadPool.schedule(fd, TimeValue.timeValueMillis(0L), "same");
            }
            catch (Exception e) {
                logger.trace("[node  ] [{}] transport disconnected (with verified connect)", (Object)node);
                this.nodesFD.remove(node, fd);
                this.notifyNodeFailure(node, "transport disconnected (with verified connect)");
            }
        } else {
            logger.trace("[node  ] [{}] transport disconnected", (Object)node);
            this.notifyNodeFailure(node, "transport disconnected");
        }
    }

    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        try {
            this.threadPool.generic().execute(new Runnable(){

                @Override
                public void run() {
                    for (Listener listener : NodesFaultDetection.this.listeners) {
                        listener.onNodeFailure(node, reason);
                    }
                }
            });
        }
        catch (EsRejectedExecutionException ex) {
            logger.trace(() -> new ParameterizedMessage("[node  ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", (Object)node, (Object)reason), (Throwable)ex);
        }
    }

    private void notifyPingReceived(final PingRequest pingRequest) {
        this.threadPool.generic().execute(new Runnable(){

            @Override
            public void run() {
                for (Listener listener : NodesFaultDetection.this.listeners) {
                    listener.onPingReceived(pingRequest);
                }
            }
        });
    }

    public static class PingResponse
    extends TransportResponse {
        public PingResponse() {
        }

        public PingResponse(StreamInput in) throws IOException {
            super(in);
        }
    }

    public static class PingRequest
    extends TransportRequest {
        private DiscoveryNode targetNode;
        private ClusterName clusterName;
        private DiscoveryNode masterNode;
        private long clusterStateVersion = -1L;

        public PingRequest() {
        }

        public PingRequest(DiscoveryNode targetNode, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) {
            this.targetNode = targetNode;
            this.clusterName = clusterName;
            this.masterNode = masterNode;
            this.clusterStateVersion = clusterStateVersion;
        }

        public DiscoveryNode targetNode() {
            return this.targetNode;
        }

        public ClusterName clusterName() {
            return this.clusterName;
        }

        public DiscoveryNode masterNode() {
            return this.masterNode;
        }

        public long clusterStateVersion() {
            return this.clusterStateVersion;
        }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            this.targetNode = new DiscoveryNode(in);
            this.clusterName = new ClusterName(in);
            this.masterNode = new DiscoveryNode(in);
            this.clusterStateVersion = in.readLong();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            this.targetNode.writeTo(out);
            this.clusterName.writeTo(out);
            this.masterNode.writeTo(out);
            out.writeLong(this.clusterStateVersion);
        }
    }

    class PingRequestHandler
    implements TransportRequestHandler<PingRequest> {
        PingRequestHandler() {
        }

        @Override
        public void messageReceived(PingRequest request, TransportChannel channel, Task task) throws Exception {
            if (!NodesFaultDetection.this.localNode.equals(request.targetNode())) {
                throw new IllegalStateException("Got pinged as node " + request.targetNode() + "], but I am node " + NodesFaultDetection.this.localNode);
            }
            if (request.clusterName != null && !request.clusterName.equals(NodesFaultDetection.this.clusterName)) {
                throw new IllegalStateException("Got pinged with cluster name [" + request.clusterName + "], but I'm part of cluster [" + NodesFaultDetection.this.clusterName + "]");
            }
            NodesFaultDetection.this.notifyPingReceived(request);
            channel.sendResponse(new PingResponse());
        }
    }

    private class NodeFD
    implements Runnable {
        volatile int retryCount;
        private final DiscoveryNode node;

        private NodeFD(DiscoveryNode node) {
            this.node = node;
        }

        private boolean running() {
            return this.equals(NodesFaultDetection.this.nodesFD.get(this.node));
        }

        private PingRequest newPingRequest() {
            return new PingRequest(this.node, NodesFaultDetection.this.clusterName, NodesFaultDetection.this.localNode, ((ClusterState)NodesFaultDetection.this.clusterStateSupplier.get()).version());
        }

        @Override
        public void run() {
            if (!this.running()) {
                return;
            }
            final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(NodesFaultDetection.this.pingRetryTimeout).build();
            NodesFaultDetection.this.transportService.sendRequest(this.node, NodesFaultDetection.PING_ACTION_NAME, (TransportRequest)this.newPingRequest(), options, new TransportResponseHandler<PingResponse>(){

                @Override
                public PingResponse read(StreamInput in) throws IOException {
                    return new PingResponse(in);
                }

                @Override
                public void handleResponse(PingResponse response) {
                    if (!NodeFD.this.running()) {
                        return;
                    }
                    NodeFD.this.retryCount = 0;
                    NodesFaultDetection.this.threadPool.schedule(NodeFD.this, NodesFaultDetection.this.pingInterval, "same");
                }

                @Override
                public void handleException(TransportException exp) {
                    if (!NodeFD.this.running()) {
                        return;
                    }
                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                        NodesFaultDetection.this.handleTransportDisconnect(NodeFD.this.node);
                        return;
                    }
                    ++NodeFD.this.retryCount;
                    logger.trace(() -> new ParameterizedMessage("[node  ] failed to ping [{}], retry [{}] out of [{}]", NodeFD.this.node, NodeFD.this.retryCount, NodesFaultDetection.this.pingRetryCount), (Throwable)exp);
                    if (NodeFD.this.retryCount >= NodesFaultDetection.this.pingRetryCount) {
                        logger.debug("[node  ] failed to ping [{}], tried [{}] times, each with  maximum [{}] timeout", (Object)NodeFD.this.node, (Object)NodesFaultDetection.this.pingRetryCount, (Object)NodesFaultDetection.this.pingRetryTimeout);
                        if (NodesFaultDetection.this.nodesFD.remove(NodeFD.this.node, NodeFD.this)) {
                            NodesFaultDetection.this.notifyNodeFailure(NodeFD.this.node, "failed to ping, tried [" + NodesFaultDetection.this.pingRetryCount + "] times, each with maximum [" + NodesFaultDetection.this.pingRetryTimeout + "] timeout");
                        }
                    } else {
                        NodesFaultDetection.this.transportService.sendRequest(NodeFD.this.node, NodesFaultDetection.PING_ACTION_NAME, (TransportRequest)NodeFD.this.newPingRequest(), options, this);
                    }
                }

                @Override
                public String executor() {
                    return "same";
                }
            });
        }
    }

    public static abstract class Listener {
        public void onNodeFailure(DiscoveryNode node, String reason) {
        }

        public void onPingReceived(PingRequest pingRequest) {
        }
    }
}

