/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.impl;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.shade.com.google.common.base.Charsets;
import org.apache.pulsar.shade.com.sun.management.OperatingSystemMXBean;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LinuxBrokerHostUsageImpl
implements BrokerHostUsage {
    private static final Logger log = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class);
    private long lastCollection = 0L;
    private double lastTotalNicUsageTx;
    private double lastTotalNicUsageRx;
    private double lastCpuUsage;
    private double lastCpuTotalTime;
    private OperatingSystemMXBean systemBean = (OperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean();
    private SystemResourceUsage usage = new SystemResourceUsage();
    private final Optional<Double> overrideBrokerNicSpeedGbps;
    private final boolean isCGroupsEnabled;
    private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage";
    private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
    private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";

    public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
        this(pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(), pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(), pulsar.getLoadManagerExecutor());
    }

    public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin, Optional<Double> overrideBrokerNicSpeedGbps, ScheduledExecutorService executorService) {
        this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
        boolean isCGroupsEnabled = false;
        try {
            isCGroupsEnabled = Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH, new String[0]), new LinkOption[0]);
        }
        catch (Exception e) {
            log.warn("Failed to check cgroup CPU usage file: {}", (Object)e.getMessage());
        }
        this.isCGroupsEnabled = isCGroupsEnabled;
        this.calculateBrokerHostUsage();
        executorService.scheduleWithFixedDelay(Runnables.catchingAndLoggingThrowables(this::calculateBrokerHostUsage), hostUsageCheckIntervalMin, hostUsageCheckIntervalMin, TimeUnit.MINUTES);
    }

    @Override
    public SystemResourceUsage getBrokerHostUsage() {
        return this.usage;
    }

    @Override
    public void calculateBrokerHostUsage() {
        List<String> nics = this.getNics();
        double totalNicLimit = this.getTotalNicLimitKbps(nics);
        double totalNicUsageTx = this.getTotalNicUsageTxKb(nics);
        double totalNicUsageRx = this.getTotalNicUsageRxKb(nics);
        double totalCpuLimit = this.getTotalCpuLimit();
        long now = System.currentTimeMillis();
        double elapsedSeconds = (double)(now - this.lastCollection) / 1000.0;
        if (elapsedSeconds <= 0.0) {
            log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", (Object)elapsedSeconds);
            return;
        }
        SystemResourceUsage usage = new SystemResourceUsage();
        double cpuUsage = this.getTotalCpuUsage(elapsedSeconds);
        if (this.lastCollection == 0L) {
            usage.setMemory(this.getMemUsage());
            usage.setBandwidthIn(new ResourceUsage(0.0, totalNicLimit));
            usage.setBandwidthOut(new ResourceUsage(0.0, totalNicLimit));
        } else {
            double nicUsageTx = (totalNicUsageTx - this.lastTotalNicUsageTx) / elapsedSeconds;
            double nicUsageRx = (totalNicUsageRx - this.lastTotalNicUsageRx) / elapsedSeconds;
            usage.setMemory(this.getMemUsage());
            usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
            usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit));
        }
        this.lastTotalNicUsageTx = totalNicUsageTx;
        this.lastTotalNicUsageRx = totalNicUsageRx;
        this.lastCollection = System.currentTimeMillis();
        this.usage = usage;
        usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
    }

    private double getTotalCpuLimit() {
        if (this.isCGroupsEnabled) {
            try {
                long quota = LinuxBrokerHostUsageImpl.readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH);
                long period = LinuxBrokerHostUsageImpl.readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH);
                if (quota > 0L) {
                    return 100.0 * (double)quota / (double)period;
                }
            }
            catch (IOException e) {
                log.warn("Failed to read CPU quotas from cgroups", (Throwable)e);
            }
        }
        return 100 * Runtime.getRuntime().availableProcessors();
    }

    private double getTotalCpuUsage(double elapsedTimeSeconds) {
        if (this.isCGroupsEnabled) {
            return this.getTotalCpuUsageForCGroup(elapsedTimeSeconds);
        }
        return this.getTotalCpuUsageForEntireHost();
    }

    /*
     * Loose catch block
     */
    private double getTotalCpuUsageForEntireHost() {
        double d;
        Throwable throwable;
        Stream<String> stream;
        block14: {
            block15: {
                stream = Files.lines(Paths.get("/proc/stat", new String[0]));
                throwable = null;
                String[] words = stream.findFirst().get().split("\\s+");
                long total = Arrays.stream(words).filter(s -> !s.contains("cpu")).mapToLong(Long::parseLong).sum();
                long idle = Long.parseLong(words[4]);
                long usage = total - idle;
                double currentUsage = ((double)usage - this.lastCpuUsage) / ((double)total - this.lastCpuTotalTime) * this.getTotalCpuLimit();
                this.lastCpuUsage = usage;
                this.lastCpuTotalTime = total;
                d = currentUsage;
                if (stream == null) break block14;
                if (throwable == null) break block15;
                try {
                    stream.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                break block14;
            }
            stream.close();
        }
        return d;
        catch (Throwable throwable3) {
            try {
                try {
                    throwable = throwable3;
                    throw throwable3;
                }
                catch (Throwable throwable4) {
                    if (stream != null) {
                        if (throwable != null) {
                            try {
                                stream.close();
                            }
                            catch (Throwable throwable5) {
                                throwable.addSuppressed(throwable5);
                            }
                        } else {
                            stream.close();
                        }
                    }
                    throw throwable4;
                }
            }
            catch (IOException e) {
                log.error("Failed to read CPU usage from /proc/stat", (Throwable)e);
                return -1.0;
            }
        }
    }

    private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
        try {
            long usage = LinuxBrokerHostUsageImpl.readLongFromFile(CGROUPS_CPU_USAGE_PATH);
            double currentUsage = (double)usage - this.lastCpuUsage;
            this.lastCpuUsage = usage;
            return 100.0 * currentUsage / elapsedTimeSeconds / (double)TimeUnit.SECONDS.toNanos(1L);
        }
        catch (IOException e) {
            log.error("Failed to read CPU usage from {}", (Object)CGROUPS_CPU_USAGE_PATH, (Object)e);
            return -1.0;
        }
    }

    private ResourceUsage getMemUsage() {
        double total = (double)this.systemBean.getTotalPhysicalMemorySize() / 1048576.0;
        double free = (double)this.systemBean.getFreePhysicalMemorySize() / 1048576.0;
        return new ResourceUsage(total - free, total);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private List<String> getNics() {
        try (Stream<Path> stream = Files.list(Paths.get("/sys/class/net/", new String[0]));){
            List<String> list = stream.filter(this::isPhysicalNic).map(path -> path.getFileName().toString()).collect(Collectors.toList());
            return list;
        }
        catch (IOException e) {
            log.error("Failed to find NICs", (Throwable)e);
            return Collections.emptyList();
        }
    }

    public int getNicCount() {
        return this.getNics().size();
    }

    private boolean isPhysicalNic(Path path) {
        if (!path.toString().contains("/virtual/")) {
            try {
                Files.readAllBytes(path.resolve("speed"));
                return true;
            }
            catch (Exception e) {
                return false;
            }
        }
        return false;
    }

    private Path getNicSpeedPath(String nic) {
        return Paths.get(String.format("/sys/class/net/%s/speed", nic), new String[0]);
    }

    private double getTotalNicLimitKbps(List<String> nics) {
        return this.overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * (double)nics.size() * 1024.0 * 1024.0).orElseGet(() -> nics.stream().mapToDouble(s -> {
            try {
                return Double.parseDouble(new String(Files.readAllBytes(this.getNicSpeedPath((String)s))));
            }
            catch (IOException e) {
                log.error("Failed to read speed for nic " + s, (Throwable)e);
                return 0.0;
            }
        }).sum() * 1024.0);
    }

    private Path getNicTxPath(String nic) {
        return Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic), new String[0]);
    }

    private Path getNicRxPath(String nic) {
        return Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic), new String[0]);
    }

    private double getTotalNicUsageRxKb(List<String> nics) {
        return nics.stream().mapToDouble(s -> {
            try {
                return Double.parseDouble(new String(Files.readAllBytes(this.getNicRxPath((String)s))));
            }
            catch (IOException e) {
                log.error("Failed to read rx_bytes for NIC " + s, (Throwable)e);
                return 0.0;
            }
        }).sum() * 8.0 / 1024.0;
    }

    private double getTotalNicUsageTxKb(List<String> nics) {
        return nics.stream().mapToDouble(s -> {
            try {
                return Double.parseDouble(new String(Files.readAllBytes(this.getNicTxPath((String)s))));
            }
            catch (IOException e) {
                log.error("Failed to read tx_bytes for NIC " + s, (Throwable)e);
                return 0.0;
            }
        }).sum() * 8.0 / 1024.0;
    }

    private static long readLongFromFile(String path) throws IOException {
        return Long.parseLong(new String(Files.readAllBytes(Paths.get(path, new String[0])), Charsets.UTF_8).trim());
    }
}

