package org.apache.nifi.remote.client.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.PeerPersistence;
import org.apache.nifi.remote.client.PeerSelector;
import org.apache.nifi.remote.client.PeerStatusProvider;
import org.apache.nifi.remote.client.SiteInfoProvider;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.exception.UnreachableClusterException;
import org.apache.nifi.remote.io.socket.SocketCommunicationsSession;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
import org.apache.nifi.remote.util.EventReportUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/client/socket/EndpointConnectionPool.class */
public class EndpointConnectionPool implements PeerStatusProvider {
    private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionPool.class);
    private static final SocketPeerIdentityProvider socketPeerIdentityProvider = new StandardSocketPeerIdentityProvider();
    private final EventReporter eventReporter;
    private final SSLContext sslContext;
    private final ScheduledExecutorService taskExecutor;
    private final int idleExpirationMillis;
    private final RemoteDestination remoteDestination;
    private volatile int commsTimeout;
    private final SiteInfoProvider siteInfoProvider;
    private final PeerSelector peerSelector;
    private final InetAddress localAddress;
    private final ConcurrentMap<PeerDescription, BlockingQueue<EndpointConnection>> connectionQueueMap = new ConcurrentHashMap();
    private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet());
    private volatile boolean shutdown = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/remote/client/socket/EndpointConnectionPool$IdEnrichedRemoteDestination.class */
    public class IdEnrichedRemoteDestination implements RemoteDestination {
        private final RemoteDestination original;
        private final String identifier;

        public IdEnrichedRemoteDestination(EndpointConnectionPool endpointConnectionPool, RemoteDestination remoteDestination, String str) {
            this.original = remoteDestination;
            this.identifier = str;
        }

        public String getIdentifier() {
            return this.identifier;
        }

        public String getName() {
            return this.original.getName();
        }

        public long getYieldPeriod(TimeUnit timeUnit) {
            return this.original.getYieldPeriod(timeUnit);
        }

        public boolean isUseCompression() {
            return this.original.isUseCompression();
        }
    }

    public EndpointConnectionPool(RemoteDestination remoteDestination, int i, int i2, SSLContext sSLContext, EventReporter eventReporter, PeerPersistence peerPersistence, SiteInfoProvider siteInfoProvider, InetAddress inetAddress) {
        Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null");
        this.remoteDestination = remoteDestination;
        this.sslContext = sSLContext;
        this.eventReporter = eventReporter;
        this.commsTimeout = i;
        this.idleExpirationMillis = i2;
        this.localAddress = inetAddress;
        this.siteInfoProvider = siteInfoProvider;
        this.peerSelector = new PeerSelector(this, peerPersistence);
        this.peerSelector.setEventReporter(eventReporter);
        this.taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory(this) { // from class: org.apache.nifi.remote.client.socket.EndpointConnectionPool.1
            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread newThread = this.defaultFactory.newThread(runnable);
                newThread.setName("NiFi Site-to-Site Connection Pool Maintenance");
                newThread.setDaemon(true);
                return newThread;
            }
        });
        this.taskExecutor.scheduleWithFixedDelay(() -> {
            this.peerSelector.refresh();
        }, 0L, 5L, TimeUnit.SECONDS);
        this.taskExecutor.scheduleWithFixedDelay(() -> {
            cleanupExpiredSockets();
        }, 5L, 5L, TimeUnit.SECONDS);
    }

    private String getPortIdentifier(TransferDirection transferDirection) throws IOException {
        return this.remoteDestination.getIdentifier() != null ? this.remoteDestination.getIdentifier() : this.siteInfoProvider.getPortIdentifier(this.remoteDestination.getName(), transferDirection);
    }

    public EndpointConnection getEndpointConnection(TransferDirection transferDirection) throws IOException {
        return getEndpointConnection(transferDirection, null);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public EndpointConnection getEndpointConnection(TransferDirection transferDirection, SiteToSiteClientConfig siteToSiteClientConfig) throws IOException {
        CommunicationsSession communicationsSession;
        FlowFileCodec flowFileCodec = null;
        CommunicationsSession communicationsSession2 = null;
        SocketClientProtocol socketClientProtocol = null;
        try {
            URI activeClusterUrl = this.siteInfoProvider.getActiveClusterUrl();
            while (true) {
                ArrayList arrayList = new ArrayList();
                logger.debug("{} getting next peer status", this);
                PeerStatus nextPeerStatus = this.peerSelector.getNextPeerStatus(transferDirection);
                logger.debug("{} next peer status = {}", this, nextPeerStatus);
                if (nextPeerStatus == null) {
                    return null;
                }
                PeerDescription peerDescription = nextPeerStatus.getPeerDescription();
                BlockingQueue<EndpointConnection> blockingQueue = this.connectionQueueMap.get(peerDescription);
                if (blockingQueue == null) {
                    blockingQueue = new LinkedBlockingQueue();
                    BlockingQueue<EndpointConnection> putIfAbsent = this.connectionQueueMap.putIfAbsent(peerDescription, blockingQueue);
                    if (putIfAbsent != null) {
                        blockingQueue = putIfAbsent;
                    }
                }
                try {
                    try {
                        EndpointConnection poll = blockingQueue.poll();
                        logger.debug("{} Connection State for {} = {}", new Object[]{this, activeClusterUrl, poll});
                        String portIdentifier = getPortIdentifier(transferDirection);
                        if (poll == null && !arrayList.isEmpty()) {
                            logger.debug("{} all Connections for {} are penalized; returning no Connection", this, portIdentifier);
                            if (!arrayList.isEmpty()) {
                                blockingQueue.addAll(arrayList);
                                arrayList.clear();
                            }
                            return null;
                        }
                        if (poll == null || !poll.getPeer().isPenalized(portIdentifier)) {
                            if (poll == null) {
                                logger.debug("{} No Connection available for Port {}; creating new Connection", this, portIdentifier);
                                socketClientProtocol = new SocketClientProtocol();
                                socketClientProtocol.setDestination(new IdEnrichedRemoteDestination(this, this.remoteDestination, portIdentifier));
                                socketClientProtocol.setEventReporter(this.eventReporter);
                                long yieldPeriod = this.remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS);
                                try {
                                    logger.debug("{} Establishing site-to-site connection with {}", this, nextPeerStatus);
                                    communicationsSession2 = establishSiteToSiteConnection(nextPeerStatus);
                                    DataInputStream dataInputStream = new DataInputStream(communicationsSession2.getInput().getInputStream());
                                    DataOutputStream dataOutputStream = new DataOutputStream(communicationsSession2.getOutput().getOutputStream());
                                    try {
                                        logger.debug("{} Negotiating protocol", this);
                                        RemoteResourceInitiator.initiateResourceNegotiation(socketClientProtocol, dataInputStream, dataOutputStream);
                                    } catch (HandshakeException e) {
                                        try {
                                            communicationsSession2.close();
                                        } catch (IOException e2) {
                                            throw e;
                                        }
                                    }
                                    Peer peer = new Peer(peerDescription, communicationsSession2, "nifi://" + peerDescription.getHostname() + ":" + peerDescription.getPort(), activeClusterUrl.toString());
                                    if (siteToSiteClientConfig != null) {
                                        socketClientProtocol.setTimeout((int) siteToSiteClientConfig.getTimeout(TimeUnit.MILLISECONDS));
                                        socketClientProtocol.setPreferredBatchCount(siteToSiteClientConfig.getPreferredBatchCount());
                                        socketClientProtocol.setPreferredBatchSize(siteToSiteClientConfig.getPreferredBatchSize());
                                        socketClientProtocol.setPreferredBatchDuration(siteToSiteClientConfig.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
                                    }
                                    try {
                                        try {
                                            logger.debug("{} performing handshake", this);
                                            socketClientProtocol.handshake(peer);
                                            if (socketClientProtocol.isDestinationFull()) {
                                                Logger logger2 = logger;
                                                Object[] objArr = new Object[3];
                                                objArr[0] = this;
                                                objArr[1] = peer;
                                                objArr[2] = siteToSiteClientConfig.getPortName() == null ? siteToSiteClientConfig.getPortIdentifier() : siteToSiteClientConfig.getPortName();
                                                logger2.warn("{} {} indicates that port {}'s destination is full; penalizing peer", objArr);
                                                this.peerSelector.penalize(peer, yieldPeriod);
                                                try {
                                                    peer.close();
                                                } catch (IOException e3) {
                                                }
                                            } else {
                                                if (socketClientProtocol.isPortInvalid()) {
                                                    this.peerSelector.penalize(peer, yieldPeriod);
                                                    cleanup(socketClientProtocol, peer);
                                                    throw new PortNotRunningException(peer.toString() + " indicates that port " + portIdentifier + " is not running");
                                                }
                                                if (socketClientProtocol.isPortUnknown()) {
                                                    this.peerSelector.penalize(peer, yieldPeriod);
                                                    cleanup(socketClientProtocol, peer);
                                                    throw new UnknownPortException(peer.toString() + " indicates that port " + portIdentifier + " is not known");
                                                }
                                                logger.debug("{} negotiating codec", this);
                                                flowFileCodec = socketClientProtocol.negotiateCodec(peer);
                                                logger.debug("{} negotiated codec is {}", this, flowFileCodec);
                                                poll = new EndpointConnection(peer, socketClientProtocol, flowFileCodec);
                                            }
                                        } catch (Exception e4) {
                                            this.peerSelector.penalize(peer, yieldPeriod);
                                            cleanup(socketClientProtocol, peer);
                                            Object[] objArr2 = new Object[3];
                                            objArr2[0] = this;
                                            objArr2[1] = peer == null ? activeClusterUrl : peer;
                                            objArr2[2] = e4.toString();
                                            EventReportUtil.error(logger, this.eventReporter, String.format("%s failed to communicate with %s due to %s", objArr2), new Object[0]);
                                            if (logger.isDebugEnabled()) {
                                                logger.error("", e4);
                                            }
                                            throw e4;
                                        }
                                    } catch (PortNotRunningException | UnknownPortException e5) {
                                        throw e5;
                                    }
                                } catch (IOException e6) {
                                    this.peerSelector.penalize(nextPeerStatus.getPeerDescription(), yieldPeriod);
                                    throw e6;
                                }
                            } else {
                                long currentTimeMillis = System.currentTimeMillis() - poll.getLastTimeUsed();
                                if (this.commsTimeout <= 0 || currentTimeMillis < this.commsTimeout) {
                                    flowFileCodec = poll.getCodec();
                                    communicationsSession2 = poll.getPeer().getCommunicationsSession();
                                    socketClientProtocol = poll.getSocketClientProtocol();
                                } else {
                                    cleanup(poll.getSocketClientProtocol(), poll.getPeer());
                                    poll = null;
                                }
                            }
                            if (!arrayList.isEmpty()) {
                                blockingQueue.addAll(arrayList);
                                arrayList.clear();
                            }
                        } else {
                            arrayList.add(poll);
                            if (!arrayList.isEmpty()) {
                                blockingQueue.addAll(arrayList);
                                arrayList.clear();
                            }
                        }
                        if (poll != null && flowFileCodec != null) {
                            if (communicationsSession != null && socketClientProtocol != null) {
                                this.activeConnections.add(poll);
                                return poll;
                            }
                        }
                    } finally {
                        if (communicationsSession2 != null) {
                            try {
                                communicationsSession2.close();
                            } catch (IOException e7) {
                            }
                        }
                    }
                } finally {
                    if (!arrayList.isEmpty()) {
                        blockingQueue.addAll(arrayList);
                        arrayList.clear();
                    }
                }
            }
        } catch (IOException e8) {
            throw new UnreachableClusterException("Unable to refresh details from any of the configured remote instances.", e8);
        }
    }

    public boolean offer(EndpointConnection endpointConnection) {
        BlockingQueue<EndpointConnection> blockingQueue;
        Peer peer = endpointConnection.getPeer();
        if (peer == null || (blockingQueue = this.connectionQueueMap.get(peer.getDescription())) == null) {
            return false;
        }
        this.activeConnections.remove(endpointConnection);
        if (this.shutdown) {
            terminate(endpointConnection);
            return false;
        }
        endpointConnection.setLastTimeUsed();
        return blockingQueue.offer(endpointConnection);
    }

    private void cleanup(SocketClientProtocol socketClientProtocol, Peer peer) {
        if (socketClientProtocol != null && peer != null) {
            try {
                socketClientProtocol.shutdown(peer);
            } catch (IOException e) {
            } catch (TransmissionDisabledException e2) {
                logger.debug(String.valueOf(this) + " Transmission Disabled by User");
            }
        }
        if (peer != null) {
            try {
                peer.close();
            } catch (IOException e3) {
            } catch (TransmissionDisabledException e4) {
                logger.debug(String.valueOf(this) + " Transmission Disabled by User");
            }
        }
    }

    @Override // org.apache.nifi.remote.client.PeerStatusProvider
    public PeerDescription getBootstrapPeerDescription() throws IOException {
        String host = this.siteInfoProvider.getActiveClusterUrl().getHost();
        Integer siteToSitePort = this.siteInfoProvider.getSiteToSitePort();
        if (siteToSitePort == null) {
            throw new IOException("Remote instance of NiFi is not configured to allow RAW Socket site-to-site communications");
        }
        return new PeerDescription(host, siteToSitePort.intValue(), this.siteInfoProvider.isSecure());
    }

    @Override // org.apache.nifi.remote.client.PeerStatusProvider
    public Set<PeerStatus> fetchRemotePeerStatuses(PeerDescription peerDescription) throws IOException {
        String hostname = peerDescription.getHostname();
        int port = peerDescription.getPort();
        URI activeClusterUrl = this.siteInfoProvider.getActiveClusterUrl();
        PeerDescription peerDescription2 = new PeerDescription(hostname, port, activeClusterUrl.toString().startsWith("https://"));
        CommunicationsSession establishSiteToSiteConnection = establishSiteToSiteConnection(hostname, port);
        Peer peer = new Peer(peerDescription2, establishSiteToSiteConnection, "nifi://" + hostname + ":" + port, activeClusterUrl.toString());
        SocketClientProtocol socketClientProtocol = new SocketClientProtocol();
        RemoteResourceInitiator.initiateResourceNegotiation(socketClientProtocol, new DataInputStream(establishSiteToSiteConnection.getInput().getInputStream()), new DataOutputStream(establishSiteToSiteConnection.getOutput().getOutputStream()));
        socketClientProtocol.setTimeout(this.commsTimeout);
        if (socketClientProtocol.getVersionNegotiator().getVersion() < 5) {
            String portIdentifier = getPortIdentifier(TransferDirection.RECEIVE);
            if (portIdentifier == null) {
                portIdentifier = getPortIdentifier(TransferDirection.SEND);
            }
            if (portIdentifier == null) {
                peer.close();
                throw new IOException("Failed to determine the identifier of port " + this.remoteDestination.getName());
            }
            socketClientProtocol.handshake(peer, portIdentifier);
        } else {
            socketClientProtocol.handshake(peer, null);
        }
        Set<PeerStatus> peerStatuses = socketClientProtocol.getPeerStatuses(peer);
        try {
            socketClientProtocol.shutdown(peer);
        } catch (IOException e) {
            EventReportUtil.warn(logger, this.eventReporter, String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()), new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", e);
            }
        }
        try {
            peer.close();
        } catch (IOException e2) {
            EventReportUtil.warn(logger, this.eventReporter, String.format("%s Failed to close resources when updating list of peers due to %s", this, e2.toString()), new Object[0]);
            if (logger.isDebugEnabled()) {
                logger.warn("", e2);
            }
        }
        return peerStatuses;
    }

    @Override // org.apache.nifi.remote.client.PeerStatusProvider
    public String getRemoteInstanceUris() {
        return String.join(",", this.siteInfoProvider.getClusterUrls());
    }

    private CommunicationsSession establishSiteToSiteConnection(PeerStatus peerStatus) throws IOException {
        PeerDescription peerDescription = peerStatus.getPeerDescription();
        return establishSiteToSiteConnection(peerDescription.getHostname(), peerDescription.getPort());
    }

    private CommunicationsSession establishSiteToSiteConnection(String str, int i) throws IOException {
        SocketCommunicationsSession socketCommunicationsSession;
        CommunicationsSession communicationsSession = null;
        try {
            if (!this.siteInfoProvider.isSecure()) {
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress(str, i), this.commsTimeout);
                socket.setSoTimeout(this.commsTimeout);
                socketCommunicationsSession = new SocketCommunicationsSession(socket);
            } else {
                if (this.sslContext == null) {
                    throw new IOException("Unable to communicate with " + str + ":" + i + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications");
                }
                Socket createSocket = this.sslContext.getSocketFactory().createSocket(str, i);
                createSocket.setSoTimeout(this.commsTimeout);
                socketCommunicationsSession = new SocketCommunicationsSession(createSocket);
                Optional<String> peerIdentity = socketPeerIdentityProvider.getPeerIdentity(createSocket);
                if (!peerIdentity.isPresent()) {
                    throw new IOException(String.format("Site-to-Site Peer [%s] Identity not found", createSocket.getRemoteSocketAddress()));
                }
                socketCommunicationsSession.setUserDn(peerIdentity.get());
            }
            socketCommunicationsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
            return socketCommunicationsSession;
        } catch (IOException e) {
            if (0 != 0) {
                communicationsSession.close();
            }
            throw e;
        }
    }

    private void cleanupExpiredSockets() {
        for (BlockingQueue<EndpointConnection> blockingQueue : this.connectionQueueMap.values()) {
            ArrayList arrayList = new ArrayList();
            while (true) {
                EndpointConnection poll = blockingQueue.poll();
                if (poll != null) {
                    if (poll.getLastTimeUsed() < System.currentTimeMillis() - this.idleExpirationMillis) {
                        try {
                            poll.getSocketClientProtocol().shutdown(poll.getPeer());
                        } catch (Exception e) {
                            logger.debug("Failed to shut down {} using {} due to {}", new Object[]{poll.getSocketClientProtocol(), poll.getPeer(), e});
                        }
                        terminate(poll);
                    } else {
                        arrayList.add(poll);
                    }
                }
            }
            blockingQueue.addAll(arrayList);
        }
    }

    public void shutdown() {
        this.shutdown = true;
        this.taskExecutor.shutdown();
        this.peerSelector.clear();
        Iterator<EndpointConnection> it = this.activeConnections.iterator();
        while (it.hasNext()) {
            it.next().getPeer().getCommunicationsSession().interrupt();
        }
        for (BlockingQueue<EndpointConnection> blockingQueue : this.connectionQueueMap.values()) {
            while (true) {
                EndpointConnection poll = blockingQueue.poll();
                if (poll != null) {
                    terminate(poll);
                }
            }
        }
    }

    public void terminate(EndpointConnection endpointConnection) {
        this.activeConnections.remove(endpointConnection);
        cleanup(endpointConnection.getSocketClientProtocol(), endpointConnection.getPeer());
    }

    public String toString() {
        return "EndpointConnectionPool[Cluster URL=" + String.valueOf(this.siteInfoProvider.getClusterUrls()) + "]";
    }

    @Override // org.apache.nifi.remote.client.PeerStatusProvider
    public SiteToSiteTransportProtocol getTransportProtocol() {
        return SiteToSiteTransportProtocol.RAW;
    }
}
