package org.apache.pinot.core.realtime.impl.kafka2;

import com.yammer.metrics.core.Meter;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamLevelConsumer.class */
public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
    private StreamMessageDecoder _messageDecoder;
    private Logger INSTANCE_LOGGER;
    private String _clientId;
    private String _tableAndStreamName;
    private StreamConfig _streamConfig;
    private KafkaStreamLevelStreamConfig _kafkaStreamLevelStreamConfig;
    private KafkaConsumer<Bytes, Bytes> consumer;
    private ConsumerRecords<Bytes, Bytes> consumerRecords;
    private Iterator<ConsumerRecord<Bytes, Bytes>> kafkaIterator;
    private ServerMetrics _serverMetrics;
    private Map<Integer, Long> consumerOffsets = new HashMap();
    private long lastLogTime = 0;
    private long lastCount = 0;
    private long currentCount = 0;
    private Meter tableAndStreamRowsConsumed = null;
    private Meter tableRowsConsumed = null;

    public KafkaStreamLevelConsumer(String str, String str2, StreamConfig streamConfig, Schema schema, InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
        this._clientId = str;
        this._streamConfig = streamConfig;
        this._kafkaStreamLevelStreamConfig = new KafkaStreamLevelStreamConfig(streamConfig, str2, instanceZKMetadata);
        this._serverMetrics = serverMetrics;
        this._messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
        this._tableAndStreamName = str2 + "-" + streamConfig.getTopicName();
        this.INSTANCE_LOGGER = LoggerFactory.getLogger(KafkaStreamLevelConsumer.class.getName() + "_" + str2 + "_" + streamConfig.getTopicName());
        this.INSTANCE_LOGGER.info("KafkaStreamLevelConsumer: streamConfig : {}", this._streamConfig);
    }

    public void start() throws Exception {
        this.consumer = KafkaStreamLevelConsumerManager.acquireKafkaConsumerForConfig(this._kafkaStreamLevelStreamConfig);
    }

    private void updateKafkaIterator() {
        this.consumerRecords = this.consumer.poll(Duration.ofMillis(this._streamConfig.getFetchTimeoutMillis()));
        this.kafkaIterator = this.consumerRecords.iterator();
    }

    private void resetOffsets() {
        Iterator<Integer> it = this.consumerOffsets.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            this.consumer.seek(new TopicPartition(this._streamConfig.getTopicName(), intValue), this.consumerOffsets.get(Integer.valueOf(intValue)).longValue());
        }
    }

    public GenericRow next(GenericRow genericRow) {
        if (this.kafkaIterator == null || !this.kafkaIterator.hasNext()) {
            updateKafkaIterator();
        }
        if (!this.kafkaIterator.hasNext()) {
            return null;
        }
        try {
            ConsumerRecord<Bytes, Bytes> next = this.kafkaIterator.next();
            updateOffsets(next.partition(), next.offset());
            GenericRow decode = this._messageDecoder.decode(((Bytes) next.value()).get(), genericRow);
            this.tableAndStreamRowsConsumed = this._serverMetrics.addMeteredTableValue(this._tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L, this.tableAndStreamRowsConsumed);
            this.tableRowsConsumed = this._serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, this.tableRowsConsumed);
            this.currentCount++;
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastLogTime > 60000 || this.currentCount - this.lastCount >= 100000) {
                if (this.lastCount == 0) {
                    this.INSTANCE_LOGGER.info("Consumed {} events from kafka stream {}", Long.valueOf(this.currentCount), this._streamConfig.getTopicName());
                } else {
                    this.INSTANCE_LOGGER.info("Consumed {} events from kafka stream {} (rate:{}/s)", new Object[]{Long.valueOf(this.currentCount - this.lastCount), this._streamConfig.getTopicName(), Float.valueOf((((float) (this.currentCount - this.lastCount)) * 1000.0f) / ((float) (currentTimeMillis - this.lastLogTime)))});
                }
                this.lastCount = this.currentCount;
                this.lastLogTime = currentTimeMillis;
            }
            return decode;
        } catch (Exception e) {
            this.INSTANCE_LOGGER.warn("Caught exception while consuming events", e);
            this._serverMetrics.addMeteredTableValue(this._tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
            throw e;
        }
    }

    private void updateOffsets(int i, long j) {
        this.consumerOffsets.put(Integer.valueOf(i), Long.valueOf(j + 1));
    }

    public void commit() {
        this.consumer.commitSync(getOffsetsMap());
        resetOffsets();
        this.consumerOffsets.clear();
        this._serverMetrics.addMeteredTableValue(this._tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
        this._serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
        HashMap hashMap = new HashMap();
        for (Integer num : this.consumerOffsets.keySet()) {
            hashMap.put(new TopicPartition(this._streamConfig.getTopicName(), num.intValue()), new OffsetAndMetadata(this.consumerOffsets.get(num).longValue()));
        }
        return hashMap;
    }

    public void shutdown() throws Exception {
        if (this.consumer != null) {
            resetOffsets();
            KafkaStreamLevelConsumerManager.releaseKafkaConsumer(this.consumer);
            this.consumer = null;
        }
    }
}
