/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.zen;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.ElasticsearchException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.Version;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.ClusterChangedEvent;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.ClusterState;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.Diff;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNodes;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.bytes.BytesReference;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.compress.Compressor;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.compress.CompressorFactory;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.StreamInput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.StreamOutput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.core.internal.io.IOUtils;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.Discovery;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.DiscoverySettings;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.BytesTransportRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportChannel;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportRequestHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportRequestOptions;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public class PublishClusterStateAction {
    private static final Logger logger = LogManager.getLogger(PublishClusterStateAction.class);

    /** Transport action name under which a serialized (full or diff) cluster state is sent to a node. */
    public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
    /** Transport action name under which a node is told to commit a previously sent cluster state. */
    public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";

    // Request options shared by every send and commit request issued by this action.
    private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
    private final TransportService transportService;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final IncomingClusterStateListener incomingClusterStateListener;
    private final DiscoverySettings discoverySettings;

    // Counters exposed through stats().
    private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
    private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
    private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();

    // Monitor guarding lastSeenClusterState. It is final so the lock identity can never change
    // between two threads entering the synchronized section.
    private final Object lastSeenClusterStateMutex = new Object();
    // Most recent state handed to the listener; incoming diffs are applied on top of it.
    // Read and written only while holding lastSeenClusterStateMutex.
    private ClusterState lastSeenClusterState;

    public PublishClusterStateAction(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, IncomingClusterStateListener incomingClusterStateListener, DiscoverySettings discoverySettings) {
        this.transportService = transportService;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.incomingClusterStateListener = incomingClusterStateListener;
        this.discoverySettings = discoverySettings;
        transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, "same", false, false, new SendClusterStateRequestHandler());
        transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, "same", false, false, new CommitClusterStateRequestHandler());
    }

    /**
     * Publishes the new cluster state of {@code clusterChangedEvent} to every node of that state
     * except the local one, then waits for the commit and for the nodes' responses.
     *
     * @param clusterChangedEvent event carrying the new state and, possibly, the previous one
     * @param minMasterNodes      number of master nodes (including the local one) required to commit
     * @param ackListener         listener notified about the commit and about per-node acknowledgements
     * @throws Discovery.FailedToCommitClusterStateException if preparation fails or the state could
     *         not be committed
     */
    public void publish(ClusterChangedEvent clusterChangedEvent, int minMasterNodes, Discovery.AckListener ackListener) throws Discovery.FailedToCommitClusterStateException {
        Set<DiscoveryNode> nodesToPublishTo;
        boolean sendFullVersion;
        Map<Version, BytesReference> serializedStates;
        Map<Version, BytesReference> serializedDiffs;
        SendingController sendingController;

        // Phase 1: work out the targets and pre-serialize the payloads. Any failure here is
        // reported as a failure to commit.
        try {
            DiscoveryNodes allNodes = clusterChangedEvent.state().nodes();
            nodesToPublishTo = new HashSet<>(allNodes.getSize());
            DiscoveryNode self = allNodes.getLocalNode();
            int masterNodeCount = allNodes.getMasterNodes().size();
            for (DiscoveryNode candidate : allNodes) {
                if (!candidate.equals(self)) {
                    nodesToPublishTo.add(candidate);
                }
            }

            // Diffs are only possible when enabled and when there is a previous state to diff against.
            boolean diffsAllowed = this.discoverySettings.getPublishDiff() && clusterChangedEvent.previousState() != null;
            sendFullVersion = !diffsAllowed;

            serializedStates = new HashMap<>();
            serializedDiffs = new HashMap<>();
            buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(), nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);

            AckClusterStatePublishResponseHandler responseHandler = new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
            sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes, masterNodeCount, responseHandler);
        } catch (Exception e) {
            throw new Discovery.FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
        }

        // Phase 2: send, wait for commit, wait for responses.
        try {
            innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates, serializedDiffs);
        } catch (Discovery.FailedToCommitClusterStateException commitFailure) {
            throw commitFailure;
        } catch (Exception e) {
            // If the controller had already committed, the original exception is propagated as is;
            // otherwise the publication is marked failed and reported as a failure to commit.
            if (!sendingController.markAsFailed("unexpected error", e)) {
                throw e;
            }
            throw new Discovery.FailedToCommitClusterStateException("unexpected error", e);
        }
    }

    /**
     * Sends the state (full or diff) to every target node, blocks until the state is committed
     * (or throws if it is not), notifies {@code ackListener} of the commit, and then waits for the
     * remaining publish timeout for all nodes to respond.
     */
    private void innerPublish(ClusterChangedEvent clusterChangedEvent, Set<DiscoveryNode> nodesToPublishTo, SendingController sendingController, Discovery.AckListener ackListener, boolean sendFullVersion, Map<Version, BytesReference> serializedStates, Map<Version, BytesReference> serializedDiffs) {
        final ClusterState newState = clusterChangedEvent.state();
        final ClusterState oldState = clusterChangedEvent.previousState();
        final TimeValue publishTimeout = this.discoverySettings.getPublishTimeout();
        final long startNanos = System.nanoTime();

        for (DiscoveryNode target : nodesToPublishTo) {
            // A node that was not part of the previous state has nothing to apply a diff to.
            if (sendFullVersion || !oldState.nodes().nodeExists(target)) {
                sendFullClusterState(newState, serializedStates, target, publishTimeout, sendingController);
            } else {
                sendClusterStateDiff(newState, serializedDiffs, serializedStates, target, publishTimeout, sendingController);
            }
        }

        sendingController.waitForCommit(this.discoverySettings.getCommitTimeout());
        final long commitNanos = System.nanoTime() - startNanos;
        ackListener.onCommit(TimeValue.timeValueNanos(commitNanos));

        try {
            // Only the part of the publish timeout not consumed by the commit phase is left.
            long remainingNanos = Math.max(0L, publishTimeout.nanos() - commitNanos);
            BlockingClusterStatePublishResponseHandler responses = sendingController.getPublishResponseHandler();
            boolean everyoneAnswered = responses.awaitAllNodes(TimeValue.timeValueNanos(remainingNanos));
            sendingController.setPublishingTimedOut(!everyoneAnswered);

            if (sendingController.getPublishingTimedOut()) {
                DiscoveryNode[] stillPending = responses.pendingNodes();
                if (stillPending.length > 0) {
                    logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})", (Object)newState.version(), (Object)publishTimeout, (Object)stillPending);
                }
            }

            Set<DiscoveryNode> failed = responses.getFailedNodes();
            if (!failed.isEmpty()) {
                logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]", (Object)clusterChangedEvent.state().version(), failed);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Pre-serializes the payloads needed for the given targets, one per distinct node version:
     * a full state for nodes that need one, a diff for the others. Results are stored in
     * {@code serializedStates} and {@code serializedDiffs}, keyed by node version. The diff itself
     * is computed lazily, at most once.
     *
     * @throws ElasticsearchException if serialization fails with an {@link IOException}
     */
    private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo, boolean sendFullVersion, Map<Version, BytesReference> serializedStates, Map<Version, BytesReference> serializedDiffs) {
        Diff<ClusterState> lazyDiff = null;
        for (DiscoveryNode target : nodesToPublishTo) {
            try {
                boolean needsFullState = sendFullVersion || !previousState.nodes().nodeExists(target);
                if (needsFullState) {
                    if (!serializedStates.containsKey(target.getVersion())) {
                        BytesReference fullBytes = PublishClusterStateAction.serializeFullClusterState(clusterState, target.getVersion());
                        serializedStates.put(target.getVersion(), fullBytes);
                    }
                } else {
                    if (lazyDiff == null) {
                        lazyDiff = clusterState.diff(previousState);
                    }
                    if (!serializedDiffs.containsKey(target.getVersion())) {
                        BytesReference diffBytes = PublishClusterStateAction.serializeDiffClusterState(lazyDiff, target.getVersion());
                        serializedDiffs.put(target.getVersion(), diffBytes);
                    }
                }
            } catch (IOException e) {
                throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, target);
            }
        }
    }

    /**
     * Sends the full cluster state to {@code node}, reusing the bytes cached for the node's version
     * or serializing (and caching) them on demand. A serialization failure is reported to the
     * controller as a send failure for that node and nothing is sent.
     */
    private void sendFullClusterState(ClusterState clusterState, Map<Version, BytesReference> serializedStates, DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
        BytesReference payload = serializedStates.get(node.getVersion());
        if (payload == null) {
            // Not pre-serialized for this version: do it now.
            try {
                payload = PublishClusterStateAction.serializeFullClusterState(clusterState, node.getVersion());
                serializedStates.put(node.getVersion(), payload);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to serialize cluster_state before publishing it to node {}", (Object)node), (Throwable)e);
                sendingController.onNodeSendFailed(node, e);
                return;
            }
        }
        final boolean asDiff = false;
        sendClusterStateToNode(clusterState, payload, node, publishTimeout, sendingController, asDiff, serializedStates);
    }

    /**
     * Sends the pre-serialized diff matching {@code node}'s version. The diff is expected to be
     * present in {@code serializedDiffs}; {@code serializedStates} is passed along so the full
     * state can be sent if the node rejects the diff.
     */
    private void sendClusterStateDiff(ClusterState clusterState, Map<Version, BytesReference> serializedDiffs, Map<Version, BytesReference> serializedStates, DiscoveryNode node, TimeValue publishTimeout, SendingController sendingController) {
        final BytesReference diffBytes = serializedDiffs.get(node.getVersion());
        assert diffBytes != null : "failed to find serialized diff for node " + node + " of version [" + node.getVersion() + "]";
        final boolean asDiff = true;
        sendClusterStateToNode(clusterState, diffBytes, node, publishTimeout, sendingController, asDiff, serializedStates);
    }

    /**
     * Sends {@code bytes} to {@code node} under {@link #SEND_ACTION_NAME} and wires the response
     * into {@code sendingController}.
     *
     * <p>When a diff was sent ({@code sendDiffs}) and the node answers with an
     * {@link IncompatibleClusterStateVersionException}, the full state is sent instead. Any other
     * failure, including one thrown while issuing the request, is reported as a send failure.
     */
    private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes, final DiscoveryNode node, final TimeValue publishTimeout, final SendingController sendingController, final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
        try {
            TransportRequest stateRequest = new BytesTransportRequest(bytes, node.getVersion());
            EmptyTransportResponseHandler onReply = new EmptyTransportResponseHandler("same"){

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    // A late answer is still processed; it is only logged as late.
                    if (sendingController.getPublishingTimedOut()) {
                        logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", (Object)node, (Object)clusterState.version(), (Object)publishTimeout);
                    }
                    sendingController.onNodeSendAck(node);
                }

                @Override
                public void handleException(TransportException exp) {
                    boolean diffRejected = sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException;
                    if (!diffRejected) {
                        logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", (Object)node), (Throwable)exp);
                        sendingController.onNodeSendFailed(node, exp);
                        return;
                    }
                    // The node could not apply the diff: fall back to the full state.
                    logger.debug("resending full cluster state to node {} reason {}", (Object)node, (Object)exp.getDetailedMessage());
                    PublishClusterStateAction.this.sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
                }
            };
            this.transportService.sendRequest(node, SEND_ACTION_NAME, stateRequest, this.stateRequestOptions, onReply);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", (Object)node), (Throwable)e);
            sendingController.onNodeSendFailed(node, e);
        }
    }

    /**
     * Sends a commit request carrying the state's UUID to {@code node} under
     * {@link #COMMIT_ACTION_NAME}. The outcome (response, remote failure, or a failure while
     * issuing the request) is forwarded to the controller's publish response handler.
     */
    private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
        try {
            logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", (Object)clusterState.stateUUID(), (Object)clusterState.version(), (Object)node);
            TransportRequest commitRequest = new CommitClusterStateRequest(clusterState.stateUUID());
            EmptyTransportResponseHandler onReply = new EmptyTransportResponseHandler("same"){

                @Override
                public void handleResponse(TransportResponse.Empty response) {
                    // Late answers are logged but still counted.
                    if (sendingController.getPublishingTimedOut()) {
                        logger.debug("node {} responded to cluster state commit [{}]", (Object)node, (Object)clusterState.version());
                    }
                    sendingController.getPublishResponseHandler().onResponse(node);
                }

                @Override
                public void handleException(TransportException exp) {
                    logger.debug(() -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}", new Object[]{clusterState.stateUUID(), clusterState.version(), node}), (Throwable)exp);
                    sendingController.getPublishResponseHandler().onFailure(node, exp);
                }
            };
            this.transportService.sendRequest(node, COMMIT_ACTION_NAME, commitRequest, this.stateRequestOptions, onReply);
        } catch (Exception t) {
            logger.warn(() -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}", new Object[]{clusterState.stateUUID(), clusterState.version(), node}), (Throwable)t);
            sendingController.getPublishResponseHandler().onFailure(node, t);
        }
    }

    /**
     * Serializes a full cluster state, compressed, in the wire format of {@code nodeVersion}.
     * The payload starts with the boolean {@code true}, which marks it as a full state.
     *
     * @param clusterState state to serialize
     * @param nodeVersion  version set on the output stream before writing
     * @return the serialized bytes
     * @throws IOException if writing to the stream fails
     */
    public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
        final BytesStreamOutput buffer = new BytesStreamOutput();
        try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(buffer)) {
            compressed.setVersion(nodeVersion);
            // Leading flag: true = full state follows.
            compressed.writeBoolean(true);
            clusterState.writeTo(compressed);
        }
        // The compressing stream must be closed before the bytes are read back.
        return buffer.bytes();
    }

    /**
     * Serializes a cluster state diff, compressed, in the wire format of {@code nodeVersion}.
     * The payload starts with the boolean {@code false}, which marks it as a diff.
     *
     * @param diff        diff to serialize
     * @param nodeVersion version set on the output stream before writing
     * @return the serialized bytes
     * @throws IOException if writing to the stream fails
     */
    public static BytesReference serializeDiffClusterState(Diff<?> diff, Version nodeVersion) throws IOException {
        BytesStreamOutput bStream = new BytesStreamOutput();
        try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
            stream.setVersion(nodeVersion);
            // Leading flag: false = diff follows.
            stream.writeBoolean(false);
            diff.writeTo(stream);
        }
        // The compressing stream must be closed before the bytes are read back.
        return bStream.bytes();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
        Compressor compressor = CompressorFactory.compressor(request.bytes());
        StreamInput in = request.bytes().streamInput();
        Object object = this.lastSeenClusterStateMutex;
        synchronized (object) {
            ClusterState incomingState;
            try {
                if (compressor != null) {
                    in = compressor.streamInput(in);
                }
                in = new NamedWriteableAwareStreamInput(in, this.namedWriteableRegistry);
                in.setVersion(request.version());
                if (in.readBoolean()) {
                    incomingState = ClusterState.readFrom(in, this.transportService.getLocalNode());
                    this.fullClusterStateReceivedCount.incrementAndGet();
                    logger.debug("received full cluster state version [{}] with size [{}]", (Object)incomingState.version(), (Object)request.bytes().length());
                } else {
                    if (this.lastSeenClusterState == null) {
                        logger.debug("received diff for but don't have any local cluster state - requesting full state");
                        throw new IncompatibleClusterStateVersionException("have no local cluster state");
                    }
                    Diff<ClusterState> diff = ClusterState.readDiffFrom(in, this.lastSeenClusterState.nodes().getLocalNode());
                    incomingState = diff.apply(this.lastSeenClusterState);
                    this.compatibleClusterStateDiffReceivedCount.incrementAndGet();
                    logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", (Object)incomingState.version(), (Object)incomingState.stateUUID(), (Object)request.bytes().length());
                }
            }
            catch (IncompatibleClusterStateVersionException e) {
                try {
                    this.incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                    throw e;
                    catch (Exception e2) {
                        logger.warn("unexpected error while deserializing an incoming cluster state", (Throwable)e2);
                        throw e2;
                    }
                }
                catch (Throwable throwable) {
                    IOUtils.close(in);
                    throw throwable;
                }
            }
            IOUtils.close(in);
            this.incomingClusterStateListener.onIncomingClusterState(incomingState);
            this.lastSeenClusterState = incomingState;
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }

    /**
     * Handles a {@link #COMMIT_ACTION_NAME} request by forwarding the state UUID to the
     * {@link IncomingClusterStateListener}. The channel is answered with an empty response once
     * the listener reports success, or with the exception if it reports failure.
     *
     * @param request commit request carrying the UUID of the state to commit
     * @param channel channel used to send the response
     */
    protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
        final ActionListener<Void> replyToChannel = new ActionListener<Void>(){

            @Override
            public void onResponse(Void ignore) {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception sendFailure) {
                    logger.debug("failed to send response on cluster state processed", (Throwable)sendFailure);
                    // Try to report the failure over the same channel instead.
                    this.onFailure(sendFailure);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    // Keep the original failure attached to the one that is logged.
                    inner.addSuppressed(e);
                    logger.debug("failed to send response on cluster state processed", (Throwable)inner);
                }
            }
        };
        this.incomingClusterStateListener.onClusterStateCommitted(request.stateUUID, replyToChannel);
    }

    /**
     * Returns a snapshot of the counters of received full states, incompatible diffs and
     * compatible diffs.
     */
    public PublishClusterStateStats stats() {
        final long fullStates = this.fullClusterStateReceivedCount.get();
        final long incompatibleDiffs = this.incompatibleClusterStateDiffReceivedCount.get();
        final long compatibleDiffs = this.compatibleClusterStateDiffReceivedCount.get();
        return new PublishClusterStateStats(fullStates, incompatibleDiffs, compatibleDiffs);
    }

    /**
     * Tracks one publication: counts acknowledgements from master nodes, decides when the state is
     * committed or has failed, and sends commit messages to the nodes that acknowledged the state.
     *
     * <p>The commit bookkeeping ({@code committed}, {@code neededMastersToCommit},
     * {@code pendingMasterNodes}, {@code sendAckedBeforeCommit}) is updated from synchronized
     * methods. {@code committedOrFailedLatch} reaches zero exactly when the outcome is decided.
     */
    class SendingController {
        private final ClusterState clusterState;
        private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
        // Nodes that acknowledged the state before it was committed; they are sent the commit
        // message as soon as the commit happens.
        final ArrayList<DiscoveryNode> sendAckedBeforeCommit = new ArrayList<>();
        // Counts down once, when the publication is either committed or failed.
        final CountDownLatch committedOrFailedLatch;
        boolean committed;
        // Number of additional master acknowledgements still required to commit.
        int neededMastersToCommit;
        // Number of master nodes that have not answered (ack or failure) yet.
        int pendingMasterNodes;
        final AtomicBoolean publishingTimedOut = new AtomicBoolean();

        public BlockingClusterStatePublishResponseHandler getPublishResponseHandler() {
            return this.publishResponseHandler;
        }

        /**
         * @param clusterState           state being published
         * @param minMasterNodes         required number of master nodes, the local node included
         * @param totalMasterNodes       number of master nodes in the state, the local node included
         * @param publishResponseHandler handler collecting the per-node outcomes
         * @throws Discovery.FailedToCommitClusterStateException if there are fewer other master
         *         nodes than acknowledgements required
         */
        private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) {
            this.clusterState = clusterState;
            this.publishResponseHandler = publishResponseHandler;
            // The local node counts as one of the required masters, hence the "- 1".
            this.neededMastersToCommit = Math.max(0, minMasterNodes - 1);
            this.pendingMasterNodes = totalMasterNodes - 1;
            if (this.neededMastersToCommit > this.pendingMasterNodes) {
                throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state.[{}] needed , have [{}]", this.neededMastersToCommit, this.pendingMasterNodes);
            }
            // Nothing to wait for: committed immediately and the latch starts open.
            this.committed = this.neededMastersToCommit == 0;
            this.committedOrFailedLatch = new CountDownLatch(this.committed ? 0 : 1);
        }

        /**
         * Blocks until the publication is committed or failed, or until {@code commitTimeout}
         * elapses (in which case the publication is marked as failed).
         *
         * @throws Discovery.FailedToCommitClusterStateException if the state is not committed
         *         when the wait ends
         */
        public void waitForCommit(TimeValue commitTimeout) {
            boolean timedout = false;
            try {
                timedout = !this.committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ignored) {
                // Deliberately not propagated: the isCommitted() check below either throws or
                // confirms that the state was committed and publishing can continue.
            }
            if (timedout) {
                this.markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])");
            }
            if (!this.isCommitted()) {
                throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left", timedout ? "timed out while waiting for" : "failed to get", this.neededMastersToCommit);
            }
        }

        public synchronized boolean isCommitted() {
            return this.committed;
        }

        /**
         * Records that {@code node} acknowledged the sent state. After the commit the node is sent
         * the commit message right away; after a failure the ack is ignored; otherwise the node is
         * remembered and, if it is a master node, counted towards the commit.
         */
        public synchronized void onNodeSendAck(DiscoveryNode node) {
            if (this.committed) {
                assert (this.sendAckedBeforeCommit.isEmpty());
                PublishClusterStateAction.this.sendCommitToNode(node, this.clusterState, this);
            } else if (this.committedOrFailed()) {
                logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", (Object)node, (Object)this.clusterState.version());
            } else {
                this.sendAckedBeforeCommit.add(node);
                if (node.isMasterNode()) {
                    this.checkForCommitOrFailIfNoPending(node);
                }
            }
        }

        private synchronized boolean committedOrFailed() {
            return this.committedOrFailedLatch.getCount() == 0L;
        }

        /**
         * Counts a master acknowledgement. When the last required one arrives and the commit
         * succeeds, every node that acknowledged so far is sent the commit message.
         */
        private synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {
            logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])", (Object)masterNode, (Object)this.clusterState.version(), (Object)this.pendingMasterNodes, (Object)this.neededMastersToCommit);
            --this.neededMastersToCommit;
            if (this.neededMastersToCommit == 0 && this.markAsCommitted()) {
                for (DiscoveryNode nodeToCommit : this.sendAckedBeforeCommit) {
                    PublishClusterStateAction.this.sendCommitToNode(nodeToCommit, this.clusterState, this);
                }
                this.sendAckedBeforeCommit.clear();
            }
            this.decrementPendingMasterAcksAndChangeForFailure();
        }

        /**
         * Marks one master node as answered and fails the publication if no master is left to
         * answer while acknowledgements are still missing.
         */
        private synchronized void decrementPendingMasterAcksAndChangeForFailure() {
            --this.pendingMasterNodes;
            if (this.pendingMasterNodes == 0 && this.neededMastersToCommit > 0) {
                this.markAsFailed("no more pending master nodes, but failed to reach needed acks ([" + this.neededMastersToCommit + "] left)");
            }
        }

        /**
         * Records that sending the state to {@code node} failed. A failed master node also counts
         * as answered; the failure is always forwarded to the publish response handler.
         */
        public synchronized void onNodeSendFailed(DiscoveryNode node, Exception e) {
            if (node.isMasterNode()) {
                logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])", (Object)node, (Object)this.clusterState.version(), (Object)this.pendingMasterNodes, (Object)this.neededMastersToCommit);
                this.decrementPendingMasterAcksAndChangeForFailure();
            }
            this.publishResponseHandler.onFailure(node, e);
        }

        /**
         * Tries to mark the publication as committed.
         *
         * @return {@code true} if the publication is committed (now or previously), {@code false}
         *         if it had already failed
         */
        private synchronized boolean markAsCommitted() {
            if (this.committedOrFailed()) {
                return this.committed;
            }
            logger.trace("committing version [{}]", (Object)this.clusterState.version());
            this.committed = true;
            this.committedOrFailedLatch.countDown();
            return true;
        }

        /**
         * Tries to mark the publication as failed, logging {@code reason} as the cause.
         *
         * @return {@code true} if the publication is failed (now or previously), {@code false}
         *         if it had already been committed
         */
        private synchronized boolean markAsFailed(String details, Exception reason) {
            if (this.committedOrFailed()) {
                return !this.committed;
            }
            logger.trace(() -> new ParameterizedMessage("failed to commit version [{}]. {}", (Object)this.clusterState.version(), (Object)details), (Throwable)reason);
            this.committed = false;
            this.committedOrFailedLatch.countDown();
            return true;
        }

        /**
         * Tries to mark the publication as failed.
         *
         * @return {@code true} if the publication is failed (now or previously), {@code false}
         *         if it had already been committed
         */
        private synchronized boolean markAsFailed(String reason) {
            if (this.committedOrFailed()) {
                return !this.committed;
            }
            logger.trace("failed to commit version [{}]. {}", (Object)this.clusterState.version(), (Object)reason);
            this.committed = false;
            this.committedOrFailedLatch.countDown();
            return true;
        }

        public boolean getPublishingTimedOut() {
            return this.publishingTimedOut.get();
        }

        public void setPublishingTimedOut(boolean isTimedOut) {
            this.publishingTimedOut.set(isTimedOut);
        }
    }

    /**
     * Transport request asking a node to commit a cluster state, identified by its state UUID.
     */
    protected static class CommitClusterStateRequest extends TransportRequest {
        // UUID of the state to commit; set by the constructor or by readFrom.
        String stateUUID;

        /** Creates an empty request whose UUID is filled in by {@link #readFrom(StreamInput)}. */
        public CommitClusterStateRequest() {
        }

        /** Creates a request for the state with the given UUID. */
        public CommitClusterStateRequest(String stateUUID) {
            this.stateUUID = stateUUID;
        }

        /** Reads the parent's fields, then the state UUID. */
        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            stateUUID = in.readString();
        }

        /** Writes the parent's fields, then the state UUID. */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(stateUUID);
        }
    }

    /** Transport handler that delegates commit requests to {@code handleCommitRequest}. */
    private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {

        @Override
        public void messageReceived(CommitClusterStateRequest request, TransportChannel channel) throws Exception {
            handleCommitRequest(request, channel);
        }
    }

    /** Transport handler that delegates incoming cluster states to {@code handleIncomingClusterStateRequest}. */
    private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {

        @Override
        public void messageReceived(BytesTransportRequest request, TransportChannel channel) throws Exception {
            handleIncomingClusterStateRequest(request, channel);
        }
    }

    /** Callback through which this action reports the cluster states and commits it receives. */
    public interface IncomingClusterStateListener {

        /**
         * Called with every cluster state received from a send request, after it has been
         * deserialized (and, for diffs, applied to the last seen state).
         */
        void onIncomingClusterState(ClusterState incomingState);

        /**
         * Called when a commit request arrives for the state with the given UUID.
         * The outcome reported to {@code processedListener} is sent back to the requesting node.
         */
        void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener);
    }
}

