package kafka.server.resource;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import kafka.metrics.LinuxDiskMetricsCollector;
import kafka.server.BrokerReconfigurable;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.KafkaThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Set;

/* loaded from: input_file:kafka/server/resource/DiskIOManager.class */
/**
 * Periodically samples the throughput of a single disk device and switches the tier archiver
 * listeners between a normal and a throttled quota.
 *
 * <p>A background thread wakes up every {@link #CHECK_INTERVAL_MS} milliseconds, refreshes the
 * measured rate and then:
 * <ul>
 *   <li>enters throttled mode when the rate exceeds {@code limit - headroom};</li>
 *   <li>leaves throttled mode when the rate drops below
 *       {@code limit - HEADROOM_MULTIPLIER_TO_EXIT_THROTTLED_MODE * headroom}.</li>
 * </ul>
 *
 * <p>The limit, headroom and both quotas are held in {@code volatile} fields so that
 * {@link #reconfigure(KafkaConfig, KafkaConfig)} can update them while the background thread runs.
 */
public class DiskIOManager implements ResourceUsageManager, Runnable, BrokerReconfigurable {
    /** Pause between two consecutive usage checks of the background thread. */
    private static final long CHECK_INTERVAL_MS = 5000;

    /** Limits at or above this value are reported as {@code 0} by the limit metric. */
    public static final long INVALID_THROUGHPUT_THRESHOLD = 1000000000;

    /** The rate has to fall this many headrooms below the limit before throttling is lifted. */
    private static final int HEADROOM_MULTIPLIER_TO_EXIT_THROTTLED_MODE = 3;

    private static final Logger LOGGER = LoggerFactory.getLogger(DiskIOManager.class);

    public static final MetricName DISK_THROUGHPUT_BYTE_PER_SEC = new MetricName("DiskThroughputBytePerSec", "DiskIOManager", "Disk read/write throughput (bytes/sec)", new HashMap<>());
    public static final MetricName DISK_THROUGHPUT_LIMIT_BYTE_PER_SEC = new MetricName("DiskThroughputLimitBytePerSec", "DiskIOManager", "Disk read/write throughput limit (bytes/sec)", new HashMap<>());

    /** Names of the configs that can be changed at runtime through {@link #reconfigure}. */
    public static Set<String> reconfigurableConfigs = JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(
            KafkaConfig.DiskThroughputLimitBytePerSecProp(),
            KafkaConfig.DiskThroughputHeadroomBytePerSecProp(),
            KafkaConfig.DiskThroughputQuotaForTierArchiveBytePerSecProp(),
            KafkaConfig.DiskThroughputThrottledQuotaForTierArchiveBytePerSecProp())));

    private volatile long limit;
    private volatile long headroom;
    private volatile long diskThroughputQuotaForTierArchive;
    private volatile long diskThroughputQuotaForTierArchiveThrottled;
    private final LinuxDiskMetricsCollector linuxDiskMetricsCollector;
    private final int sectorSizeInBytes;
    private final String deviceName;
    private final Metrics metrics;
    private volatile double rate = 0.0d;
    private volatile boolean isThrottled = false;
    private final Thread thread = new KafkaThread("DiskIOManagerThread", this, false);
    private volatile boolean shutdown = false;

    // Listeners are registered by caller threads while the manager thread iterates over them,
    // so the list must tolerate concurrent modification.
    private final List<ResourceUsageListener> listeners = new CopyOnWriteArrayList<>();

    /**
     * Creates a manager for one disk device; use {@link #maybeInitDiskIOManager} to obtain one.
     *
     * @param metrics                   registry that receives the throughput metrics
     * @param kafkaConfig               source of the initial limit, headroom and quota values
     * @param linuxDiskMetricsCollector collector queried for the device statistics
     * @param sectorSizeInBytes         multiplier applied to the collected values to obtain bytes
     * @param deviceName                name of the monitored device
     */
    DiskIOManager(Metrics metrics, KafkaConfig kafkaConfig, LinuxDiskMetricsCollector linuxDiskMetricsCollector, int sectorSizeInBytes, String deviceName) {
        this.metrics = metrics;
        this.limit = kafkaConfig.confluentConfig().diskThroughputLimitBytePerSec().longValue();
        this.headroom = kafkaConfig.confluentConfig().diskThroughputHeadroomBytePerSec().longValue();
        this.diskThroughputQuotaForTierArchive = kafkaConfig.confluentConfig().diskThroughputQuotaTierArchiveBytePerSec().longValue();
        this.diskThroughputQuotaForTierArchiveThrottled = kafkaConfig.confluentConfig().diskThroughputThrottledQuotaTierArchiveBytePerSec().longValue();
        this.linuxDiskMetricsCollector = linuxDiskMetricsCollector;
        this.sectorSizeInBytes = sectorSizeInBytes;
        this.deviceName = deviceName;
        LOGGER.info("DiskIOManager initialized successfully - disk throughput limit={} bytes/sec, headroom={} bytes/sec, sectorSize={} bytes, deviceName={}", this.limit, this.headroom, sectorSizeInBytes, deviceName);
    }

    /**
     * Builds a {@code DiskIOManager} when the feature is enabled and all prerequisites hold.
     *
     * <p>An empty result is returned (and the reason logged) when the feature is disabled, the
     * configuration is invalid, no collector is available, the collector does not report exactly
     * one device, or no positive sector size can be read for that device.
     *
     * @param metrics        registry that will receive the manager's metrics
     * @param kafkaConfig    broker configuration
     * @param maybeCollector disk metrics collector, if one could be created
     * @return the initialized manager, or empty when initialization is skipped
     */
    public static Optional<DiskIOManager> maybeInitDiskIOManager(Metrics metrics, KafkaConfig kafkaConfig, Optional<LinuxDiskMetricsCollector> maybeCollector) {
        if (!kafkaConfig.confluentConfig().diskIOManagerEnable().booleanValue()) {
            LOGGER.info("Skip DiskIOManager init: {} = false", KafkaConfig.DiskIOManagerEnableProp());
            return Optional.empty();
        }
        try {
            validateConfig(kafkaConfig);
            if (!maybeCollector.isPresent()) {
                LOGGER.warn("Skip DiskIOManager init: fail to get LinuxDiskMetricsCollector instance");
                return Optional.empty();
            }
            LinuxDiskMetricsCollector collector = maybeCollector.get();
            if (collector.devices().size() != 1) {
                LOGGER.warn("Skip DiskIOManager init: DiskIOManager currently doesn't support multiple disks");
                return Optional.empty();
            }
            String device = (String) ((Tuple2<?, ?>) collector.devices().head())._1();
            int sectorSize = readSectorSize(device);
            // A sector size of zero would pin the computed rate at 0 and silently disable
            // throttling, so it is rejected together with the -1 failure marker.
            if (sectorSize <= 0) {
                LOGGER.warn("Skip DiskIOManager init: failed to get sector size from hw_sector_size");
                return Optional.empty();
            }
            return Optional.of(new DiskIOManager(metrics, kafkaConfig, collector, sectorSize, device));
        } catch (ConfigException e) {
            LOGGER.error("Skip DiskIOManager init: invalid configuration", e);
            return Optional.empty();
        }
    }

    /**
     * Reads {@code /sys/block/<device>/queue/hw_sector_size}.
     *
     * @param device block device name
     * @return the parsed value, or {@code -1} when the file is missing, does not consist of
     *         exactly one line, or cannot be read or parsed
     */
    private static int readSectorSize(String device) {
        Path path = Paths.get("/sys", "block", device, "queue", "hw_sector_size");
        try {
            if (!path.toFile().exists()) {
                return -1;
            }
            List<String> lines = Files.readAllLines(path);
            if (lines.size() != 1) {
                return -1;
            }
            return Integer.parseInt(lines.get(0).trim());
        } catch (Exception e) {
            // Best effort: any failure simply means the manager is not started. The cause is
            // logged so that the failure can be diagnosed.
            LOGGER.error("Failed to get sector size from {}", path, e);
            return -1;
        }
    }

    /** Starts the background thread and registers the metrics; does nothing after shutdown. */
    public void startup() {
        if (this.shutdown) {
            return;
        }
        this.thread.start();
        setupMetrics();
    }

    /**
     * Background loop: refresh the usage, apply the quota, sleep, repeat until shutdown.
     * Any exception ends the loop; it is logged unless shutdown has been requested.
     */
    @Override
    public void run() {
        while (!this.shutdown) {
            try {
                updateUsage();
                applyQuota();
                Thread.sleep(CHECK_INTERVAL_MS);
            } catch (Exception e) {
                if (!this.shutdown) {
                    LOGGER.error("Fatal exception in DiskIOManager", e);
                }
                return;
            }
        }
    }

    /** Requests the background thread to stop, waits for it to finish and removes the metrics. */
    public void shutdown() {
        this.shutdown = true;
        try {
            this.thread.join();
        } catch (InterruptedException e) {
            LOGGER.error("DiskIOManager shutdown interrupted", e);
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        removeMetrics();
    }

    /**
     * Refreshes the measured rate from the collector's values at indices 2 and 6 for the device.
     * The previous rate is kept when either value is negative.
     */
    @Override
    public void updateUsage() {
        // Presumably indices 2 and 6 are the per-second "sectors read" and "sectors written"
        // rates of the device -- TODO confirm against LinuxDiskMetricsCollector.
        double readValue = this.linuxDiskMetricsCollector.kafka$metrics$LinuxDiskMetricsCollector$$$anonfun$maybeRegisterMetrics$4(this.deviceName, 2);
        double writeValue = this.linuxDiskMetricsCollector.kafka$metrics$LinuxDiskMetricsCollector$$$anonfun$maybeRegisterMetrics$4(this.deviceName, 6);
        if (readValue < 0.0d || writeValue < 0.0d) {
            return;
        }
        // NOTE(review): Linux block-layer statistics count sectors in fixed 512-byte units; if the
        // collector passes those through unchanged, multiplying by hw_sector_size over-reports
        // throughput on devices with larger sectors -- confirm.
        this.rate = (writeValue + readValue) * this.sectorSizeInBytes;
    }

    /**
     * Enters or leaves throttled mode based on the latest rate. The exit threshold is lower than
     * the entry threshold so that the mode does not flap around a single boundary.
     */
    @Override
    public void applyQuota() {
        long currentLimit = this.limit;
        long currentHeadroom = this.headroom;
        double currentRate = this.rate;
        if (this.isThrottled) {
            if (currentRate < currentLimit - (currentHeadroom * HEADROOM_MULTIPLIER_TO_EXIT_THROTTLED_MODE)) {
                LOGGER.info("DiskIOManager exiting throttled mode");
                applyQuota(false);
                this.isThrottled = false;
            }
            return;
        }
        if (currentRate > currentLimit - currentHeadroom) {
            LOGGER.info("DiskIOManager entering throttled mode");
            applyQuota(true);
            this.isThrottled = true;
        }
    }

    /**
     * Pushes the quota for the given mode to every registered
     * {@link TierArchiverDiskThroughputListener}; other listeners are left untouched.
     *
     * @param throttled {@code true} to apply the throttled quota, {@code false} for the normal one
     */
    private void applyQuota(boolean throttled) {
        for (ResourceUsageListener listener : this.listeners) {
            if (listener instanceof TierArchiverDiskThroughputListener) {
                listener.setQuota(throttled ? this.diskThroughputQuotaForTierArchiveThrottled : this.diskThroughputQuotaForTierArchive);
            }
        }
    }

    /** Adds a listener that is considered on subsequent quota changes. */
    @Override
    public void registerListener(ResourceUsageListener resourceUsageListener) {
        this.listeners.add(resourceUsageListener);
    }

    /** @return whether the manager is currently in throttled mode */
    public boolean isThrottled() {
        return this.isThrottled;
    }

    /** Registers the throughput and throughput-limit metrics. */
    private void setupMetrics() {
        this.metrics.addMetric(DISK_THROUGHPUT_BYTE_PER_SEC, (config, now) -> this.rate);
        this.metrics.addMetric(DISK_THROUGHPUT_LIMIT_BYTE_PER_SEC, (config, now) -> {
            long currentLimit = this.limit;
            // Limits at or above the threshold are reported as 0.
            return currentLimit >= INVALID_THROUGHPUT_THRESHOLD ? 0.0d : (double) currentLimit;
        });
    }

    /** Removes the metrics registered by {@link #setupMetrics()}. */
    private void removeMetrics() {
        this.metrics.removeMetric(DISK_THROUGHPUT_BYTE_PER_SEC);
        this.metrics.removeMetric(DISK_THROUGHPUT_LIMIT_BYTE_PER_SEC);
    }

    @Override // kafka.server.BrokerReconfigurable
    /* renamed from: reconfigurableConfigs */
    public Set<String> mo1053reconfigurableConfigs() {
        return reconfigurableConfigs;
    }

    /**
     * Validates the disk throughput settings of a proposed configuration.
     *
     * @throws ConfigException if the settings are rejected by {@link #validateConfig(KafkaConfig)}
     */
    @Override
    public void validateReconfiguration(KafkaConfig kafkaConfig) {
        validateConfig(kafkaConfig);
    }

    /**
     * Checks that limit, headroom and both quotas are non-negative and that the exit threshold
     * {@code limit - HEADROOM_MULTIPLIER_TO_EXIT_THROTTLED_MODE * headroom} stays positive.
     *
     * @throws ConfigException if any of these conditions is violated
     */
    private static void validateConfig(KafkaConfig kafkaConfig) {
        long limit = kafkaConfig.confluentConfig().diskThroughputLimitBytePerSec().longValue();
        long headroom = kafkaConfig.confluentConfig().diskThroughputHeadroomBytePerSec().longValue();
        long quota = kafkaConfig.confluentConfig().diskThroughputQuotaTierArchiveBytePerSec().longValue();
        long throttledQuota = kafkaConfig.confluentConfig().diskThroughputThrottledQuotaTierArchiveBytePerSec().longValue();
        boolean invalid = limit < 0
                || headroom < 0
                || headroom * HEADROOM_MULTIPLIER_TO_EXIT_THROTTLED_MODE >= limit
                || throttledQuota < 0
                || quota < 0;
        if (invalid) {
            throw new ConfigException(String.format("Invalid DiskIOManager configuration, limit: %d, headroom: %d, tier archiver quota: %d (non-throttled mode), %d (throttled mode)", limit, headroom, quota, throttledQuota));
        }
    }

    /**
     * Adopts the limit, headroom and quota values of the new configuration.
     *
     * @param oldConfig configuration in effect before the change (not read)
     * @param newConfig configuration whose values are applied
     */
    @Override
    public void reconfigure(KafkaConfig oldConfig, KafkaConfig newConfig) {
        this.limit = newConfig.confluentConfig().diskThroughputLimitBytePerSec().longValue();
        this.headroom = newConfig.confluentConfig().diskThroughputHeadroomBytePerSec().longValue();
        this.diskThroughputQuotaForTierArchive = newConfig.confluentConfig().diskThroughputQuotaTierArchiveBytePerSec().longValue();
        this.diskThroughputQuotaForTierArchiveThrottled = newConfig.confluentConfig().diskThroughputThrottledQuotaTierArchiveBytePerSec().longValue();
    }
}
