package org.apache.nifi.remote.client.socket;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.NiFiRestApiUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

/* loaded from: input_file:org/apache/nifi/remote/client/socket/EndpointConnectionPool.class */
public class EndpointConnectionPool {
    public static final long PEER_REFRESH_PERIOD = 60000;
    public static final String CATEGORY = "Site-to-Site";
    public static final long REMOTE_REFRESH_MILLIS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
    private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
    private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap;
    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations;
    private final URI clusterUrl;
    private final String apiUri;
    private final AtomicLong peerIndex;
    private final ReentrantLock peerRefreshLock;
    private volatile List<PeerStatus> peerStatuses;
    private volatile long peerRefreshTime;
    private volatile PeerStatusCache peerStatusCache;
    private final Set<EndpointConnection> activeConnections;
    private final File peersFile;
    private final EventReporter eventReporter;
    private final SSLContext sslContext;
    private final ScheduledExecutorService taskExecutor;
    private final int idleExpirationMillis;
    private final RemoteDestination remoteDestination;
    private final ReadWriteLock listeningPortRWLock;
    private final Lock remoteInfoReadLock;
    private final Lock remoteInfoWriteLock;
    private Integer siteToSitePort;
    private Boolean siteToSiteSecure;
    private long remoteRefreshTime;
    private final Map<String, String> inputPortMap;
    private final Map<String, String> outputPortMap;
    private volatile int commsTimeout;
    private volatile boolean shutdown;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/remote/client/socket/EndpointConnectionPool$IdEnrichedRemoteDestination.class */
    public class IdEnrichedRemoteDestination implements RemoteDestination {
        private final RemoteDestination original;
        private final String identifier;

        public IdEnrichedRemoteDestination(RemoteDestination remoteDestination, String str) {
            this.original = remoteDestination;
            this.identifier = str;
        }

        public String getIdentifier() {
            return this.identifier;
        }

        public String getName() {
            return this.original.getName();
        }

        public long getYieldPeriod(TimeUnit timeUnit) {
            return this.original.getYieldPeriod(timeUnit);
        }

        public boolean isUseCompression() {
            return this.original.isUseCompression();
        }
    }

    public EndpointConnectionPool(String str, RemoteDestination remoteDestination, int i, int i2, EventReporter eventReporter, File file) {
        this(str, remoteDestination, i, i2, null, eventReporter, file);
    }

    public EndpointConnectionPool(String str, RemoteDestination remoteDestination, int i, int i2, SSLContext sSLContext, EventReporter eventReporter, File file) {
        this.connectionQueueMap = new ConcurrentHashMap();
        this.peerTimeoutExpirations = new ConcurrentHashMap();
        this.peerIndex = new AtomicLong(0L);
        this.peerRefreshLock = new ReentrantLock();
        this.peerRefreshTime = 0L;
        this.activeConnections = Collections.synchronizedSet(new HashSet());
        this.listeningPortRWLock = new ReentrantReadWriteLock();
        this.remoteInfoReadLock = this.listeningPortRWLock.readLock();
        this.remoteInfoWriteLock = this.listeningPortRWLock.writeLock();
        this.inputPortMap = new HashMap();
        this.outputPortMap = new HashMap();
        this.shutdown = false;
        Objects.requireNonNull(str, "URL cannot be null");
        Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
        try {
            this.clusterUrl = new URI(str);
            String path = this.clusterUrl.getPath();
            this.apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + (path.endsWith("/") ? path.substring(0, path.length() - 1) : path) + "-api";
            this.remoteDestination = remoteDestination;
            this.sslContext = sSLContext;
            this.peersFile = file;
            this.eventReporter = eventReporter;
            this.commsTimeout = i;
            this.idleExpirationMillis = i2;
            if (file == null || !file.exists()) {
                this.peerStatusCache = null;
            } else {
                try {
                    this.peerStatusCache = new PeerStatusCache(recoverPersistedPeerStatuses(this.peersFile), this.peersFile.lastModified());
                } catch (IOException e) {
                    logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", file, e);
                }
            }
            this.taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: org.apache.nifi.remote.client.socket.EndpointConnectionPool.1
                private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread newThread = this.defaultFactory.newThread(runnable);
                    newThread.setName("NiFi Site-to-Site Connection Pool Maintenance");
                    newThread.setDaemon(true);
                    return newThread;
                }
            });
            this.taskExecutor.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.nifi.remote.client.socket.EndpointConnectionPool.2
                @Override // java.lang.Runnable
                public void run() {
                    EndpointConnectionPool.this.refreshPeers();
                }
            }, 0L, 5L, TimeUnit.SECONDS);
            this.taskExecutor.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.nifi.remote.client.socket.EndpointConnectionPool.3
                @Override // java.lang.Runnable
                public void run() {
                    EndpointConnectionPool.this.cleanupExpiredSockets();
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        } catch (URISyntaxException e2) {
            throw new IllegalArgumentException("Invalid Cluster URL: " + str);
        }
    }

    void warn(String str, Object... objArr) {
        logger.warn(str, objArr);
        if (this.eventReporter != null) {
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, MessageFormatter.arrayFormat(str, objArr).getMessage());
        }
    }

    void warn(String str, Throwable th) {
        logger.warn(str, th);
        if (this.eventReporter != null) {
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, str + ": " + th.toString());
        }
    }

    void error(String str, Object... objArr) {
        logger.error(str, objArr);
        if (this.eventReporter != null) {
            this.eventReporter.reportEvent(Severity.ERROR, CATEGORY, MessageFormatter.arrayFormat(str, objArr).getMessage());
        }
    }

    private String getPortIdentifier(TransferDirection transferDirection) throws IOException {
        return this.remoteDestination.getIdentifier() != null ? this.remoteDestination.getIdentifier() : transferDirection == TransferDirection.RECEIVE ? getOutputPortIdentifier(this.remoteDestination.getName()) : getInputPortIdentifier(this.remoteDestination.getName());
    }

    public EndpointConnection getEndpointConnection(TransferDirection transferDirection) throws IOException {
        return getEndpointConnection(transferDirection, null);
    }

    public EndpointConnection getEndpointConnection(TransferDirection transferDirection, SiteToSiteClientConfig siteToSiteClientConfig) throws IOException {
        CommunicationsSession communicationsSession;
        FlowFileCodec flowFileCodec = null;
        CommunicationsSession communicationsSession2 = null;
        SocketClientProtocol socketClientProtocol = null;
        while (true) {
            ArrayList arrayList = new ArrayList();
            logger.debug("{} getting next peer status", this);
            PeerStatus nextPeerStatus = getNextPeerStatus(transferDirection);
            logger.debug("{} next peer status = {}", this, nextPeerStatus);
            if (nextPeerStatus == null) {
                return null;
            }
            PeerDescription peerDescription = nextPeerStatus.getPeerDescription();
            BlockingQueue<EndpointConnection> blockingQueue = this.connectionQueueMap.get(peerDescription);
            if (blockingQueue == null) {
                blockingQueue = new LinkedBlockingQueue();
                BlockingQueue<EndpointConnection> putIfAbsent = this.connectionQueueMap.putIfAbsent(peerDescription, blockingQueue);
                if (putIfAbsent != null) {
                    blockingQueue = putIfAbsent;
                }
            }
            try {
                try {
                    EndpointConnection poll = blockingQueue.poll();
                    logger.debug("{} Connection State for {} = {}", new Object[]{this, this.clusterUrl, poll});
                    String portIdentifier = getPortIdentifier(transferDirection);
                    if (poll == null && !arrayList.isEmpty()) {
                        logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portIdentifier);
                        if (!arrayList.isEmpty()) {
                            blockingQueue.addAll(arrayList);
                            arrayList.clear();
                        }
                        return null;
                    }
                    if (poll == null || !poll.getPeer().isPenalized(portIdentifier)) {
                        if (poll == null) {
                            logger.debug("{} No Connection available for Port {}; creating new Connection", this, portIdentifier);
                            socketClientProtocol = new SocketClientProtocol();
                            socketClientProtocol.setDestination(new IdEnrichedRemoteDestination(this.remoteDestination, portIdentifier));
                            socketClientProtocol.setEventReporter(this.eventReporter);
                            long yieldPeriod = this.remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
                            try {
                                logger.debug("{} Establishing site-to-site connection with {}", this, nextPeerStatus);
                                communicationsSession2 = establishSiteToSiteConnection(nextPeerStatus);
                                DataInputStream dataInputStream = new DataInputStream(communicationsSession2.getInput().getInputStream());
                                DataOutputStream dataOutputStream = new DataOutputStream(communicationsSession2.getOutput().getOutputStream());
                                try {
                                    logger.debug("{} Negotiating protocol", this);
                                    RemoteResourceInitiator.initiateResourceNegotiation(socketClientProtocol, dataInputStream, dataOutputStream);
                                } catch (HandshakeException e) {
                                    try {
                                        communicationsSession2.close();
                                    } catch (IOException e2) {
                                        throw e;
                                    }
                                }
                                Peer peer = new Peer(peerDescription, communicationsSession2, "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort(), this.clusterUrl.toString());
                                if (siteToSiteClientConfig != null) {
                                    socketClientProtocol.setTimeout((int) siteToSiteClientConfig.getTimeout(TimeUnit.MILLISECONDS));
                                    socketClientProtocol.setPreferredBatchCount(siteToSiteClientConfig.getPreferredBatchCount());
                                    socketClientProtocol.setPreferredBatchSize(siteToSiteClientConfig.getPreferredBatchSize());
                                    socketClientProtocol.setPreferredBatchDuration(siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
                                }
                                try {
                                    try {
                                        logger.debug("{} performing handshake", this);
                                        socketClientProtocol.handshake(peer);
                                        if (socketClientProtocol.isDestinationFull()) {
                                            Logger logger2 = logger;
                                            Object[] objArr = new Object[3];
                                            objArr[0] = this;
                                            objArr[1] = peer;
                                            objArr[2] = siteToSiteClientConfig.getPortName() == null ? siteToSiteClientConfig.getPortIdentifier() : siteToSiteClientConfig.getPortName();
                                            logger2.warn("{} {} indicates that port {}'s destination is full; penalizing peer", objArr);
                                            penalize(peer, yieldPeriod);
                                            try {
                                                peer.close();
                                            } catch (IOException e3) {
                                            }
                                        } else {
                                            if (socketClientProtocol.isPortInvalid()) {
                                                penalize(peer, yieldPeriod);
                                                cleanup(socketClientProtocol, peer);
                                                throw new PortNotRunningException(peer.toString() + " indicates that port " + portIdentifier + " is not running");
                                            }
                                            if (socketClientProtocol.isPortUnknown()) {
                                                penalize(peer, yieldPeriod);
                                                cleanup(socketClientProtocol, peer);
                                                throw new UnknownPortException(peer.toString() + " indicates that port " + portIdentifier + " is not known");
                                            }
                                            logger.debug("{} negotiating codec", this);
                                            flowFileCodec = socketClientProtocol.negotiateCodec(peer);
                                            logger.debug("{} negotiated codec is {}", this, flowFileCodec);
                                            poll = new EndpointConnection(peer, socketClientProtocol, flowFileCodec);
                                        }
                                    } catch (Exception e4) {
                                        penalize(peer, yieldPeriod);
                                        cleanup(socketClientProtocol, peer);
                                        Object[] objArr2 = new Object[3];
                                        objArr2[0] = this;
                                        objArr2[1] = peer == null ? this.clusterUrl : peer;
                                        objArr2[2] = e4.toString();
                                        error(String.format("%s failed to communicate with %s due to %s", objArr2), new Object[0]);
                                        if (logger.isDebugEnabled()) {
                                            logger.error("", e4);
                                        }
                                        throw e4;
                                    }
                                } catch (PortNotRunningException | UnknownPortException e5) {
                                    throw e5;
                                }
                            } catch (IOException e6) {
                                penalize(nextPeerStatus.getPeerDescription(), yieldPeriod);
                                throw e6;
                            }
                        } else {
                            long currentTimeMillis = System.currentTimeMillis() - poll.getLastTimeUsed();
                            if (this.commsTimeout <= 0 || currentTimeMillis < this.commsTimeout) {
                                flowFileCodec = poll.getCodec();
                                communicationsSession2 = poll.getPeer().getCommunicationsSession();
                                socketClientProtocol = poll.getSocketClientProtocol();
                            } else {
                                cleanup(poll.getSocketClientProtocol(), poll.getPeer());
                                poll = null;
                            }
                        }
                        if (!arrayList.isEmpty()) {
                            blockingQueue.addAll(arrayList);
                            arrayList.clear();
                        }
                    } else {
                        arrayList.add(poll);
                        if (!arrayList.isEmpty()) {
                            blockingQueue.addAll(arrayList);
                            arrayList.clear();
                        }
                    }
                    if (poll != null && flowFileCodec != null) {
                        if (communicationsSession != null && socketClientProtocol != null) {
                            this.activeConnections.add(poll);
                            return poll;
                        }
                    }
                } finally {
                    if (communicationsSession2 != null) {
                        try {
                            communicationsSession2.close();
                        } catch (IOException e7) {
                        }
                    }
                }
            } finally {
                if (!arrayList.isEmpty()) {
                    blockingQueue.addAll(arrayList);
                    arrayList.clear();
                }
            }
        }
    }

    public boolean offer(EndpointConnection endpointConnection) {
        BlockingQueue<EndpointConnection> blockingQueue;
        Peer peer = endpointConnection.getPeer();
        if (peer == null || (blockingQueue = this.connectionQueueMap.get(peer.getDescription())) == null) {
            return false;
        }
        this.activeConnections.remove(endpointConnection);
        if (this.shutdown) {
            terminate(endpointConnection);
            return false;
        }
        endpointConnection.setLastTimeUsed();
        return blockingQueue.offer(endpointConnection);
    }

    private void penalize(PeerDescription peerDescription, long j) {
        Long l = this.peerTimeoutExpirations.get(peerDescription);
        if (l == null) {
            l = 0L;
        }
        this.peerTimeoutExpirations.put(peerDescription, Long.valueOf(Math.max(l.longValue(), System.currentTimeMillis() + j)));
    }

    public void penalize(Peer peer, long j) {
        penalize(peer.getDescription(), j);
    }

    private void cleanup(SocketClientProtocol socketClientProtocol, Peer peer) {
        if (socketClientProtocol != null && peer != null) {
            try {
                socketClientProtocol.shutdown(peer);
            } catch (TransmissionDisabledException e) {
                logger.debug(this + " Transmission Disabled by User");
            } catch (IOException e2) {
            }
        }
        if (peer != null) {
            try {
                peer.close();
            } catch (IOException e3) {
            } catch (TransmissionDisabledException e4) {
                logger.debug(this + " Transmission Disabled by User");
            }
        }
    }

    private boolean isPeerRefreshNeeded(List<PeerStatus> list) {
        return list == null || list.isEmpty() || System.currentTimeMillis() > this.peerRefreshTime + PEER_REFRESH_PERIOD;
    }

    private PeerStatus getNextPeerStatus(TransferDirection transferDirection) {
        List<PeerStatus> list = this.peerStatuses;
        if (isPeerRefreshNeeded(list)) {
            this.peerRefreshLock.lock();
            try {
                list = this.peerStatuses;
                if (isPeerRefreshNeeded(list)) {
                    try {
                        list = createPeerStatusList(transferDirection);
                    } catch (Exception e) {
                        String format = String.format("%s Failed to update list of peers due to %s", this, e.toString());
                        warn(format, new Object[0]);
                        if (logger.isDebugEnabled()) {
                            logger.warn("", e);
                        }
                        if (this.eventReporter != null) {
                            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format);
                        }
                    }
                    this.peerStatuses = list;
                    this.peerRefreshTime = System.currentTimeMillis();
                }
            } finally {
                this.peerRefreshLock.unlock();
            }
        }
        if (list == null || list.isEmpty()) {
            return null;
        }
        for (int i = 0; i < list.size(); i++) {
            PeerStatus peerStatus = list.get((int) (this.peerIndex.getAndIncrement() % list.size()));
            if (!isPenalized(peerStatus)) {
                return peerStatus;
            }
            logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus);
        }
        logger.debug("{} All peers appear to be penalized; returning null", this);
        return null;
    }

    private boolean isPenalized(PeerStatus peerStatus) {
        Long l = this.peerTimeoutExpirations.get(peerStatus.getPeerDescription());
        return l != null && l.longValue() > System.currentTimeMillis();
    }

    private List<PeerStatus> createPeerStatusList(TransferDirection transferDirection) throws IOException {
        Set<PeerStatus> peerStatuses = getPeerStatuses();
        if (peerStatuses == null) {
            refreshPeers();
            peerStatuses = getPeerStatuses();
            if (peerStatuses == null) {
                logger.debug("{} found no peers to connect to", this);
                return Collections.emptyList();
            }
        }
        ClusterNodeInformation clusterNodeInformation = new ClusterNodeInformation();
        ArrayList arrayList = new ArrayList();
        for (PeerStatus peerStatus : peerStatuses) {
            PeerDescription peerDescription = peerStatus.getPeerDescription();
            arrayList.add(new NodeInformation(peerDescription.getHostname(), Integer.valueOf(peerDescription.getPort()), 0, peerDescription.isSecure(), peerStatus.getFlowFileCount()));
        }
        clusterNodeInformation.setNodeInformation(arrayList);
        return formulateDestinationList(clusterNodeInformation, transferDirection);
    }

    private Set<PeerStatus> getPeerStatuses() {
        PeerStatusCache peerStatusCache = this.peerStatusCache;
        if (peerStatusCache == null || peerStatusCache.getStatuses() == null || peerStatusCache.getStatuses().isEmpty()) {
            return null;
        }
        if (peerStatusCache.getTimestamp() + PEER_CACHE_MILLIS >= System.currentTimeMillis()) {
            return peerStatusCache.getStatuses();
        }
        HashSet hashSet = new HashSet(peerStatusCache.getStatuses().size());
        Iterator<PeerStatus> it = peerStatusCache.getStatuses().iterator();
        while (it.hasNext()) {
            hashSet.add(new PeerStatus(it.next().getPeerDescription(), 1));
        }
        return hashSet;
    }

    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
        String host = this.clusterUrl.getHost();
        Integer siteToSitePort = getSiteToSitePort();
        if (siteToSitePort == null) {
            throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
        }
        PeerDescription peerDescription = new PeerDescription(host, siteToSitePort.intValue(), this.clusterUrl.toString().startsWith("https://"));
        CommunicationsSession establishSiteToSiteConnection = establishSiteToSiteConnection(host, siteToSitePort.intValue());
        Peer peer = new Peer(peerDescription, establishSiteToSiteConnection, "nifi://" + host + ":" + siteToSitePort, this.clusterUrl.toString());
        SocketClientProtocol socketClientProtocol = new SocketClientProtocol();
        RemoteResourceInitiator.initiateResourceNegotiation(socketClientProtocol, new DataInputStream(establishSiteToSiteConnection.getInput().getInputStream()), new DataOutputStream(establishSiteToSiteConnection.getOutput().getOutputStream()));
        socketClientProtocol.setTimeout(this.commsTimeout);
        if (socketClientProtocol.getVersionNegotiator().getVersion() < 5) {
            String portIdentifier = getPortIdentifier(TransferDirection.RECEIVE);
            if (portIdentifier == null) {
                portIdentifier = getPortIdentifier(TransferDirection.SEND);
            }
            if (portIdentifier == null) {
                peer.close();
                throw new IOException("Failed to determine the identifier of port " + this.remoteDestination.getName());
            }
            socketClientProtocol.handshake(peer, portIdentifier);
        } else {
            socketClientProtocol.handshake(peer, null);
        }
        Set<PeerStatus> peerStatuses = socketClientProtocol.getPeerStatuses(peer);
        persistPeerStatuses(peerStatuses);
        try {
            socketClientProtocol.shutdown(peer);
        } catch (IOException e) {
            warn(String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()), new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", e);
            }
        }
        try {
            peer.close();
        } catch (IOException e2) {
            warn(String.format("%s Failed to close resources when updating list of peers due to %s", this, e2.toString()), new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", e2);
            }
        }
        return peerStatuses;
    }

    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r10v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r9v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r9v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 10, insn: 0x0112: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r10 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:64:0x0112 */
    /* JADX WARN: Not initialized variable reg: 9, insn: 0x010e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r9 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:62:0x010e */
    /* JADX WARN: Type inference failed for: r10v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r9v0, types: [java.io.OutputStream] */
    private void persistPeerStatuses(Set<PeerStatus> set) {
        if (this.peersFile == null) {
            return;
        }
        try {
            try {
                FileOutputStream fileOutputStream = new FileOutputStream(this.peersFile);
                Throwable th = null;
                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
                Throwable th2 = null;
                try {
                    try {
                        Iterator<PeerStatus> it = set.iterator();
                        while (it.hasNext()) {
                            PeerDescription peerDescription = it.next().getPeerDescription();
                            bufferedOutputStream.write((peerDescription.getHostname() + ":" + peerDescription.getPort() + ":" + peerDescription.isSecure() + "\n").getBytes(StandardCharsets.UTF_8));
                        }
                        if (bufferedOutputStream != null) {
                            if (0 != 0) {
                                try {
                                    bufferedOutputStream.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                bufferedOutputStream.close();
                            }
                        }
                        if (fileOutputStream != null) {
                            if (0 != 0) {
                                try {
                                    fileOutputStream.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                fileOutputStream.close();
                            }
                        }
                    } catch (Throwable th5) {
                        th2 = th5;
                        throw th5;
                    }
                } catch (Throwable th6) {
                    if (bufferedOutputStream != null) {
                        if (th2 != null) {
                            try {
                                bufferedOutputStream.close();
                            } catch (Throwable th7) {
                                th2.addSuppressed(th7);
                            }
                        } else {
                            bufferedOutputStream.close();
                        }
                    }
                    throw th6;
                }
            } catch (IOException e) {
                error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString());
                logger.error("", e);
            }
        } finally {
        }
    }

    private Set<PeerStatus> recoverPersistedPeerStatuses(File file) throws IOException {
        if (!file.exists()) {
            return null;
        }
        HashSet hashSet = new HashSet();
        FileInputStream fileInputStream = new FileInputStream(file);
        Throwable th = null;
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fileInputStream));
            Throwable th2 = null;
            while (true) {
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        }
                        String[] split = readLine.split(Pattern.quote(":"));
                        if (split.length == 3) {
                            hashSet.add(new PeerStatus(new PeerDescription(split[0], Integer.parseInt(split[1]), Boolean.parseBoolean(split[2])), 1));
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (bufferedReader != null) {
                        if (th2 != null) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th4) {
                                th2.addSuppressed(th4);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    throw th3;
                }
            }
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th5) {
                        th2.addSuppressed(th5);
                    }
                } else {
                    bufferedReader.close();
                }
            }
            return hashSet;
        } finally {
            if (fileInputStream != null) {
                if (0 != 0) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    fileInputStream.close();
                }
            }
        }
    }

    private CommunicationsSession establishSiteToSiteConnection(PeerStatus peerStatus) throws IOException {
        PeerDescription peerDescription = peerStatus.getPeerDescription();
        return establishSiteToSiteConnection(peerDescription.getHostname(), peerDescription.getPort());
    }

    private CommunicationsSession establishSiteToSiteConnection(String str, int i) throws IOException {
        String str2 = "nifi://" + str + ":" + i;
        CommunicationsSession communicationsSession = null;
        try {
            if (!isSecure()) {
                SocketChannel open = SocketChannel.open();
                open.socket().connect(new InetSocketAddress(str, i), this.commsTimeout);
                open.socket().setSoTimeout(this.commsTimeout);
                communicationsSession = new SocketChannelCommunicationsSession(open, str2);
            } else {
                if (this.sslContext == null) {
                    throw new IOException("Unable to communicate with " + str + ":" + i + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
                }
                SSLSocketChannel sSLSocketChannel = new SSLSocketChannel(this.sslContext, str, i, true);
                sSLSocketChannel.connect();
                communicationsSession = new SSLSocketChannelCommunicationsSession(sSLSocketChannel, str2);
                try {
                    communicationsSession.setUserDn(sSLSocketChannel.getDn());
                } catch (CertificateException e) {
                    throw new IOException(e);
                }
            }
            communicationsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
            communicationsSession.setUri(str2);
            return communicationsSession;
        } catch (IOException e2) {
            if (communicationsSession != null) {
                communicationsSession.close();
            }
            throw e2;
        }
    }

    static List<PeerStatus> formulateDestinationList(ClusterNodeInformation clusterNodeInformation, TransferDirection transferDirection) {
        int size;
        Collection<NodeInformation> nodeInformation = clusterNodeInformation.getNodeInformation();
        int max = Math.max(128, nodeInformation.size());
        HashMap hashMap = new HashMap();
        long j = 0;
        while (nodeInformation.iterator().hasNext()) {
            j += r0.next().getTotalFlowFiles();
        }
        int i = 0;
        for (NodeInformation nodeInformation2 : nodeInformation) {
            double min = Math.min(0.8d, nodeInformation2.getTotalFlowFiles() / j);
            int max2 = Math.max(1, (int) (max * (transferDirection == TransferDirection.SEND ? 1.0d - min : min)));
            hashMap.put(nodeInformation2, Integer.valueOf(Math.max(1, max2)));
            i += max2;
        }
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(null);
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            NodeInformation nodeInformation3 = (NodeInformation) entry.getKey();
            int intValue = ((Integer) entry.getValue()).intValue();
            for (int i3 = 0; i3 < intValue; i3++) {
                int i4 = intValue * i3;
                while (true) {
                    size = i4 % arrayList.size();
                    if (((PeerStatus) arrayList.get(size)) == null) {
                        break;
                    }
                    i4++;
                }
                arrayList.set(size, new PeerStatus(new PeerDescription(nodeInformation3.getSiteToSiteHostname(), nodeInformation3.getSiteToSitePort().intValue(), nodeInformation3.isSiteToSiteSecure()), nodeInformation3.getTotalFlowFiles()));
            }
        }
        StringBuilder sb = new StringBuilder();
        sb.append("New Weighted Distribution of Nodes:");
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            sb.append("\n").append(((Map.Entry) it.next()).getKey()).append(" will receive ").append((((Integer) r0.getValue()).intValue() * 100.0d) / arrayList.size()).append("% of data");
        }
        logger.info(sb.toString());
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupExpiredSockets() {
        for (BlockingQueue<EndpointConnection> blockingQueue : this.connectionQueueMap.values()) {
            ArrayList arrayList = new ArrayList();
            while (true) {
                EndpointConnection poll = blockingQueue.poll();
                if (poll != null) {
                    if (poll.getLastTimeUsed() < System.currentTimeMillis() - this.idleExpirationMillis) {
                        try {
                            poll.getSocketClientProtocol().shutdown(poll.getPeer());
                        } catch (Exception e) {
                            logger.debug("Failed to shut down {} using {} due to {}", new Object[]{poll.getSocketClientProtocol(), poll.getPeer(), e});
                        }
                        terminate(poll);
                    } else {
                        arrayList.add(poll);
                    }
                }
            }
            blockingQueue.addAll(arrayList);
        }
    }

    public void shutdown() {
        this.shutdown = true;
        this.taskExecutor.shutdown();
        this.peerTimeoutExpirations.clear();
        Iterator<EndpointConnection> it = this.activeConnections.iterator();
        while (it.hasNext()) {
            it.next().getPeer().getCommunicationsSession().interrupt();
        }
        for (BlockingQueue<EndpointConnection> blockingQueue : this.connectionQueueMap.values()) {
            while (true) {
                EndpointConnection poll = blockingQueue.poll();
                if (poll != null) {
                    terminate(poll);
                }
            }
        }
    }

    public void terminate(EndpointConnection endpointConnection) {
        this.activeConnections.remove(endpointConnection);
        cleanup(endpointConnection.getSocketClientProtocol(), endpointConnection.getPeer());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refreshPeers() {
        PeerStatusCache peerStatusCache = this.peerStatusCache;
        if (peerStatusCache == null || peerStatusCache.getTimestamp() + PEER_CACHE_MILLIS <= System.currentTimeMillis()) {
            try {
                Set<PeerStatus> fetchRemotePeerStatuses = fetchRemotePeerStatuses();
                this.peerStatusCache = new PeerStatusCache(fetchRemotePeerStatuses);
                logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, Integer.valueOf(fetchRemotePeerStatuses.size()));
            } catch (Exception e) {
                warn("{} Unable to refresh Remote Group's peers due to {}", this, e);
                if (logger.isDebugEnabled()) {
                    logger.warn("", e);
                }
            }
        }
    }

    public String getInputPortIdentifier(String str) throws IOException {
        return getPortIdentifier(str, this.inputPortMap);
    }

    public String getOutputPortIdentifier(String str) throws IOException {
        return getPortIdentifier(str, this.outputPortMap);
    }

    private String getPortIdentifier(String str, Map<String, String> map) throws IOException {
        this.remoteInfoReadLock.lock();
        try {
            String str2 = map.get(str);
            this.remoteInfoReadLock.unlock();
            if (str2 != null) {
                return str2;
            }
            refreshRemoteInfo();
            this.remoteInfoReadLock.lock();
            try {
                String str3 = map.get(str);
                this.remoteInfoReadLock.unlock();
                return str3;
            } finally {
            }
        } finally {
        }
    }

    private ControllerDTO refreshRemoteInfo() throws IOException {
        ControllerDTO controller = new NiFiRestApiUtil(this.clusterUrl.toString().startsWith("https") ? this.sslContext : null).getController(this.apiUri + "/controller", this.commsTimeout);
        this.remoteInfoWriteLock.lock();
        try {
            this.siteToSitePort = controller.getRemoteSiteListeningPort();
            this.siteToSiteSecure = controller.isSiteToSiteSecure();
            this.inputPortMap.clear();
            for (PortDTO portDTO : controller.getInputPorts()) {
                this.inputPortMap.put(portDTO.getName(), portDTO.getId());
            }
            this.outputPortMap.clear();
            for (PortDTO portDTO2 : controller.getOutputPorts()) {
                this.outputPortMap.put(portDTO2.getName(), portDTO2.getId());
            }
            this.remoteRefreshTime = System.currentTimeMillis();
            this.remoteInfoWriteLock.unlock();
            return controller;
        } catch (Throwable th) {
            this.remoteInfoWriteLock.unlock();
            throw th;
        }
    }

    private Integer getSiteToSitePort() throws IOException {
        this.remoteInfoReadLock.lock();
        try {
            Integer num = this.siteToSitePort;
            if (num != null) {
                if (this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
                    return num;
                }
            }
            return refreshRemoteInfo().getRemoteSiteListeningPort();
        } finally {
            this.remoteInfoReadLock.unlock();
        }
    }

    public String toString() {
        return "EndpointConnectionPool[Cluster URL=" + this.clusterUrl + "]";
    }

    public boolean isSecure() throws IOException {
        this.remoteInfoReadLock.lock();
        try {
            Boolean bool = this.siteToSiteSecure;
            if (bool != null && this.remoteRefreshTime > System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
                return bool.booleanValue();
            }
            Boolean isSiteToSiteSecure = refreshRemoteInfo().isSiteToSiteSecure();
            if (isSiteToSiteSecure == null) {
                throw new IOException("Remote NiFi instance " + this.clusterUrl + " is not currently configured to accept site-to-site connections");
            }
            return isSiteToSiteSecure.booleanValue();
        } finally {
            this.remoteInfoReadLock.unlock();
        }
    }
}
