package org.apache.nifi.remote.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.EventReportUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/client/PeerSelector.class */
public class PeerSelector {
    private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);

    /** Age in milliseconds (one minute) up to which the cached peer statuses are treated as fresh. */
    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);

    /** Time in milliseconds after which the weighted destination list is rebuilt. */
    private static final long PEER_REFRESH_PERIOD = 60000;

    /** Most recently built weighted destination list; written while holding {@link #peerRefreshLock}. */
    private volatile List<PeerStatus> peerStatuses;

    /** Peers from the last successful fetch that reported they may be queried for peers. */
    private volatile Set<PeerStatus> lastFetchedQueryablePeers;

    /** Last known peer statuses, either restored from persistence or fetched from the remote cluster. */
    private volatile PeerStatusCache peerStatusCache;

    /** Optional store for the peer status cache; may be {@code null}. */
    private final PeerPersistence peerPersistence;

    /** Receiver of warning/error events; assigned through {@link #setEventReporter(EventReporter)}. */
    private EventReporter eventReporter;

    private final PeerStatusProvider peerStatusProvider;

    /** Serializes rebuilds of {@link #peerStatuses}. */
    private final ReentrantLock peerRefreshLock = new ReentrantLock();

    /** Clock reading taken the last time a rebuild of {@link #peerStatuses} was attempted. */
    private volatile long peerRefreshTime = 0;

    // NOTE(review): not referenced anywhere in the visible class body - confirm whether it is still needed.
    private final AtomicLong peerIndex = new AtomicLong(0);

    /** Per-peer penalty expiration timestamps, in milliseconds of the {@link #systemTime} clock. */
    private final ConcurrentMap<PeerDescription, Long> peerTimeoutExpirations = new ConcurrentHashMap<>();

    /** Clock used for time comparisons in this class; replaceable through {@code setSystemTime}. */
    private SystemTime systemTime = new SystemTime();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/nifi/remote/client/PeerSelector$SystemTime.class */
    /**
     * Thin wrapper around the wall clock. Instances can be swapped on the
     * enclosing selector so that time-dependent logic reads its time from
     * a replaceable source.
     */
    public static class SystemTime {

        /** Package-private: only code in this package creates clocks. */
        SystemTime() {
            // nothing to initialise
        }

        /**
         * @return the current wall-clock time in milliseconds, as reported by
         *         {@link System#currentTimeMillis()}
         */
        long currentTimeMillis() {
            final long now = System.currentTimeMillis();
            return now;
        }
    }

    /**
     * Replaces the clock used by this selector and logs the replacement at
     * info level.
     *
     * @param systemTime the clock to use from now on
     */
    void setSystemTime(SystemTime systemTime) {
        // Log first, then swap, so the message reflects the instance being installed.
        logger.info("Replacing systemTime instance to {}.", systemTime);
        this.systemTime = systemTime;
    }

    /**
     * Creates a selector and, when a persistence is supplied, tries to restore
     * the previously saved peer statuses from it.
     *
     * <p>A restored cache is discarded when its transport protocol differs
     * from the one reported by the provider. If restoring fails with an
     * {@link IOException}, a warning is logged and the selector starts
     * without a cache.
     *
     * @param peerStatusProvider source of the transport protocol and remote peer statuses
     * @param peerPersistence    store to restore the cache from; may be {@code null}
     */
    public PeerSelector(PeerStatusProvider peerStatusProvider, PeerPersistence peerPersistence) {
        this.peerStatusProvider = peerStatusProvider;
        this.peerPersistence = peerPersistence;

        PeerStatusCache restored = null;
        if (peerPersistence != null) {
            try {
                restored = peerPersistence.restore();
                if (restored != null) {
                    final SiteToSiteTransportProtocol currentProtocol = peerStatusProvider.getTransportProtocol();
                    final SiteToSiteTransportProtocol cachedProtocol = restored.getTransportProtocol();
                    final boolean protocolChanged = !currentProtocol.equals(cachedProtocol);
                    if (protocolChanged) {
                        logger.info("Discard stored peer statuses in {} because transport protocol has changed from {} to {}",
                                peerPersistence.getClass().getSimpleName(), cachedProtocol, currentProtocol);
                        restored = null;
                    }
                }
            } catch (IOException e) {
                logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", peerPersistence.getClass().getSimpleName(), e);
                // Start without a cache when the restore failed.
                restored = null;
            }
        }
        this.peerStatusCache = restored;
    }

    /**
     * Saves the current peer status cache through the configured persistence.
     *
     * <p>Does nothing when no persistence was supplied: the constructor accepts
     * a {@code null} persistence, so dereferencing it here unconditionally would
     * throw a {@link NullPointerException}. A failed save is reported through
     * the event reporter and logged; it is not propagated to the caller.
     */
    private void persistPeerStatuses() {
        if (this.peerPersistence == null) {
            // Persistence is optional; there is nowhere to save to.
            return;
        }
        try {
            this.peerPersistence.save(this.peerStatusCache);
        } catch (IOException e) {
            EventReportUtil.error(logger, this.eventReporter, "Failed to persist list of Peers due to {}; if restarted and the nodes specified at the RPG are down, may be unable to transfer data until communications with those nodes are restored", e.toString());
            // Second log call carries the stack trace.
            logger.error("", e);
        }
    }

    /**
     * Builds a weighted list of destinations in which each peer appears a
     * number of times proportional to its weight.
     *
     * <p>The weight of a peer is derived from its share of the total flow file
     * count, capped at 80%. When sending, the weight is {@code 1 - share} (peers
     * holding fewer flow files appear more often); otherwise the weight is the
     * share itself. Every peer appears at least once. When the total flow file
     * count is zero no share can be computed, so every peer gets exactly one
     * entry.
     *
     * @param set               peers to distribute over
     * @param transferDirection direction of the transfer; only {@link TransferDirection#SEND}
     *                          inverts the weighting
     * @return the shuffled destination list; empty when {@code set} is empty
     */
    List<PeerStatus> formulateDestinationList(Set<PeerStatus> set, TransferDirection transferDirection) {
        final int numDestinations = Math.max(128, set.size());
        final Map<PeerStatus, Integer> entryCounts = new HashMap<>();

        long totalFlowFileCount = 0L;
        for (PeerStatus status : set) {
            totalFlowFileCount += status.getFlowFileCount();
        }

        int totalEntries = 0;
        for (PeerStatus status : set) {
            final int entries;
            if (totalFlowFileCount > 0) {
                // Floating-point division: integer division would truncate the share to 0 or 1.
                final double share = Math.min(0.8d, (double) status.getFlowFileCount() / (double) totalFlowFileCount);
                final double weight = transferDirection == TransferDirection.SEND ? 1.0d - share : share;
                entries = Math.max(1, (int) (numDestinations * weight));
            } else {
                // No flow files anywhere: weight all peers equally instead of dividing by zero.
                entries = 1;
            }
            entryCounts.put(status, entries);
            totalEntries += entries;
        }

        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
        for (int i = 0; i < totalEntries; i++) {
            destinations.add(null);
        }

        for (Map.Entry<PeerStatus, Integer> entry : entryCounts.entrySet()) {
            final PeerStatus status = entry.getKey();
            final int numEntries = entry.getValue();
            for (int i = 0; i < numEntries; i++) {
                // Spread this peer's entries by stepping numEntries slots at a time; the product is
                // computed as a long so that it cannot overflow into a negative index.
                int index = (int) (((long) numEntries * i) % destinations.size());
                // Probe forward to the next free slot. The total slot count equals the sum of all
                // entry counts, so a free slot always exists.
                while (destinations.get(index) != null) {
                    index = (index + 1) % destinations.size();
                }
                destinations.set(index, new PeerStatus(status.getPeerDescription(), status.getFlowFileCount(), status.isQueryForPeers()));
            }
        }

        // Fixed seed: the shuffled order is reproducible for the same input.
        Collections.shuffle(destinations, new Random(0L));

        final StringBuilder distribution = new StringBuilder();
        distribution.append("New Weighted Distribution of Nodes:");
        for (Map.Entry<PeerStatus, Integer> entry : entryCounts.entrySet()) {
            final double percentage = (entry.getValue() * 100.0d) / destinations.size();
            distribution.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
        }
        logger.info(distribution.toString());
        return destinations;
    }

    /**
     * Penalizes the given peer by delegating to
     * {@link #penalize(PeerDescription, long)} with the peer's description.
     *
     * @param peer the peer to penalize
     * @param j    penalty duration in milliseconds
     */
    public void penalize(Peer peer, long j) {
        final PeerDescription description = peer.getDescription();
        penalize(description, j);
    }

    /**
     * Penalizes the described peer until {@code j} milliseconds from now. An
     * already recorded expiration that lies further in the future is kept.
     *
     * <p>The update is performed with a single {@code merge} on the concurrent
     * map, so that two concurrent penalizations of the same peer cannot
     * overwrite the longer expiration with the shorter one.
     *
     * @param peerDescription the peer to penalize
     * @param j               penalty duration in milliseconds
     */
    public void penalize(PeerDescription peerDescription, long j) {
        final long requestedExpiration = this.systemTime.currentTimeMillis() + j;
        // Keep whichever expiration is later: the recorded one or the requested one.
        this.peerTimeoutExpirations.merge(peerDescription, requestedExpiration, Long::max);
    }

    /**
     * Tells whether the peer behind the given status is currently penalized.
     *
     * @param peerStatus status whose peer description is looked up
     * @return {@code true} when an expiration is recorded for the peer and it
     *         lies after the current time
     */
    public boolean isPenalized(PeerStatus peerStatus) {
        final Long expiration = this.peerTimeoutExpirations.get(peerStatus.getPeerDescription());
        if (expiration == null) {
            // Never penalized, or penalties were cleared.
            return false;
        }
        return expiration.longValue() > this.systemTime.currentTimeMillis();
    }

    /**
     * Removes every recorded penalty expiration, so no peer is considered
     * penalized afterwards.
     */
    public void clear() {
        peerTimeoutExpirations.clear();
    }

    /**
     * Decides whether the weighted destination list has to be rebuilt.
     *
     * @param list the destination list currently held
     * @return {@code true} when the list is {@code null} or empty, or when more
     *         than {@code PEER_REFRESH_PERIOD} milliseconds have passed since
     *         the recorded refresh time
     */
    private boolean isPeerRefreshNeeded(List<PeerStatus> list) {
        if (list == null || list.isEmpty()) {
            return true;
        }
        final long now = this.systemTime.currentTimeMillis();
        return now > this.peerRefreshTime + PEER_REFRESH_PERIOD;
    }

    /**
     * Returns the weighted destination list without the peers that are
     * currently penalized, rebuilding the list first when a refresh is due.
     *
     * <p>The refresh condition is checked once without the lock and again
     * after acquiring it, so a thread that waited for the lock does not rebuild
     * a list that another thread has just rebuilt. When rebuilding fails, a
     * warning is reported and the previously held list is kept; the refresh
     * time is updated in either case.
     *
     * @param transferDirection direction used to weight the peers
     * @return a new list holding the non-penalized destinations; never {@code null}
     */
    public ArrayList<PeerStatus> getPeerStatuses(TransferDirection transferDirection) {
        List<PeerStatus> current = this.peerStatuses;
        if (isPeerRefreshNeeded(current)) {
            this.peerRefreshLock.lock();
            try {
                // Re-read under the lock: another thread may have refreshed in the meantime.
                current = this.peerStatuses;
                if (isPeerRefreshNeeded(current)) {
                    try {
                        current = createPeerStatusList(transferDirection);
                    } catch (Exception e) {
                        final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
                        EventReportUtil.warn(logger, this.eventReporter, message, new Object[0]);
                        if (logger.isDebugEnabled()) {
                            logger.warn("", e);
                        }
                    }
                    this.peerStatuses = current;
                    this.peerRefreshTime = this.systemTime.currentTimeMillis();
                }
            } finally {
                this.peerRefreshLock.unlock();
            }
        }

        final ArrayList<PeerStatus> available = new ArrayList<>();
        if (current != null) {
            for (PeerStatus status : current) {
                if (!isPenalized(status)) {
                    available.add(status);
                }
            }
        }
        return available;
    }

    /**
     * Produces a fresh weighted destination list from the known peer statuses,
     * triggering a refresh of the peers first when none are known.
     *
     * @param transferDirection direction used to weight the peers
     * @return the weighted destination list, or an empty list when no peers
     *         are known even after the refresh
     * @throws IOException declared by the signature
     */
    private List<PeerStatus> createPeerStatusList(TransferDirection transferDirection) throws IOException {
        Set<PeerStatus> knownPeers = getPeerStatuses();
        if (knownPeers == null) {
            // Nothing cached: try to fetch, then look again.
            refreshPeers();
            knownPeers = getPeerStatuses();
        }
        if (knownPeers == null) {
            logger.debug("{} found no peers to connect to", this);
            return Collections.emptyList();
        }
        return formulateDestinationList(knownPeers, transferDirection);
    }

    /**
     * Returns the peer statuses held in the cache.
     *
     * <p>While the cache is no older than {@code PEER_CACHE_MILLIS} its
     * statuses are returned as they are. Once it is older, a copy of every
     * status is returned with the flow file count replaced by {@code 1}.
     *
     * @return the cached statuses, or {@code null} when there is no cache or
     *         the cache holds no statuses
     */
    private Set<PeerStatus> getPeerStatuses() {
        final PeerStatusCache cache = this.peerStatusCache;
        if (cache == null) {
            return null;
        }
        final Set<PeerStatus> cached = cache.getStatuses();
        if (cached == null || cached.isEmpty()) {
            return null;
        }

        final long freshUntil = cache.getTimestamp() + PEER_CACHE_MILLIS;
        if (freshUntil >= this.systemTime.currentTimeMillis()) {
            return cached;
        }

        // Cache is older than the freshness window: hand out copies carrying a flow file count of 1.
        return cached.stream()
                .map(status -> new PeerStatus(status.getPeerDescription(), 1, status.isQueryForPeers()))
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Fetches the peer statuses from the remote cluster and replaces the cache
     * with them, unless the existing cache is still within its freshness
     * window of {@code PEER_CACHE_MILLIS}.
     *
     * <p>The timestamp of the new cache is taken from the same clock that the
     * freshness checks compare against, so both sides of the comparison stay
     * consistent when the clock has been replaced. A failure is reported as a
     * warning and is not propagated to the caller.
     */
    public void refreshPeers() {
        final PeerStatusCache existingCache = this.peerStatusCache;
        if (existingCache != null
                && existingCache.getTimestamp() + PEER_CACHE_MILLIS > this.systemTime.currentTimeMillis()) {
            // Cache is still fresh; nothing to do.
            return;
        }

        try {
            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
            this.peerStatusCache = new PeerStatusCache(statuses, this.systemTime.currentTimeMillis(), this.peerStatusProvider.getTransportProtocol());
            persistPeerStatuses();
            logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size());
        } catch (Exception e) {
            EventReportUtil.warn(logger, this.eventReporter, "{} Unable to refresh Remote Group's peers due to {}", this, e.getMessage());
            if (logger.isDebugEnabled()) {
                logger.debug("", e);
            }
        }
    }

    /**
     * Sets the reporter that receives the warning and error events raised by
     * this selector.
     *
     * @param eventReporter reporter to use from now on
     */
    public void setEventReporter(EventReporter eventReporter) {
        // Plain field write; the field is neither final nor volatile.
        this.eventReporter = eventReporter;
    }

    /**
     * Asks the remote cluster which peers exist.
     *
     * <p>The peers to ask are those that the last successful fetch marked as
     * queryable, plus the bootstrap peer. They are tried one after another; the
     * first answer is returned, and its queryable peers are remembered for the
     * next call.
     *
     * @return the peer statuses reported by the first peer that answered
     * @throws IOException when none of the peers could be asked; the last
     *                     failure is attached as a suppressed exception
     */
    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
        final Set<PeerDescription> peersToQuery = new HashSet<>();
        final Set<PeerStatus> previouslyQueryable = this.lastFetchedQueryablePeers;
        if (previouslyQueryable != null) {
            for (PeerStatus status : previouslyQueryable) {
                peersToQuery.add(status.getPeerDescription());
            }
        }
        peersToQuery.add(this.peerStatusProvider.getBootstrapPeerDescription());
        logger.debug("Fetching remote peer statuses from: {}", peersToQuery);

        Exception lastFailure = null;
        for (PeerDescription candidate : peersToQuery) {
            try {
                final Set<PeerStatus> fetched = this.peerStatusProvider.fetchRemotePeerStatuses(candidate);
                this.lastFetchedQueryablePeers = fetched.stream()
                        .filter(PeerStatus::isQueryForPeers)
                        .collect(Collectors.toSet());
                return fetched;
            } catch (Exception e) {
                logger.warn("Could not communicate with {}:{} to determine which nodes exist in the remote NiFi cluster, due to {}",
                        candidate.getHostname(), candidate.getPort(), e.toString());
                lastFailure = e;
            }
        }

        // Every candidate failed.
        final IOException failure = new IOException("Unable to communicate with remote NiFi cluster in order to determine which nodes exist in the remote cluster");
        if (lastFailure != null) {
            failure.addSuppressed(lastFailure);
        }
        throw failure;
    }
}
