/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Level;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.LogManager;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.Logger;
import org.apache.flink.elasticsearch7.shaded.org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterState;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.ClusterStateTaskListener;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.NotMasterException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.Coordinator;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.Join;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.JoinRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.JoinTaskExecutor;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.StartJoinRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.coordination.ValidateJoinRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.metadata.MetaData;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.RerouteService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.cluster.service.MasterService;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.Priority;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.collect.Tuple;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.io.stream.StreamInput;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Setting;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.discovery.zen.MembershipAction;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.discovery.zen.ZenDiscovery;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportChannel;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportException;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportRequest;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportRequestOptions;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportResponse;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportResponseHandler;
import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.transport.TransportService;

public class JoinHelper {
    private static final Logger logger = LogManager.getLogger(JoinHelper.class);

    /** Transport action used to send a join request to a node. */
    public static final String JOIN_ACTION_NAME = "internal:cluster/coordination/join";
    /** Transport action used to ask a joining node to validate a cluster state. */
    public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate";
    /** Transport action used to ask a node to start joining the sender. */
    public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";
    /**
     * Node-scoped time setting {@code cluster.join.timeout}; it is applied as the transport timeout of outgoing
     * join and join-validation requests. NOTE(review): the two time values are presumably the default (60s) and
     * the minimum (1ms), following the argument order of {@code Setting.timeSetting} - confirm against that API.
     */
    public static final Setting<TimeValue> JOIN_TIMEOUT_SETTING = Setting.timeSetting("cluster.join.timeout", TimeValue.timeValueMillis(60000L), TimeValue.timeValueMillis(1L), Setting.Property.NodeScope);

    private final MasterService masterService;
    private final TransportService transportService;
    private final JoinTaskExecutor joinTaskExecutor;
    private final TimeValue joinTimeout;
    /** In-flight outgoing joins, keyed by (destination, request); used to avoid sending duplicates. */
    private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
    /** Most recent failed outgoing join attempt, or {@code null} if there is none to report. */
    private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

    /**
     * Creates the helper, builds the join task executor, and registers the transport request handlers for the
     * join, start-join and join-validation actions, together with the handlers for the
     * {@code internal:discovery/zen/...} action names.
     *
     * @param settings             source of {@link #JOIN_TIMEOUT_SETTING}; also passed to the join task executor
     * @param allocationService    passed to the join task executor
     * @param masterService        service that join tasks are later submitted to
     * @param transportService     service on which handlers are registered and through which requests are sent
     * @param currentTermSupplier  supplies the term that is written into the cluster state before join tasks run
     * @param currentStateSupplier supplies the local cluster state used when validating a join
     * @param joinHandler          invoked for each incoming join request together with a callback for the reply
     * @param joinLeaderInTerm     turns an incoming start-join request into the {@link Join} to send back
     * @param joinValidators       checks run against the local node and the cluster state being validated
     * @param rerouteService       passed to the join task executor
     */
    JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, TransportService transportService, final LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier, BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm, Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
        this.masterService = masterService;
        this.transportService = transportService;
        this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
        this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, rerouteService){

            @Override
            public ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks) throws Exception {
                // If the state's term differs from the supplied current term, rebuild the state with the
                // current term before handing the tasks to the parent executor.
                long currentTerm = currentTermSupplier.getAsLong();
                if (currentState.term() != currentTerm) {
                    CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData()).term(currentTerm).build();
                    MetaData metaData = MetaData.builder(currentState.metaData()).coordinationMetaData(coordinationMetaData).build();
                    currentState = ClusterState.builder(currentState).metaData(metaData).build();
                }
                return super.execute(currentState, joiningTasks);
            }
        };
        // Incoming join request: hand it to joinHandler with a callback that replies on the channel.
        transportService.registerRequestHandler(JOIN_ACTION_NAME, "generic", false, false, JoinRequest::new, (request, channel, task) -> joinHandler.accept((JoinRequest)request, this.transportJoinCallback(request, channel)));
        // Join arriving under the zen action name: it is converted to a JoinRequest carrying no Join.
        transportService.registerRequestHandler("internal:discovery/zen/join", MembershipAction.JoinRequest::new, "generic", false, false, (request, channel, task) -> joinHandler.accept(new JoinRequest(request.getNode(), Optional.empty()), this.transportJoinCallback(request, channel)));
        // Start-join: send a join back to the requesting node, then acknowledge the start-join request.
        transportService.registerRequestHandler(START_JOIN_ACTION_NAME, "generic", false, false, StartJoinRequest::new, (request, channel, task) -> {
            DiscoveryNode destination = request.getSourceNode();
            this.sendJoinRequest(destination, Optional.of((Join)joinLeaderInTerm.apply((StartJoinRequest)request)));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        });
        // Join validation: reject a state from a different cluster UUID once the local UUID is committed,
        // then run every join validator against the local node and the received state.
        transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME, ValidateJoinRequest::new, "generic", (request, channel, task) -> {
            ClusterState localState = (ClusterState)currentStateSupplier.get();
            if (localState.metaData().clusterUUIDCommitted() && !localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID())) {
                throw new CoordinationStateRejectedException("join validation on cluster state with a different cluster uuid " + request.getState().metaData().clusterUUID() + " than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting", new Object[0]);
            }
            joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        });
        // Same validation under the zen action name; only the rejection message differs.
        transportService.registerRequestHandler("internal:discovery/zen/join/validate", ValidateJoinRequest::new, "generic", (request, channel, task) -> {
            ClusterState localState = (ClusterState)currentStateSupplier.get();
            if (localState.metaData().clusterUUIDCommitted() && !localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID())) {
                throw new CoordinationStateRejectedException("mixed-version cluster join validation on cluster state with a different cluster uuid " + request.getState().metaData().clusterUUID() + " than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting", new Object[0]);
            }
            joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
            channel.sendResponse(TransportResponse.Empty.INSTANCE);
        });
        // Zen rejoin and leave requests are acknowledged with an empty response and nothing else is done.
        transportService.registerRequestHandler("internal:discovery/zen/rejoin", ZenDiscovery.RejoinClusterRequest::new, "same", (request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE));
        transportService.registerRequestHandler("internal:discovery/zen/leave", MembershipAction.LeaveRequest::new, "same", (request, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE));
    }

    /**
     * Builds a {@link JoinCallback} that reports the outcome of a join back over the given transport channel.
     *
     * @param request the request being answered; only used in the callback's {@code toString()}
     * @param channel the channel on which the empty response or the failure is sent
     * @return a callback bound to {@code channel}
     */
    private JoinCallback transportJoinCallback(final TransportRequest request, final TransportChannel channel) {
        return new JoinCallback(){

            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                }
                catch (IOException sendFailure) {
                    // Could not send the success response: try to report that as a failure instead.
                    onFailure(sendFailure);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                }
                catch (Exception sendFailure) {
                    // Keep the failure we were trying to report attached to the one that prevented it.
                    sendFailure.addSuppressed(e);
                    logger.warn("failed to send back failure on join request", (Throwable)sendFailure);
                }
            }

            @Override
            public String toString() {
                return "JoinCallback{request=" + request + "}";
            }
        };
    }

    /**
     * @return {@code true} if at least one outgoing join request has been sent and not yet completed
     */
    boolean isJoinPending() {
        final boolean nothingInFlight = this.pendingOutgoingJoins.isEmpty();
        return nothingInFlight == false;
    }

    /**
     * Sends a join request to {@code destination} without any completion action.
     *
     * @param destination  the node to join
     * @param optionalJoin the join to attach to the request, if any
     */
    public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
        final Runnable noCompletionAction = () -> {};
        this.sendJoinRequest(destination, optionalJoin, noCompletionAction);
    }

    /**
     * Logs the most recently recorded failed join attempt, if there is one, and then clears it unless a
     * different attempt has been recorded in the meantime.
     */
    void logLastFailedJoinAttempt() {
        final FailedJoinAttempt lastAttempt = this.lastFailedJoinAttempt.get();
        if (lastAttempt == null) {
            return;
        }
        lastAttempt.logWarnWithTimestamp();
        // Only clear the slot if it still holds the attempt that was just logged.
        this.lastFailedJoinAttempt.compareAndSet(lastAttempt, null);
    }

    /**
     * Sends a join request from the local node to {@code destination}, unless an equal request to the same
     * destination is already in flight, in which case nothing is sent and {@code onCompletion} is not run.
     *
     * <p>Nodes for which {@link Coordinator#isZen1Node} returns {@code true} are sent a
     * {@link MembershipAction.JoinRequest} under the {@code internal:discovery/zen/join} action; all other
     * nodes are sent the {@link JoinRequest} under {@link #JOIN_ACTION_NAME}. The request uses the configured
     * join timeout.
     *
     * <p>A failure is logged once, by {@link FailedJoinAttempt#logNow()}, at the level chosen by
     * {@link FailedJoinAttempt#getLogLevel}, and is remembered as the last failed join attempt. A success
     * clears any remembered failure.
     *
     * @param destination  the node to join; asserted to be master-eligible
     * @param optionalJoin the join to attach to the request, if any
     * @param onCompletion run after the response or the failure has been handled
     */
    public void sendJoinRequest(final DiscoveryNode destination, Optional<Join> optionalJoin, final Runnable onCompletion) {
        assert (destination.isMasterNode()) : "trying to join master-ineligible " + destination;
        final JoinRequest joinRequest = new JoinRequest(this.transportService.getLocalNode(), optionalJoin);
        final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
        if (!this.pendingOutgoingJoins.add(dedupKey)) {
            // An equal request to this destination is still outstanding.
            logger.debug("already attempting to join {} with request {}, not sending request", (Object)destination, (Object)joinRequest);
            return;
        }
        logger.debug("attempting to join {} with {}", (Object)destination, (Object)joinRequest);
        final String actionName;
        final TransportRequest transportRequest;
        if (Coordinator.isZen1Node(destination)) {
            actionName = "internal:discovery/zen/join";
            transportRequest = new MembershipAction.JoinRequest(this.transportService.getLocalNode());
        } else {
            actionName = JOIN_ACTION_NAME;
            transportRequest = joinRequest;
        }
        this.transportService.sendRequest(destination, actionName, transportRequest, TransportRequestOptions.builder().withTimeout(this.joinTimeout).build(), new TransportResponseHandler<TransportResponse.Empty>(){

            @Override
            public TransportResponse.Empty read(StreamInput in) {
                return TransportResponse.Empty.INSTANCE;
            }

            @Override
            public void handleResponse(TransportResponse.Empty response) {
                JoinHelper.this.pendingOutgoingJoins.remove(dedupKey);
                logger.debug("successfully joined {} with {}", (Object)destination, (Object)joinRequest);
                JoinHelper.this.lastFailedJoinAttempt.set(null);
                onCompletion.run();
            }

            @Override
            public void handleException(TransportException exp) {
                JoinHelper.this.pendingOutgoingJoins.remove(dedupKey);
                // logNow() is the only log statement here: an additional unconditional INFO line would
                // repeat the same message and bypass the level chosen by FailedJoinAttempt.getLogLevel.
                FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
                attempt.logNow();
                JoinHelper.this.lastFailedJoinAttempt.set(attempt);
                onCompletion.run();
            }

            @Override
            public String executor() {
                return "same";
            }
        });
    }

    /**
     * Sends a start-join request to {@code destination}. The outcome is only logged at debug level.
     *
     * @param startJoinRequest the request to send; its source node is asserted to be master-eligible
     * @param destination      the node that receives the request
     */
    public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
        assert (startJoinRequest.getSourceNode().isMasterNode()) : "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
        final TransportResponseHandler<TransportResponse.Empty> loggingHandler = new TransportResponseHandler<TransportResponse.Empty>(){

            @Override
            public TransportResponse.Empty read(StreamInput in) {
                return TransportResponse.Empty.INSTANCE;
            }

            @Override
            public void handleResponse(TransportResponse.Empty response) {
                logger.debug("successful response to {} from {}", (Object)startJoinRequest, (Object)destination);
            }

            @Override
            public void handleException(TransportException exp) {
                final ParameterizedMessage failureMessage = new ParameterizedMessage("failure in response to {} from {}", (Object)startJoinRequest, (Object)destination);
                logger.debug(failureMessage, (Throwable)exp);
            }

            @Override
            public String executor() {
                return "same";
            }
        };
        this.transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, loggingHandler);
    }

    /**
     * Asks {@code node} to validate {@code state}, using the configured join timeout.
     *
     * <p>Nodes for which {@link Coordinator#isZen1Node} returns {@code true} are addressed under the
     * {@code internal:discovery/zen/join/validate} action, all others under {@link #VALIDATE_JOIN_ACTION_NAME}.
     *
     * @param node     the node that should validate the state
     * @param state    the cluster state to validate
     * @param listener notified with the empty response, or with the transport failure
     */
    public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, final ActionListener<TransportResponse.Empty> listener) {
        final String actionName;
        if (Coordinator.isZen1Node(node)) {
            actionName = "internal:discovery/zen/join/validate";
        } else {
            actionName = VALIDATE_JOIN_ACTION_NAME;
        }
        final TransportRequestOptions requestOptions = TransportRequestOptions.builder().withTimeout(this.joinTimeout).build();
        this.transportService.sendRequest(node, actionName, (TransportRequest)new ValidateJoinRequest(state), requestOptions, new EmptyTransportResponseHandler("generic"){

            @Override
            public void handleResponse(TransportResponse.Empty response) {
                listener.onResponse(response);
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
        });
    }

    /**
     * Accumulator that holds at most one pending join callback per sender until it is closed. On close the
     * pending joins are either submitted to the master service (new mode {@code LEADER}) or failed (new mode
     * {@code FOLLOWER}).
     */
    class CandidateJoinAccumulator
    implements JoinAccumulator {
        private final Map<DiscoveryNode, JoinCallback> joinRequestAccumulator = new HashMap<DiscoveryNode, JoinCallback>();
        boolean closed;

        /**
         * Records the join from {@code sender}; a previously recorded join from the same sender is failed.
         */
        @Override
        public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
            assert (!this.closed) : "CandidateJoinAccumulator closed";
            final JoinCallback superseded = this.joinRequestAccumulator.put(sender, joinCallback);
            if (superseded == null) {
                return;
            }
            superseded.onFailure(new CoordinationStateRejectedException("received a newer join from " + sender, new Object[0]));
        }

        /**
         * Closes the accumulator and resolves every pending join according to {@code newMode}.
         */
        @Override
        public void close(Coordinator.Mode newMode) {
            assert (!this.closed) : "CandidateJoinAccumulator closed";
            this.closed = true;
            if (newMode != Coordinator.Mode.LEADER) {
                assert (newMode == Coordinator.Mode.FOLLOWER) : newMode;
                for (JoinCallback pendingCallback : this.joinRequestAccumulator.values()) {
                    pendingCallback.onFailure(new CoordinationStateRejectedException("became follower", new Object[0]));
                }
                return;
            }
            final LinkedHashMap<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new LinkedHashMap<JoinTaskExecutor.Task, ClusterStateTaskListener>();
            for (Map.Entry<DiscoveryNode, JoinCallback> pendingJoin : this.joinRequestAccumulator.entrySet()) {
                final JoinTaskExecutor.Task joinTask = new JoinTaskExecutor.Task(pendingJoin.getKey(), "elect leader");
                pendingAsTasks.put(joinTask, new JoinTaskListener(joinTask, pendingJoin.getValue()));
            }
            // The source string is built before the two election tasks are appended, so the size it reports
            // is the size of the task map while it contains only the join tasks.
            final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";
            pendingAsTasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source, e) -> {});
            pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {});
            JoinHelper.this.masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), JoinHelper.this.joinTaskExecutor);
        }

        @Override
        public String toString() {
            return "CandidateJoinAccumulator{" + this.joinRequestAccumulator.keySet() + ", closed=" + this.closed + '}';
        }
    }

    /**
     * Accumulator that fails every join request immediately with a "join target is a follower" rejection.
     */
    static class FollowerJoinAccumulator
    implements JoinAccumulator {

        @Override
        public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
            final CoordinationStateRejectedException rejection = new CoordinationStateRejectedException("join target is a follower", new Object[0]);
            joinCallback.onFailure(rejection);
        }

        @Override
        public String toString() {
            return "FollowerJoinAccumulator";
        }
    }

    /**
     * Accumulator that treats any join request as unexpected: it trips an assertion when assertions are
     * enabled, and otherwise fails the join with a "join target is not initialised yet" rejection.
     */
    static class InitialJoinAccumulator
    implements JoinAccumulator {

        @Override
        public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
            assert (false) : "unexpected join from " + sender + " during initialisation";
            final CoordinationStateRejectedException rejection = new CoordinationStateRejectedException("join target is not initialised yet", new Object[0]);
            joinCallback.onFailure(rejection);
        }

        @Override
        public String toString() {
            return "InitialJoinAccumulator";
        }
    }

    /**
     * Accumulator that submits each incoming join straight to the master service as an urgent
     * {@code "node-join"} state update task.
     */
    class LeaderJoinAccumulator
    implements JoinAccumulator {

        @Override
        public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
            final JoinTaskExecutor.Task joinTask = new JoinTaskExecutor.Task(sender, "join existing leader");
            final JoinTaskListener taskListener = new JoinTaskListener(joinTask, joinCallback);
            JoinHelper.this.masterService.submitStateUpdateTask("node-join", joinTask, ClusterStateTaskConfig.build(Priority.URGENT), JoinHelper.this.joinTaskExecutor, taskListener);
        }

        @Override
        public String toString() {
            return "LeaderJoinAccumulator";
        }
    }

    /**
     * Strategy for dealing with incoming join requests.
     */
    interface JoinAccumulator {
        /**
         * Handles a join request from {@code sender}; the outcome is reported through {@code joinCallback}.
         */
        void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback);

        /**
         * Called with the new coordinator mode when the accumulator is closed; does nothing by default.
         */
        default void close(Coordinator.Mode newMode) {
        }
    }

    /**
     * Cluster state task listener that forwards the outcome of a join task to a {@link JoinCallback}.
     */
    static class JoinTaskListener
    implements ClusterStateTaskListener {
        private final JoinTaskExecutor.Task task;
        private final JoinCallback joinCallback;

        JoinTaskListener(JoinTaskExecutor.Task joinTask, JoinCallback callback) {
            this.joinCallback = callback;
            this.task = joinTask;
        }

        /** Passes the failure on to the join callback. */
        @Override
        public void onFailure(String source, Exception e) {
            joinCallback.onFailure(e);
        }

        /** Signals success to the join callback once the cluster state has been processed. */
        @Override
        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            joinCallback.onSuccess();
        }

        @Override
        public String toString() {
            return "JoinTaskListener{task=" + task + "}";
        }
    }

    /**
     * Receives the outcome of a join request.
     */
    public interface JoinCallback {
        /** Called when the join succeeded. */
        void onSuccess();

        /** Called with the cause when the join failed. */
        void onFailure(Exception e);
    }

    /**
     * Record of one failed outgoing join attempt: where it was sent, what was sent, why it failed, and the
     * {@link System#nanoTime()} reading taken when the record was created.
     */
    static class FailedJoinAttempt {
        private final DiscoveryNode destination;
        private final JoinRequest joinRequest;
        private final TransportException exception;
        private final long timestamp;

        FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) {
            this.timestamp = System.nanoTime();
            this.exception = exception;
            this.joinRequest = joinRequest;
            this.destination = destination;
        }

        /** Logs this failure immediately, at the level returned by {@link #getLogLevel}. */
        void logNow() {
            final Level level = FailedJoinAttempt.getLogLevel(this.exception);
            logger.log(level, () -> new ParameterizedMessage("failed to join {} with {}", (Object)this.destination, (Object)this.joinRequest), (Throwable)this.exception);
        }

        /**
         * Picks the level at which a join failure is logged.
         *
         * @param e the transport failure
         * @return {@code DEBUG} when the unwrapped cause is a {@link CoordinationStateRejectedException}, a
         *         {@link FailedToCommitClusterStateException} or a {@link NotMasterException}; {@code INFO} otherwise
         */
        static Level getLogLevel(TransportException e) {
            final Throwable unwrapped = e.unwrapCause();
            final boolean downgrade = unwrapped instanceof CoordinationStateRejectedException
                || unwrapped instanceof FailedToCommitClusterStateException
                || unwrapped instanceof NotMasterException;
            return downgrade ? Level.DEBUG : Level.INFO;
        }

        /**
         * Logs this failure together with the time elapsed since it was recorded. Despite the method name,
         * the message is written at INFO level.
         */
        void logWarnWithTimestamp() {
            logger.info(
                () -> new ParameterizedMessage(
                    "last failed join attempt was {} ago, failed to join {} with {}",
                    TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - this.timestamp)),
                    this.destination,
                    this.joinRequest),
                (Throwable)this.exception);
        }
    }
}

