/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.zen;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.store.AlreadyClosedException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.Version;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.ClusterName;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.ClusterState;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNode;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.cluster.node.DiscoveryNodes;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.component.AbstractComponent;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.StreamInput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.stream.StreamOutput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.lease.Releasable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.lease.Releasables;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.settings.Setting;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.settings.Settings;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.transport.TransportAddress;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.TimeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.CollectionUtils;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.EsExecutors;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.KeyedLock;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.core.internal.io.IOUtils;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.zen.PingContextProvider;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.discovery.zen.ZenPing;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.threadpool.ThreadPool;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.ConnectTransportException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.ConnectionProfile;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.NodeNotConnectedException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.RemoteTransportException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.Transport;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportChannel;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportRequestHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportRequestOptions;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportResponseHandler;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.TransportService;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

public class UnicastZenPing
extends AbstractComponent
implements ZenPing {
    /** Transport action name under which the ping request handler is registered and to which pings are sent. */
    public static final String ACTION_NAME = "internal:discovery/zen/unicast";
    /** Node-scoped list setting {@code discovery.zen.ping.unicast.hosts}; defaults to an empty list. */
    public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = Setting.listSetting("discovery.zen.ping.unicast.hosts", Collections.emptyList(), Function.identity(), Setting.Property.NodeScope);
    /**
     * Node-scoped int setting {@code discovery.zen.ping.unicast.concurrent_connects}; the constructor passes its value
     * as the last size argument when building the connect executor.
     * NOTE(review): the literals 10 and 0 are presumably the default and the minimum value - confirm against Setting.intSetting.
     */
    public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Setting.Property.NodeScope);
    /** Node-scoped time setting {@code discovery.zen.ping.unicast.hosts.resolve_timeout}; defaults to 5 seconds. */
    public static final Setting<TimeValue> DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5L), Setting.Property.NodeScope);
    /** Same value the constructor assigns to {@code limitPortCounts} when the unicast hosts setting is present. */
    public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
    /** Same value the constructor assigns to {@code limitPortCounts} when it falls back to the transport's local addresses. */
    public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
    private final ThreadPool threadPool;
    private final TransportService transportService;
    // Cluster name read from the settings; incoming ping requests carrying a different name are rejected.
    private final ClusterName clusterName;
    // Host strings resolved into seed nodes at the start of every ping.
    private final List<String> configuredHosts;
    // Passed to TransportService.addressesFromString for each configured host.
    private final int limitPortCounts;
    // Supplies the cluster state used to build ping responses and to look up known nodes.
    private final PingContextProvider contextProvider;
    // Source of the ids handed to new pinging rounds.
    private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
    // Prefix of the synthetic node ids given to seed nodes resolved from configured hosts.
    private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
    // Rounds currently in flight, keyed by round id; a round removes itself when it is closed.
    private final Map<Integer, PingingRound> activePingingRounds = ConcurrentCollections.newConcurrentMap();
    // Ping responses taken from incoming ping requests; each entry is removed again by a task scheduled at twice the request timeout.
    private final Queue<ZenPing.PingResponse> temporalResponses = ConcurrentCollections.newQueue();
    // Contributes additional seed nodes on every ping.
    private final UnicastHostsProvider hostsProvider;
    // Executor used both for resolving host names and for running the per-node ping tasks.
    protected final EsThreadPoolExecutor unicastZenPingExecutorService;
    // Upper bound for resolving all configured hosts in one ping.
    private final TimeValue resolveTimeout;
    // Set by close(); consulted by the request handler and by the response handler's exception path.
    private volatile boolean closed = false;

    /**
     * Builds the unicast pinger: reads its settings, registers the handler for incoming ping requests
     * under {@link #ACTION_NAME} and creates the scaling executor used for resolving hosts and sending pings.
     *
     * @param settings             node settings; the unicast hosts, concurrent-connects and resolve-timeout settings are read from it
     * @param threadPool           thread pool used for scheduling ping tasks; also supplies the thread context for the executor
     * @param transportService     transport used to register the request handler and, when no hosts are configured, to obtain local addresses
     * @param unicastHostsProvider provider of additional seed nodes, consulted on every ping
     * @param contextProvider      supplier of the cluster state
     */
    public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, UnicastHostsProvider unicastHostsProvider, PingContextProvider contextProvider) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
        this.hostsProvider = unicastHostsProvider;
        this.contextProvider = contextProvider;
        int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
        if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
            // Explicitly configured hosts: use the foreign-host port limit.
            this.configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
            this.limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else {
            // Nothing configured: fall back to the transport's local addresses and use the local port limit.
            this.configuredHosts = transportService.getLocalAddresses();
            this.limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
        }
        this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
        this.logger.debug("using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]", this.configuredHosts, (Object)concurrentConnects, (Object)this.resolveTimeout);
        transportService.registerRequestHandler(ACTION_NAME, "same", UnicastPingRequest::new, new UnicastPingRequestHandler());
        ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
        this.unicastZenPingExecutorService = EsExecutors.newScaling(this.nodeName() + "/unicast_connect", 0, concurrentConnects, 60L, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
    }

    /**
     * Resolves the given host strings to transport addresses in parallel and wraps every resolved,
     * non-local address in a placeholder {@link DiscoveryNode}.
     *
     * <p>All hosts are submitted to {@code executorService} at once and share {@code resolveTimeout}.
     * A host whose task was cancelled is logged as timed out; a host whose resolution threw is logged
     * with the cause. In both cases the host is skipped and resolution of the others continues.
     *
     * @param executorService  executor running the resolution tasks
     * @param logger           logger for trace output and resolution warnings
     * @param hosts            host strings to resolve
     * @param limitPortCounts  port limit passed to {@code TransportService.addressesFromString}
     * @param transportService transport used to resolve addresses and to determine the local addresses to exclude
     * @param nodeId_prefix    prefix of the synthetic node ids ({@code prefix + hostname + "_" + index + "#"})
     * @param resolveTimeout   time allowed for resolving all hosts
     * @return a new mutable list of placeholder nodes, in host order
     * @throws InterruptedException     if the calling thread is interrupted while waiting for resolution
     * @throws NullPointerException     if any reference argument is {@code null}
     * @throws IllegalArgumentException if {@code resolveTimeout} is negative
     */
    public static List<DiscoveryNode> resolveHostsLists(ExecutorService executorService, Logger logger, List<String> hosts, int limitPortCounts, TransportService transportService, String nodeId_prefix, TimeValue resolveTimeout) throws InterruptedException {
        Objects.requireNonNull(executorService);
        Objects.requireNonNull(logger);
        Objects.requireNonNull(hosts);
        Objects.requireNonNull(transportService);
        Objects.requireNonNull(nodeId_prefix);
        Objects.requireNonNull(resolveTimeout);
        if (resolveTimeout.nanos() < 0L) {
            throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
        }
        // The explicit Callable cast gives the inner lambda a target type; without it the element type cannot be inferred.
        List<Callable<TransportAddress[]>> callables = hosts.stream()
            .map(hn -> (Callable<TransportAddress[]>)() -> transportService.addressesFromString(hn, limitPortCounts))
            .collect(Collectors.toList());
        List<Future<TransportAddress[]>> futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
        List<DiscoveryNode> discoveryNodes = new ArrayList<>();
        Set<TransportAddress> localAddresses = new HashSet<>();
        localAddresses.add(transportService.boundAddress().publishAddress());
        localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
        // invokeAll returns futures in the order of the submitted tasks, so hosts and futures are walked in lockstep.
        Iterator<String> it = hosts.iterator();
        for (Future<TransportAddress[]> future : futures) {
            String hostname = it.next();
            if (future.isCancelled()) {
                logger.warn("timed out after [{}] resolving host [{}]", (Object)resolveTimeout, (Object)hostname);
                continue;
            }
            assert future.isDone();
            try {
                TransportAddress[] addresses = future.get();
                logger.trace("resolved host [{}] to {}", (Object)hostname, (Object)addresses);
                for (int addressId = 0; addressId < addresses.length; ++addressId) {
                    TransportAddress address = addresses[addressId];
                    // Never add this node's own publish/bound addresses as a ping target.
                    if (localAddresses.contains(address)) {
                        continue;
                    }
                    discoveryNodes.add(new DiscoveryNode(nodeId_prefix + hostname + "_" + addressId + "#", address, Collections.emptyMap(), Collections.emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
                }
            }
            catch (ExecutionException e) {
                assert e.getCause() != null;
                String message = "failed to resolve host [" + hostname + "]";
                logger.warn(message, e.getCause());
            }
        }
        return discoveryNodes;
    }

    /**
     * Shuts this pinger down. In order: terminates the connect executor (passing a 10 second timeout),
     * closes every pinging round still registered as active, and then sets the {@code closed} flag.
     */
    @Override
    public void close() {
        ThreadPool.terminate(this.unicastZenPingExecutorService, 10L, TimeUnit.SECONDS);
        Releasables.close(this.activePingingRounds.values());
        // The flag is raised only after the two steps above have returned.
        this.closed = true;
    }

    /** No-op: this implementation does nothing on start. */
    @Override
    public void start() {
    }

    /** Discards every ping response currently held in the temporal-responses queue. */
    public void clearTemporalResponses() {
        this.temporalResponses.clear();
    }

    /**
     * Starts a pinging round in which {@code duration} serves both as the scheduling window
     * and as the per-request duration.
     *
     * @param resultsConsumer receives the collected pings when the round is closed
     * @param duration        length of the round and duration used for requests
     */
    @Override
    public void ping(Consumer<ZenPing.PingCollection> resultsConsumer, TimeValue duration) {
        this.ping(resultsConsumer, duration, duration);
    }

    /**
     * Starts a pinging round.
     *
     * <p>Seed nodes are gathered from the configured hosts (resolved on the connect executor), from the
     * hosts provider and from the master nodes in the current cluster state. Pings are then sent three
     * times - immediately, and after one third and two thirds of {@code scheduleDuration} - and the round
     * is finished once {@code scheduleDuration} has elapsed.
     *
     * @param resultsConsumer  receives the collected pings when the round is closed
     * @param scheduleDuration overall length of the round
     * @param requestDuration  duration used for the connection profile and passed on to the ping requests
     * @throws RuntimeException wrapping an {@link InterruptedException} if the calling thread is interrupted
     *                          while hosts are being resolved; the thread's interrupt status is restored first
     */
    protected void ping(Consumer<ZenPing.PingCollection> resultsConsumer, TimeValue scheduleDuration, final TimeValue requestDuration) {
        List<DiscoveryNode> seedNodes;
        try {
            seedNodes = UnicastZenPing.resolveHostsLists(this.unicastZenPingExecutorService, this.logger, this.configuredHosts, this.limitPortCounts, this.transportService, UNICAST_NODE_PREFIX, this.resolveTimeout);
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; restore it so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        seedNodes.addAll(this.hostsProvider.buildDynamicNodes());
        DiscoveryNodes nodes = this.contextProvider.clusterState().nodes();
        // Master nodes already present in the cluster state are added as ping targets too.
        for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
            seedNodes.add(masterNode.value);
        }
        ConnectionProfile connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
        final PingingRound pingingRound = new PingingRound(this.pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer, nodes.getLocalNode(), connectionProfile);
        this.activePingingRounds.put(pingingRound.id(), pingingRound);
        AbstractRunnable pingSender = new AbstractRunnable(){

            @Override
            public void onFailure(Exception e) {
                // A round that was closed in the meantime is expected; anything else is worth a warning.
                if (!(e instanceof AlreadyClosedException)) {
                    UnicastZenPing.this.logger.warn("unexpected error while pinging", (Throwable)e);
                }
            }

            @Override
            protected void doRun() throws Exception {
                UnicastZenPing.this.sendPings(requestDuration, pingingRound);
            }
        };
        this.threadPool.generic().execute(pingSender);
        this.threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3L), "generic", pingSender);
        this.threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3L * 2L), "generic", pingSender);
        this.threadPool.schedule(scheduleDuration, "generic", new AbstractRunnable(){

            @Override
            protected void doRun() throws Exception {
                UnicastZenPing.this.finishPingingRound(pingingRound);
            }

            @Override
            public void onFailure(Exception e) {
                UnicastZenPing.this.logger.warn("unexpected error while finishing pinging round", (Throwable)e);
            }
        });
    }

    /**
     * Ends the given pinging round by closing it.
     *
     * @param pingingRound the round to finish
     */
    protected void finishPingingRound(PingingRound pingingRound) {
        pingingRound.close();
    }

    /**
     * Sends one ping request to every node known to the given round.
     *
     * <p>Targets are the round's seed nodes plus the nodes found in the temporal responses, de-duplicated
     * by transport address (the first node seen for an address wins). Where the latest cluster state knows
     * a node at the same address, that node replaces the placeholder.
     *
     * @param timeout      timeout carried in the ping request and used to derive the transport timeout
     * @param pingingRound the round on whose behalf the pings are sent
     */
    protected void sendPings(TimeValue timeout, PingingRound pingingRound) {
        ClusterState lastState = this.contextProvider.clusterState();
        UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, this.createPingResponse(lastState));
        Set<DiscoveryNode> nodesFromResponses = this.temporalResponses.stream().map(pingResponse -> {
            assert (this.clusterName.equals(pingResponse.clusterName())) : "got a ping request from a different cluster. expected " + this.clusterName + " got " + pingResponse.clusterName();
            return pingResponse.node();
        }).collect(Collectors.toSet());
        // De-duplicate by address, keeping the first node encountered for each address.
        Map<TransportAddress, DiscoveryNode> uniqueNodesByAddress = Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream())
            .collect(Collectors.toMap(DiscoveryNode::getAddress, Function.identity(), (n1, n2) -> n1));
        // Prefer the node from the latest cluster state when one exists at the same address.
        Set<DiscoveryNode> nodesToPing = uniqueNodesByAddress.values().stream().map(node -> {
            DiscoveryNode foundNode = lastState.nodes().findByAddress(node.getAddress());
            return foundNode == null ? node : foundNode;
        }).collect(Collectors.toSet());
        nodesToPing.forEach(node -> this.sendPingRequestToNode(node, timeout, pingingRound, pingRequest));
    }

    /**
     * Queues a task on the connect executor that pings a single node.
     *
     * <p>The task reuses the transport service's existing connection when the node is connected and
     * otherwise obtains a temporary connection from the round. The transport timeout is 1.25 times
     * {@code timeout}. Failures are only logged, at a level depending on the exception type.
     *
     * @param node         node to ping
     * @param timeout      base timeout for the request
     * @param pingingRound round that owns temporary connections and collects the responses
     * @param pingRequest  request to send
     */
    private void sendPingRequestToNode(final DiscoveryNode node, final TimeValue timeout, final PingingRound pingingRound, final UnicastPingRequest pingRequest) {
        final AbstractRunnable pingTask = new AbstractRunnable(){

            @Override
            protected void doRun() throws Exception {
                Transport.Connection channel = null;
                if (UnicastZenPing.this.transportService.nodeConnected(node)) {
                    try {
                        channel = UnicastZenPing.this.transportService.getConnection(node);
                    }
                    catch (NodeNotConnectedException ignored) {
                        // The node dropped between the check and the lookup; fall through to a temporary connection.
                        UnicastZenPing.this.logger.trace("[{}] node [{}] just disconnected, will create a temp connection", (Object)pingingRound.id(), (Object)node);
                    }
                }
                if (channel == null) {
                    channel = pingingRound.getOrConnect(node);
                }
                UnicastZenPing.this.logger.trace("[{}] sending to {}", (Object)pingingRound.id(), (Object)node);
                final TransportRequestOptions options = TransportRequestOptions.builder().withTimeout((long)((double)timeout.millis() * 1.25)).build();
                final TransportResponseHandler<UnicastPingResponse> responseHandler = UnicastZenPing.this.getPingResponseHandler(pingingRound, node);
                UnicastZenPing.this.transportService.sendRequest(channel, UnicastZenPing.ACTION_NAME, (TransportRequest)pingRequest, options, responseHandler);
            }

            @Override
            public void onFailure(Exception failure) {
                final boolean connectivityProblem = failure instanceof ConnectTransportException || failure instanceof AlreadyClosedException;
                if (connectivityProblem) {
                    UnicastZenPing.this.logger.trace(() -> new ParameterizedMessage("[{}] failed to ping {}", (Object)pingingRound.id(), (Object)node), (Throwable)failure);
                    return;
                }
                if (failure instanceof RemoteTransportException) {
                    UnicastZenPing.this.logger.debug(() -> new ParameterizedMessage("[{}] received a remote error as a response to ping {}", (Object)pingingRound.id(), (Object)node), (Throwable)failure);
                    return;
                }
                UnicastZenPing.this.logger.warn(() -> new ParameterizedMessage("[{}] failed send ping to {}", (Object)pingingRound.id(), (Object)node), (Throwable)failure);
            }

            @Override
            public void onRejection(Exception rejection) {
                UnicastZenPing.this.logger.debug("Ping execution rejected", (Throwable)rejection);
            }
        };
        this.submitToExecutor(pingTask);
    }

    /**
     * Hands the given task to the unicast connect executor.
     *
     * @param abstractRunnable task to execute
     */
    protected void submitToExecutor(AbstractRunnable abstractRunnable) {
        this.unicastZenPingExecutorService.execute(abstractRunnable);
    }

    /**
     * Creates the handler for the response to a ping sent to {@code node}.
     *
     * <p>On success every ping response in the reply is added to the round, unless the round has already
     * been closed. Connection-related failures are traced; other failures are logged as warnings, but only
     * while this pinger has not been closed.
     *
     * @param pingingRound round collecting the responses
     * @param node         node the ping was sent to (used for logging)
     * @return a handler running on the {@code "same"} executor
     */
    protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(final PingingRound pingingRound, final DiscoveryNode node) {
        return new TransportResponseHandler<UnicastPingResponse>(){

            @Override
            public UnicastPingResponse read(StreamInput in) throws IOException {
                return new UnicastPingResponse(in);
            }

            @Override
            public String executor() {
                return "same";
            }

            @Override
            public void handleResponse(UnicastPingResponse response) {
                UnicastZenPing.this.logger.trace("[{}] received response from {}: {}", (Object)pingingRound.id(), (Object)node, (Object)Arrays.toString(response.pingResponses));
                if (!pingingRound.isClosed()) {
                    for (ZenPing.PingResponse received : response.pingResponses) {
                        pingingRound.addPingResponseToCollection(received);
                    }
                    return;
                }
                // Round already finished: the response arrived too late to be counted.
                if (UnicastZenPing.this.logger.isTraceEnabled()) {
                    UnicastZenPing.this.logger.trace("[{}] skipping received response from {}. already closed", (Object)pingingRound.id(), (Object)node);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                final boolean connectionFailure = exp instanceof ConnectTransportException
                    || exp.getCause() instanceof ConnectTransportException
                    || exp.getCause() instanceof AlreadyClosedException;
                if (connectionFailure) {
                    UnicastZenPing.this.logger.trace(() -> new ParameterizedMessage("failed to connect to {}", (Object)node), (Throwable)exp);
                    return;
                }
                if (!UnicastZenPing.this.closed) {
                    UnicastZenPing.this.logger.warn(() -> new ParameterizedMessage("failed to send ping to [{}]", (Object)node), (Throwable)exp);
                }
            }
        };
    }

    /**
     * Processes an incoming ping request and builds the reply.
     *
     * <p>The sender's ping response is remembered in the temporal responses (and scheduled for removal
     * after twice the request timeout) and added to every active pinging round. The reply carries all
     * temporal responses plus this node's own ping response.
     *
     * @param request the incoming ping request
     * @return the response to send back, tagged with the request id
     */
    private UnicastPingResponse handlePingRequest(UnicastPingRequest request) {
        final ZenPing.PingResponse incoming = request.pingResponse;
        assert (this.clusterName.equals(incoming.clusterName())) : "got a ping request from a different cluster. expected " + this.clusterName + " got " + request.pingResponse.clusterName();
        this.temporalResponses.add(incoming);
        for (PingingRound activeRound : this.activePingingRounds.values()) {
            activeRound.addPingResponseToCollection(incoming);
        }
        final TimeValue expiry = TimeValue.timeValueMillis(request.timeout.millis() * 2L);
        this.threadPool.schedule(expiry, "same", () -> this.temporalResponses.remove(incoming));
        // Snapshot the known responses, then append our own view of the cluster.
        final ArrayList<ZenPing.PingResponse> reply = CollectionUtils.iterableAsArrayList(this.temporalResponses);
        reply.add(this.createPingResponse(this.contextProvider.clusterState()));
        final ZenPing.PingResponse[] replyArray = reply.toArray(new ZenPing.PingResponse[reply.size()]);
        return new UnicastPingResponse(request.id, replyArray);
    }

    /**
     * Builds this node's own ping response from the given cluster state.
     *
     * @param clusterState state supplying the local node and the current master node
     * @return a ping response naming the local node, the master node and the state itself
     */
    private ZenPing.PingResponse createPingResponse(ClusterState clusterState) {
        final DiscoveryNodes knownNodes = clusterState.nodes();
        final DiscoveryNode self = knownNodes.getLocalNode();
        final DiscoveryNode master = knownNodes.getMasterNode();
        return new ZenPing.PingResponse(self, master, clusterState);
    }

    /**
     * Returns the version reported by this pinger.
     *
     * @return {@code Version.CURRENT}
     */
    protected Version getVersion() {
        return Version.CURRENT;
    }

    /**
     * Reply to a unicast ping: the id of the request it answers plus an array of ping responses.
     * After the superclass content, it is written as an int id, a vint element count and then each ping response.
     */
    static class UnicastPingResponse extends TransportResponse {
        final int id;
        final ZenPing.PingResponse[] pingResponses;

        /** Creates a response for the request with the given id, carrying the given ping responses. */
        UnicastPingResponse(int id, ZenPing.PingResponse[] pingResponses) {
            this.id = id;
            this.pingResponses = pingResponses;
        }

        /** Reads a response from the stream: int id, vint count, then that many ping responses. */
        UnicastPingResponse(StreamInput in) throws IOException {
            this.id = in.readInt();
            final int count = in.readVInt();
            final ZenPing.PingResponse[] decoded = new ZenPing.PingResponse[count];
            int index = 0;
            while (index < count) {
                decoded[index] = new ZenPing.PingResponse(in);
                index++;
            }
            this.pingResponses = decoded;
        }

        /** Always throws: instances are read through the {@link StreamInput} constructor instead. */
        @Override
        public void readFrom(StreamInput in) throws IOException {
            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        /** Writes the superclass content, then the id, the element count and each ping response. */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeInt(this.id);
            final int count = this.pingResponses.length;
            out.writeVInt(count);
            for (int index = 0; index < count; index++) {
                this.pingResponses[index].writeTo(out);
            }
        }
    }

    /**
     * Unicast ping request: the id of the sending round, the sender's timeout and the sender's own ping response.
     * After the superclass content, it is written as an int id, a time value and the ping response.
     */
    static class UnicastPingRequest extends TransportRequest {
        final int id;
        final TimeValue timeout;
        final ZenPing.PingResponse pingResponse;

        /** Creates a request with the given round id, timeout and sender ping response. */
        UnicastPingRequest(int id, TimeValue timeout, ZenPing.PingResponse pingResponse) {
            this.id = id;
            this.timeout = timeout;
            this.pingResponse = pingResponse;
        }

        /** Reads a request from the stream, in the same field order used by {@link #writeTo}. */
        UnicastPingRequest(StreamInput in) throws IOException {
            super(in);
            id = in.readInt();
            timeout = in.readTimeValue();
            pingResponse = new ZenPing.PingResponse(in);
        }

        /** Always throws: instances are read through the {@link StreamInput} constructor instead. */
        @Override
        public void readFrom(StreamInput in) throws IOException {
            throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
        }

        /** Writes the superclass content, then the id, the timeout and the ping response. */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeInt(id);
            out.writeTimeValue(timeout);
            pingResponse.writeTo(out);
        }
    }

    /**
     * Transport handler for incoming unicast ping requests. Rejects requests once the enclosing pinger
     * is closed or when the sender belongs to a different cluster; otherwise it replies with the result
     * of {@code handlePingRequest}.
     */
    class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {
        UnicastPingRequestHandler() {
        }

        /**
         * @throws AlreadyClosedException if the enclosing pinger has been closed
         * @throws IllegalStateException  if the request's cluster name differs from the local one
         */
        @Override
        public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
            if (UnicastZenPing.this.closed) {
                throw new AlreadyClosedException("node is shutting down");
            }
            final ClusterName senderCluster = request.pingResponse.clusterName();
            if (senderCluster.equals(UnicastZenPing.this.clusterName)) {
                channel.sendResponse(UnicastZenPing.this.handlePingRequest(request));
                return;
            }
            throw new IllegalStateException(String.format(Locale.ROOT, "mismatched cluster names; request: [%s], local: [%s]", senderCluster.value(), UnicastZenPing.this.clusterName.value()));
        }
    }

    protected class PingingRound
    implements Releasable {
        private final int id;
        private final Map<TransportAddress, Transport.Connection> tempConnections = new HashMap<TransportAddress, Transport.Connection>();
        private final KeyedLock<TransportAddress> connectionLock = new KeyedLock(true);
        private final ZenPing.PingCollection pingCollection;
        private final List<DiscoveryNode> seedNodes;
        private final Consumer<ZenPing.PingCollection> pingListener;
        private final DiscoveryNode localNode;
        private final ConnectionProfile connectionProfile;
        private AtomicBoolean closed = new AtomicBoolean(false);

        PingingRound(int id, List<DiscoveryNode> seedNodes, Consumer<ZenPing.PingCollection> resultsConsumer, DiscoveryNode localNode, ConnectionProfile connectionProfile) {
            this.id = id;
            this.seedNodes = Collections.unmodifiableList(new ArrayList<DiscoveryNode>(seedNodes));
            this.pingListener = resultsConsumer;
            this.localNode = localNode;
            this.connectionProfile = connectionProfile;
            this.pingCollection = new ZenPing.PingCollection();
        }

        public int id() {
            return this.id;
        }

        public boolean isClosed() {
            return this.closed.get();
        }

        public List<DiscoveryNode> getSeedNodes() {
            this.ensureOpen();
            return this.seedNodes;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public Transport.Connection getOrConnect(DiscoveryNode node) throws IOException {
            try (Releasable ignore = this.connectionLock.acquire(node.getAddress());){
                Transport.Connection result = this.tempConnections.get(node.getAddress());
                if (result != null) return result;
                this.ensureOpen();
                boolean success = false;
                UnicastZenPing.this.logger.trace("[{}] opening connection to [{}]", (Object)this.id(), (Object)node);
                result = UnicastZenPing.this.transportService.openConnection(node, this.connectionProfile);
                try {
                    UnicastZenPing.this.transportService.handshake(result, this.connectionProfile.getHandshakeTimeout().millis());
                    PingingRound pingingRound = this;
                    synchronized (pingingRound) {
                        this.ensureOpen();
                        Transport.Connection existing = this.tempConnections.put(node.getAddress(), result);
                        assert (existing == null);
                        success = true;
                    }
                    if (success) return result;
                }
                catch (Throwable throwable) {
                    if (success) throw throwable;
                    UnicastZenPing.this.logger.trace("[{}] closing connection to [{}] due to failure", (Object)this.id(), (Object)node);
                    IOUtils.closeWhileHandlingException(result);
                    throw throwable;
                }
                UnicastZenPing.this.logger.trace("[{}] closing connection to [{}] due to failure", (Object)this.id(), (Object)node);
                IOUtils.closeWhileHandlingException(result);
                return result;
            }
        }

        private void ensureOpen() {
            if (this.isClosed()) {
                throw new AlreadyClosedException("pinging round [" + this.id + "] is finished");
            }
        }

        public void addPingResponseToCollection(ZenPing.PingResponse pingResponse) {
            if (!this.localNode.equals(pingResponse.node())) {
                this.pingCollection.addPing(pingResponse);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            ArrayList<Transport.Connection> toClose = null;
            PingingRound pingingRound = this;
            synchronized (pingingRound) {
                if (this.closed.compareAndSet(false, true)) {
                    UnicastZenPing.this.activePingingRounds.remove(this.id);
                    toClose = new ArrayList<Transport.Connection>(this.tempConnections.values());
                    this.tempConnections.clear();
                }
            }
            if (toClose != null) {
                try {
                    this.pingListener.accept(this.pingCollection);
                }
                finally {
                    IOUtils.closeWhileHandlingException(toClose);
                }
            }
        }

        public ConnectionProfile getConnectionProfile() {
            return this.connectionProfile;
        }
    }
}

