package org.apache.nifi.remote.client;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.remote.util.EventReportUtil;
import org.apache.nifi.remote.util.PeerStatusCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/client/PeerSelector.class */
public class PeerSelector {
    private static final Logger logger = LoggerFactory.getLogger(PeerSelector.class);
    private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
    private final PeerPersistence peerPersistence;
    private final PeerStatusProvider peerStatusProvider;
    private final ConcurrentMap<PeerDescription, Long> peerPenaltyExpirations = new ConcurrentHashMap();
    private volatile PeerStatusCache peerStatusCache;
    private EventReporter eventReporter;

    public PeerSelector(PeerStatusProvider peerStatusProvider, PeerPersistence peerPersistence) {
        this.peerStatusProvider = peerStatusProvider;
        this.peerPersistence = peerPersistence;
        restoreInitialPeerStatusCache();
    }

    private void restoreInitialPeerStatusCache() {
        try {
            PeerStatusCache peerStatusCache = null;
            if (this.peerPersistence != null) {
                peerStatusCache = this.peerPersistence.restore();
                if (peerStatusCache != null) {
                    SiteToSiteTransportProtocol transportProtocol = this.peerStatusProvider.getTransportProtocol();
                    SiteToSiteTransportProtocol transportProtocol2 = peerStatusCache.getTransportProtocol();
                    String remoteInstanceUris = this.peerStatusProvider.getRemoteInstanceUris();
                    String remoteInstanceUris2 = peerStatusCache.getRemoteInstanceUris();
                    if (!remoteInstanceUris.equals(remoteInstanceUris2)) {
                        logger.info("Discard stored peer statuses in {} because remote instance URIs has changed from {} to {}", new Object[]{this.peerPersistence.getClass().getSimpleName(), remoteInstanceUris2, remoteInstanceUris});
                        peerStatusCache = null;
                    }
                    if (!transportProtocol.equals(transportProtocol2)) {
                        logger.warn("Discard stored peer statuses in {} because transport protocol has changed from {} to {}", new Object[]{this.peerPersistence.getClass().getSimpleName(), transportProtocol2, transportProtocol});
                        peerStatusCache = null;
                    }
                }
            }
            this.peerStatusCache = peerStatusCache;
        } catch (IOException e) {
            logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", this.peerPersistence.getClass().getSimpleName(), e);
        }
    }

    private static double calculateNormalizedWeight(TransferDirection transferDirection, long j, int i, int i2) {
        double d;
        if (i2 == 1) {
            return 100.0d;
        }
        if (j == 0) {
            d = 1.0d / i2;
        } else {
            double d2 = i / j;
            d = d2;
            if (transferDirection == TransferDirection.SEND) {
                d = (1.0d - d2) / (i2 - 1);
            }
        }
        return new BigDecimal(d * 100.0d).setScale(2, RoundingMode.FLOOR).doubleValue();
    }

    private static LinkedHashMap<PeerStatus, Double> sortMapByWeight(Map<PeerStatus, Double> map) {
        ArrayList arrayList = new ArrayList(map.entrySet());
        arrayList.sort(Map.Entry.comparingByValue());
        LinkedHashMap<PeerStatus, Double> linkedHashMap = new LinkedHashMap<>();
        for (int size = arrayList.size() - 1; size >= 0; size--) {
            Map.Entry entry = (Map.Entry) arrayList.get(size);
            linkedHashMap.put((PeerStatus) entry.getKey(), (Double) entry.getValue());
        }
        return linkedHashMap;
    }

    private static void printDistributionStatistics(Map<PeerStatus, Double> map, TransferDirection transferDirection) {
        if (!logger.isDebugEnabled() || map == null) {
            return;
        }
        DecimalFormat decimalFormat = new DecimalFormat("##.##");
        decimalFormat.setRoundingMode(RoundingMode.FLOOR);
        StringBuilder sb = new StringBuilder();
        sb.append("New weighted distribution of nodes:");
        for (Map.Entry<PeerStatus, Double> entry : map.entrySet()) {
            sb.append("\n").append(entry.getKey()).append(" will").append(transferDirection == TransferDirection.RECEIVE ? " send " : " receive ").append(decimalFormat.format(entry.getValue().doubleValue())).append("% of data");
        }
        logger.debug("{}", sb);
    }

    private static double sumMapValues(Map<PeerStatus, Double> map) {
        return map.values().stream().mapToDouble((v0) -> {
            return v0.doubleValue();
        }).sum();
    }

    public void clear() {
        this.peerPenaltyExpirations.clear();
    }

    public PeerStatus getNextPeerStatus(TransferDirection transferDirection) {
        return getAvailablePeerStatus(buildWeightedPeerMap(getPeerStatuses(), transferDirection));
    }

    public boolean isPenalized(PeerStatus peerStatus) {
        Long l = this.peerPenaltyExpirations.get(peerStatus.getPeerDescription());
        return l != null && l.longValue() > System.currentTimeMillis();
    }

    public void penalize(Peer peer, long j) {
        penalize(peer.getDescription(), j);
    }

    public void penalize(PeerDescription peerDescription, long j) {
        Long l = this.peerPenaltyExpirations.get(peerDescription);
        if (l == null) {
            l = 0L;
        }
        this.peerPenaltyExpirations.put(peerDescription, Long.valueOf(Math.max(l.longValue(), System.currentTimeMillis() + j)));
    }

    public void refresh() {
        logger.debug("External refresh triggered. Last refresh was {} ms ago", Long.valueOf(getCacheAge()));
        if (!isPeerRefreshNeeded()) {
            logger.debug("Cache is still valid; skipping refresh");
        } else {
            logger.debug("Refreshing peer status cache");
            refreshPeerStatusCache();
        }
    }

    public void setEventReporter(EventReporter eventReporter) {
        this.eventReporter = eventReporter;
    }

    LinkedHashMap<PeerStatus, Double> buildWeightedPeerMap(Set<PeerStatus> set, TransferDirection transferDirection) {
        Map<PeerStatus, Double> createDestinationMap = createDestinationMap(set, transferDirection);
        if (createDestinationMap.isEmpty()) {
            logger.debug("No peers available");
            return new LinkedHashMap<>();
        }
        LinkedHashMap<PeerStatus, Double> sortMapByWeight = sortMapByWeight(createDestinationMap);
        printDistributionStatistics(sortMapByWeight, transferDirection);
        return sortMapByWeight;
    }

    private Map<PeerStatus, Double> createDestinationMap(Set<PeerStatus> set, TransferDirection transferDirection) {
        HashMap hashMap = new HashMap();
        long sum = set.stream().mapToLong((v0) -> {
            return v0.getFlowFileCount();
        }).sum();
        logger.debug("Building weighted map of peers with total remote NiFi flowfile count: {}", Long.valueOf(sum));
        for (PeerStatus peerStatus : set) {
            hashMap.put(peerStatus, Double.valueOf(calculateNormalizedWeight(transferDirection, sum, peerStatus.getFlowFileCount(), set.size())));
        }
        return hashMap;
    }

    private Set<PeerStatus> fetchRemotePeerStatuses(Set<PeerDescription> set) throws IOException {
        logger.debug("Fetching remote peer statuses from: {}", set);
        Exception exc = null;
        HashSet hashSet = new HashSet();
        for (PeerDescription peerDescription : set) {
            try {
                hashSet.addAll((Set) this.peerStatusProvider.fetchRemotePeerStatuses(peerDescription).stream().filter((v0) -> {
                    return v0.isQueryForPeers();
                }).collect(Collectors.toSet()));
            } catch (Exception e) {
                logger.warn("Could not communicate with {}:{} to determine which node(s) exist in the remote NiFi instance, due to {}", new Object[]{peerDescription.getHostname(), Integer.valueOf(peerDescription.getPort()), e.toString()});
                exc = e;
            }
        }
        if (!hashSet.isEmpty() || exc == null) {
            return hashSet;
        }
        throw new IOException("Unable to retrieve nodes from remote instance", exc);
    }

    private PeerStatus getAvailablePeerStatus(Map<PeerStatus, Double> map) {
        if (map == null || map.isEmpty()) {
            logger.warn("Available peers collection is empty; no peer available");
            return null;
        }
        Map map2 = (Map) map.entrySet().stream().filter(entry -> {
            return !isPenalized((PeerStatus) entry.getKey());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        double sumMapValues = sumMapValues(map2);
        logger.debug("Determining next available peer ({} peers with total weight {})", Integer.valueOf(map2.keySet().size()), Double.valueOf(sumMapValues));
        double random = Math.random() * Math.min(100.0d, sumMapValues);
        logger.debug("Generated random value {}", Double.valueOf(random));
        double d = 0.0d;
        for (Map.Entry entry2 : map2.entrySet()) {
            logger.debug("Initial threshold was {}; added peer value {}; total {}", new Object[]{Double.valueOf(d), entry2.getValue(), Double.valueOf(d + ((Double) entry2.getValue()).doubleValue())});
            d += ((Double) entry2.getValue()).doubleValue();
            if (random <= d) {
                return (PeerStatus) entry2.getKey();
            }
        }
        logger.debug("Did not select a peer; r {}, t {}, w {}", new Object[]{Double.valueOf(random), Double.valueOf(d), map.values()});
        logger.debug("All peers appear to be penalized; returning null");
        return null;
    }

    private long getCacheAge() {
        if (this.peerStatusCache == null) {
            return -1L;
        }
        return System.currentTimeMillis() - this.peerStatusCache.getTimestamp();
    }

    private Set<PeerStatus> getLastFetchedQueryablePeers() {
        return this.peerStatusCache != null ? this.peerStatusCache.getStatuses() : Collections.emptySet();
    }

    private Set<PeerStatus> getPeerStatuses() {
        if (isPeerRefreshNeeded()) {
            refreshPeerStatusCache();
        }
        return getLastFetchedQueryablePeers();
    }

    private Set<PeerDescription> getPeersToQuery() throws IOException {
        HashSet hashSet = new HashSet();
        Set<PeerStatus> lastFetchedQueryablePeers = getLastFetchedQueryablePeers();
        if (lastFetchedQueryablePeers != null && !lastFetchedQueryablePeers.isEmpty()) {
            Iterator<PeerStatus> it = lastFetchedQueryablePeers.iterator();
            while (it.hasNext()) {
                hashSet.add(it.next().getPeerDescription());
            }
        }
        hashSet.add(this.peerStatusProvider.getBootstrapPeerDescription());
        return hashSet;
    }

    private boolean isCacheExpired(PeerStatusCache peerStatusCache) {
        return peerStatusCache == null || peerStatusCache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis();
    }

    private boolean isPeerRefreshNeeded() {
        return this.peerStatusCache == null || this.peerStatusCache.isEmpty() || isCacheExpired(this.peerStatusCache);
    }

    private void persistPeerStatuses(PeerStatusCache peerStatusCache) {
        try {
            this.peerStatusCache = peerStatusCache;
            if (this.peerPersistence != null) {
                this.peerPersistence.save(peerStatusCache);
            }
        } catch (IOException e) {
            EventReportUtil.error(logger, this.eventReporter, "Failed to persist list of peers due to {}; if restarted and the nodes specified at the remote instance are down, may be unable to transfer data until communications with those nodes are restored", e.toString());
            logger.error("", e);
        }
    }

    private void refreshPeerStatusCache() {
        try {
            Set<PeerDescription> peersToQuery = getPeersToQuery();
            Set<PeerStatus> fetchRemotePeerStatuses = fetchRemotePeerStatuses(peersToQuery);
            if (fetchRemotePeerStatuses.isEmpty()) {
                logger.info("No peers were retrieved from the remote group {}", peersToQuery.stream().map(peerDescription -> {
                    return peerDescription.getHostname() + ":" + peerDescription.getPort();
                }).collect(Collectors.joining(",")));
            }
            persistPeerStatuses(new PeerStatusCache(fetchRemotePeerStatuses, System.currentTimeMillis(), this.peerStatusProvider.getRemoteInstanceUris(), this.peerStatusProvider.getTransportProtocol()));
            logger.info("Successfully refreshed peer status cache; remote group consists of {} peers", Integer.valueOf(fetchRemotePeerStatuses.size()));
        } catch (Exception e) {
            EventReportUtil.warn(logger, this.eventReporter, "Unable to refresh remote group peers due to: {}", e.getMessage());
            if (!logger.isDebugEnabled() || e.getCause() == null) {
                return;
            }
            logger.warn("Caused by: ", e);
        }
    }
}
