package io.confluent.monitoring.clients.interceptor;

import io.confluent.shaded.com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.IsolationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/monitoring/clients/interceptor/MonitoringConsumerInterceptor.class */
public class MonitoringConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V>, ClusterResourceListener {
    private static final Logger log = LoggerFactory.getLogger(MonitoringConsumerInterceptor.class);
    private volatile ClusterResource clusterResource;
    private volatile MonitoringInterceptor monitoringInterceptor;
    private Map<String, ?> configs;
    private String groupId = "";
    protected boolean enabled = true;

    public void configure(Map<String, ?> map) {
        this.configs = map;
        Object obj = map.get("group.id");
        if (obj != null && (obj instanceof String)) {
            this.groupId = (String) obj;
        }
        if (map.containsKey("isolation.level")) {
            IsolationLevel valueOf = IsolationLevel.valueOf(map.get("isolation.level").toString().toUpperCase(Locale.ROOT));
            if (!IsolationLevel.READ_UNCOMMITTED.equals(valueOf)) {
                log.error("IsolationLevel={} not supported. monitoring disabled", valueOf);
                this.enabled = false;
            }
        }
        log.debug("configure=finished groupId={}", this.groupId);
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> consumerRecords) {
        if (this.enabled) {
            if (this.monitoringInterceptor == null) {
                synchronized (this) {
                    if (this.monitoringInterceptor == null) {
                        log.info("creating interceptor");
                        Preconditions.checkState(this.clusterResource != null, "clusterResource is not defined");
                        Preconditions.checkState(this.configs != null, "consumer interceptor is not configured");
                        MonitoringInterceptor createForConsumer = MonitoringInterceptor.createForConsumer(this.clusterResource.clusterId(), this.groupId, this.configs);
                        createForConsumer.start();
                        this.monitoringInterceptor = createForConsumer;
                    }
                }
            }
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                this.monitoringInterceptor.recordMessageMetric(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.timestamp(), Math.max(consumerRecord.serializedKeySize(), 0) + Math.max(consumerRecord.serializedValueSize(), 0), consumerRecord.checksum(), System.currentTimeMillis() - consumerRecord.timestamp());
            }
        }
        return consumerRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void close() {
        log.debug("closing");
        if (this.monitoringInterceptor != null) {
            this.monitoringInterceptor.shutdown();
        }
    }

    public void onUpdate(ClusterResource clusterResource) {
        log.debug("cluster updated");
        this.clusterResource = clusterResource;
    }
}
