package kafka.server.resource;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.Optional;
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import kafka.server.BrokerReconfigurable;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Set;

/* loaded from: input_file:kafka/server/resource/HeapWatcher.class */
/**
 * Watches the tenured ("Old"/"Tenured") heap memory pool and counts how often its post-collection
 * usage exceeds a configurable threshold.
 *
 * <p>The watcher registers itself with the platform {@code MemoryMXBean} both as the notification
 * filter and as the notification listener. Each accepted notification whose collection usage is
 * above the configured threshold is recorded on the {@code almost-oom} metric.
 *
 * <p>The threshold is taken from {@code confluent.heap.tenured.notify.bytes} and can be changed at
 * runtime through the {@link BrokerReconfigurable} callbacks.
 */
public class HeapWatcher implements BrokerReconfigurable, NotificationFilter, NotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(HeapWatcher.class);
    private static final String METRICS_GROUP_NAME = HeapWatcher.class.getSimpleName();
    private static final String TENURED_NOTIFY_BYTES_CONFIG = "confluent.heap.tenured.notify.bytes";

    // Kept public and non-final: it is part of the class's existing external surface.
    public static Set<String> reconfigurableConfigs = JavaConverters.asScalaSet(Utils.mkSet(new String[]{TENURED_NOTIFY_BYTES_CONFIG}));

    final MetricName almostOOMMetricName;
    private final Metrics metrics;
    private final Sensor almostOOMSensor;
    private final MemoryPoolMXBean memoryPoolMxBean;
    // Written under the instance monitor, but read without it by handleNotification() and
    // reconfigure(); volatile makes those reads observe the latest value.
    private volatile long usageThresholdBytes;
    // Only accessed while holding the instance monitor (or from the constructor).
    private boolean registered;

    /**
     * Creates a {@link HeapWatcher} when the feature is enabled in the supplied configuration.
     *
     * @param metrics     registry that will own the {@code almost-oom} sensor
     * @param kafkaConfig broker configuration to read the enable flag and threshold from
     * @return the new watcher, or {@link Optional#empty()} when the watcher is disabled
     */
    public static Optional<HeapWatcher> maybeCreateHeapWatcher(Metrics metrics, KafkaConfig kafkaConfig) {
        if (isEnabledFromConfig(kafkaConfig)) {
            return Optional.of(new HeapWatcher(metrics, kafkaConfig));
        }
        LOG.debug("HeapWatcher disabled in config");
        return Optional.empty();
    }

    /**
     * Builds the watcher, registers it for memory notifications and applies the configured
     * collection usage threshold.
     *
     * @param metrics     registry that will own the {@code almost-oom} sensor
     * @param kafkaConfig broker configuration to read the threshold from
     * @throws IllegalStateException    if no tenured heap memory pool can be found
     * @throws IllegalArgumentException if the tenured pool does not support a collection usage threshold
     * @throws ConfigException          if the configured threshold is negative
     */
    protected HeapWatcher(Metrics metrics, KafkaConfig kafkaConfig) throws IllegalArgumentException, ConfigException {
        LOG.trace("Creating HeapWatcher");
        this.registered = false;
        // Validate the configuration first so nothing is registered when it is invalid.
        long usageThersholdBytesFromConfig = usageThersholdBytesFromConfig(kafkaConfig);
        this.memoryPoolMxBean = getTenuredPoolMXBean();
        if (this.memoryPoolMxBean == null) {
            LOG.error("Unable to find the MemoryPoolMXBean for the tenured pool, not creating a HeapWatcher");
            throw new IllegalStateException("Unable to find the MemoryPoolMXBean for the tenured pool, not creating a HeapWatcher");
        }
        if (!this.memoryPoolMxBean.isCollectionUsageThresholdSupported()) {
            // String.format uses %-style conversions; "{}" would be emitted literally.
            throw new IllegalArgumentException(String.format("The memory pool supplied (%s) does not support a collection usage threshold", this.memoryPoolMxBean.getName()));
        }
        this.metrics = metrics;
        this.almostOOMSensor = this.metrics.sensor("AlmostOOM");
        this.almostOOMMetricName = metrics.metricName("almost-oom", METRICS_GROUP_NAME, "Number of times an almost-OOM has occurred");
        this.almostOOMSensor.add(this.almostOOMMetricName, new CumulativeCount());
        registerWithMemoryMXBean();
        setCollectionUsageThreshold(usageThersholdBytesFromConfig);
    }

    /**
     * Returns the platform memory MXBean viewed as a notification emitter.
     *
     * <p>{@code ManagementFactory.getMemoryMXBean()} is typed as {@code MemoryMXBean}, which does not
     * itself declare the listener methods, so an explicit cast is required.
     */
    private static NotificationEmitter memoryNotificationEmitter() {
        return (NotificationEmitter) ManagementFactory.getMemoryMXBean();
    }

    /** Adds this instance as both filter and listener on the platform memory MXBean. */
    private synchronized void registerWithMemoryMXBean() {
        memoryNotificationEmitter().addNotificationListener(this, this, null);
        this.registered = true;
        LOG.debug("Added HeapWatcher as a listener for the tenured heap MemoryMXBean");
    }

    /**
     * Removes this instance from the platform memory MXBean. Logs an error and does nothing when
     * the watcher is not currently registered.
     */
    synchronized void unregisterWithMemoryMXBean() {
        if (!this.registered) {
            LOG.error("unregisterWithMemoryMXBean() called but HeapWatcher wasn't registered with the MemoryMXBean");
            return;
        }
        try {
            memoryNotificationEmitter().removeNotificationListener(this, this, null);
            LOG.debug("Removed HeapWatcher as a listener for the tenured heap MemoryMXBean");
            this.registered = false;
        } catch (ListenerNotFoundException e) {
            LOG.warn("removeNotificationListener from MemoryMXBean failed. Maybe it was never registered?", e);
        }
    }

    /**
     * Applies a new collection usage threshold to the tenured pool and remembers the value the
     * pool reports back.
     *
     * <p>A request larger than the pool's maximum (when the maximum is defined) is replaced by
     * {@code 0}. The request is ignored when the watcher is not registered as a listener.
     *
     * @param j requested threshold in bytes
     */
    private synchronized void setCollectionUsageThreshold(long j) {
        if (!this.registered) {
            LOG.error("Trying to setCollectionUsageThreshold before registering with the MemoryMXBean. Ignoring");
            // Actually ignore the request, as the message above states.
            return;
        }
        long max = this.memoryPoolMxBean.getUsage().getMax();
        // max is only compared when it is non-negative (max > -1).
        if (max > -1 && j > max) {
            LOG.info("Asked to set collection usage threshold to {}, more than the max available to the pool {}. Disabling notifications insetad", Long.valueOf(j), Long.valueOf(max));
            j = 0;
        }
        LOG.info("Setting collection usage threshold to {}", Long.valueOf(j));
        this.memoryPoolMxBean.setCollectionUsageThreshold(j);
        long effective = this.memoryPoolMxBean.getCollectionUsageThreshold();
        this.usageThresholdBytes = effective;
        if (effective != j) {
            // Placeholder order: requested value first, value actually in effect second.
            LOG.error("Asked the MemoryPoolMXBean to set collection threshold to {}, but it was set to {} instead", Long.valueOf(j), Long.valueOf(effective));
        }
    }

    /**
     * Reads and validates the notification threshold from the configuration.
     *
     * @param kafkaConfig configuration to read from
     * @return the configured threshold in bytes (never negative)
     * @throws ConfigException if the configured value is negative
     */
    private static long usageThersholdBytesFromConfig(KafkaConfig kafkaConfig) throws ConfigException {
        long thresholdBytes = kafkaConfig.confluentConfig().heapWatcherTenuredNotifyBytes();
        if (thresholdBytes < 0) {
            throw new ConfigException(String.format("Value for %s is %d, a negative number", TENURED_NOTIFY_BYTES_CONFIG, thresholdBytes));
        }
        return thresholdBytes;
    }

    /** Returns whether the heap watcher is enabled in the supplied configuration. */
    private static boolean isEnabledFromConfig(KafkaConfig kafkaConfig) throws ConfigException {
        return kafkaConfig.confluentConfig().heapWatcherTenuredEnabled();
    }

    /** Returns {@code true} when the pool name contains "Old" or "Tenured". */
    private static boolean isTenuredPoolName(String str) {
        return str.contains("Old") || str.contains("Tenured");
    }

    /**
     * Finds the first heap memory pool whose name identifies it as the tenured generation.
     *
     * @return the matching pool bean, or {@code null} when none is found
     */
    static MemoryPoolMXBean getTenuredPoolMXBean() {
        for (MemoryPoolMXBean memoryPoolMXBean : ManagementFactory.getMemoryPoolMXBeans()) {
            if (memoryPoolMXBean.getType().equals(MemoryType.HEAP) && isTenuredPoolName(memoryPoolMXBean.getName())) {
                return memoryPoolMXBean;
            }
        }
        return null;
    }

    /** Records one almost-OOM occurrence on the sensor. */
    private void notifyListeners() {
        this.almostOOMSensor.record();
    }

    /**
     * Notification filter: accepts only collection-threshold-exceeded notifications that refer to
     * a tenured pool.
     *
     * @param notification the candidate notification
     * @return {@code true} if {@link #handleNotification} should receive it
     */
    @Override
    public boolean isNotificationEnabled(Notification notification) {
        if (!notification.getType().equals(MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
            LOG.trace("Asked about notification that isn't MEMORY_COLLECTION_THRESHOLD_EXCEEDED: {}", notification);
            return false;
        }
        MemoryNotificationInfo from = MemoryNotificationInfo.from((CompositeData) notification.getUserData());
        LOG.debug("Filtering MEMORY_COLLECTION_THRESHOLD_EXCEEDED notification with payload: {}", from);
        return isTenuredPoolName(from.getPoolName());
    }

    /**
     * Notification listener: re-reads the pool's collection usage and records an almost-OOM event
     * when it is above the current threshold.
     *
     * @param notification the notification that passed the filter
     * @param obj          handback object supplied at registration (unused)
     */
    @Override
    public void handleNotification(Notification notification, Object obj) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling notification with payload: {}", MemoryNotificationInfo.from((CompositeData) notification.getUserData()));
        }
        long used = this.memoryPoolMxBean.getCollectionUsage().getUsed();
        // Read the threshold once so the logged value and the compared value agree.
        long threshold = this.usageThresholdBytes;
        LOG.info("Got notified about the tenured pool, collection usage memory being {} (notification threshold is {})", Long.valueOf(used), Long.valueOf(threshold));
        if (used > threshold) {
            notifyListeners();
        }
    }

    /** Unregisters the listener and removes the sensor from the metrics registry. */
    public synchronized void shutdown() {
        unregisterWithMemoryMXBean();
        this.metrics.removeSensor(this.almostOOMSensor.name());
    }

    @Override // kafka.server.BrokerReconfigurable
    /* renamed from: reconfigurableConfigs */
    public Set<String> mo1053reconfigurableConfigs() {
        return reconfigurableConfigs;
    }

    /**
     * Validates a proposed configuration by parsing the threshold.
     *
     * @throws ConfigException if the proposed threshold is negative
     */
    @Override // kafka.server.BrokerReconfigurable
    public void validateReconfiguration(KafkaConfig kafkaConfig) {
        usageThersholdBytesFromConfig(kafkaConfig);
    }

    /**
     * Applies the threshold from the new configuration when it differs from the one in effect.
     *
     * @param kafkaConfig  previous configuration (unused)
     * @param kafkaConfig2 new configuration to read the threshold from
     */
    @Override // kafka.server.BrokerReconfigurable
    public void reconfigure(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        long newThresholdBytes = usageThersholdBytesFromConfig(kafkaConfig2);
        if (newThresholdBytes != this.usageThresholdBytes) {
            setCollectionUsageThreshold(newThresholdBytes);
        }
    }
}
