package org.apache.pinot.plugin.stream.kafka20;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.class */
public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
    private StreamMessageDecoder _messageDecoder;
    private Logger _instanceLogger;
    private String _clientId;
    private String _tableAndStreamName;
    private StreamConfig _streamConfig;
    private KafkaStreamLevelStreamConfig _kafkaStreamLevelStreamConfig;
    private KafkaConsumer<Bytes, Bytes> _consumer;
    private ConsumerRecords<Bytes, Bytes> _consumerRecords;
    private Iterator<ConsumerRecord<Bytes, Bytes>> _kafkaIterator;
    private Map<Integer, Long> _consumerOffsets = new HashMap();
    private long _lastLogTime = 0;
    private long _lastCount = 0;
    private long _currentCount = 0;

    public KafkaStreamLevelConsumer(String str, String str2, StreamConfig streamConfig, Set<String> set, String str3) {
        this._clientId = str;
        this._streamConfig = streamConfig;
        this._kafkaStreamLevelStreamConfig = new KafkaStreamLevelStreamConfig(streamConfig, str2, str3);
        this._messageDecoder = StreamDecoderProvider.create(streamConfig, set);
        this._tableAndStreamName = str2 + "-" + streamConfig.getTopicName();
        this._instanceLogger = LoggerFactory.getLogger(KafkaStreamLevelConsumer.class.getName() + "_" + str2 + "_" + streamConfig.getTopicName());
        this._instanceLogger.info("KafkaStreamLevelConsumer: streamConfig : {}", this._streamConfig);
    }

    public void start() throws Exception {
        this._consumer = KafkaStreamLevelConsumerManager.acquireKafkaConsumerForConfig(this._kafkaStreamLevelStreamConfig);
    }

    private void updateKafkaIterator() {
        this._consumerRecords = this._consumer.poll(Duration.ofMillis(this._streamConfig.getFetchTimeoutMillis()));
        this._kafkaIterator = this._consumerRecords.iterator();
    }

    private void resetOffsets() {
        Iterator<Integer> it = this._consumerOffsets.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            this._consumer.seek(new TopicPartition(this._streamConfig.getTopicName(), intValue), this._consumerOffsets.get(Integer.valueOf(intValue)).longValue());
        }
    }

    public GenericRow next(GenericRow genericRow) {
        if (this._kafkaIterator == null || !this._kafkaIterator.hasNext()) {
            updateKafkaIterator();
        }
        if (!this._kafkaIterator.hasNext()) {
            return null;
        }
        try {
            ConsumerRecord<Bytes, Bytes> next = this._kafkaIterator.next();
            updateOffsets(next.partition(), next.offset());
            GenericRow decode = this._messageDecoder.decode(next.value().get(), genericRow);
            this._currentCount++;
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this._lastLogTime > 60000 || this._currentCount - this._lastCount >= 100000) {
                if (this._lastCount == 0) {
                    this._instanceLogger.info("Consumed {} events from kafka stream {}", Long.valueOf(this._currentCount), this._streamConfig.getTopicName());
                } else {
                    this._instanceLogger.info("Consumed {} events from kafka stream {} (rate:{}/s)", Long.valueOf(this._currentCount - this._lastCount), this._streamConfig.getTopicName(), Float.valueOf((((float) (this._currentCount - this._lastCount)) * 1000.0f) / ((float) (currentTimeMillis - this._lastLogTime))));
                }
                this._lastCount = this._currentCount;
                this._lastLogTime = currentTimeMillis;
            }
            return decode;
        } catch (Exception e) {
            this._instanceLogger.warn("Caught exception while consuming events", (Throwable) e);
            throw e;
        }
    }

    private void updateOffsets(int i, long j) {
        this._consumerOffsets.put(Integer.valueOf(i), Long.valueOf(j + 1));
    }

    public void commit() {
        this._consumer.commitSync(getOffsetsMap());
        resetOffsets();
        this._consumerOffsets.clear();
    }

    private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
        HashMap hashMap = new HashMap();
        for (Integer num : this._consumerOffsets.keySet()) {
            hashMap.put(new TopicPartition(this._streamConfig.getTopicName(), num.intValue()), new OffsetAndMetadata(this._consumerOffsets.get(num).longValue()));
        }
        return hashMap;
    }

    public void shutdown() throws Exception {
        if (this._consumer != null) {
            resetOffsets();
            KafkaStreamLevelConsumerManager.releaseKafkaConsumer(this._consumer);
            this._consumer = null;
        }
    }
}
