package org.apache.kylin.source.kafka.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.SystemPropertyUtils;

/* loaded from: input_file:org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.class */
public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> {
    static Logger log = LoggerFactory.getLogger((Class<?>) KafkaInputRecordReader.class);
    public static final long DEFAULT_KAFKA_CONSUMER_POLL_TIMEOUT = 60000;
    private Configuration conf;
    private KafkaInputSplit split;
    private Consumer consumer;
    private String brokers;
    private String topic;
    private int partition;
    private long earliestOffset;
    private long watermark;
    private long latestOffset;
    private ConsumerRecords<String, String> messages;
    private Iterator<ConsumerRecord<String, String>> iterator;
    private LongWritable key;
    private BytesWritable value;
    private long timeOut = 60000;
    private long numProcessedMessages = 0;

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        initialize(inputSplit, taskAttemptContext.getConfiguration());
    }

    public void initialize(InputSplit inputSplit, Configuration configuration) throws IOException, InterruptedException {
        this.conf = configuration;
        this.split = (KafkaInputSplit) inputSplit;
        this.brokers = this.split.getBrokers();
        this.topic = this.split.getTopic();
        this.partition = this.split.getPartition();
        this.watermark = this.split.getOffsetStart();
        if (configuration.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) {
            this.timeOut = Long.parseLong(configuration.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT));
        }
        this.consumer = KafkaClient.getKafkaConsumer(this.brokers, configuration.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP), KafkaConsumerProperties.extractKafkaConfigToProperties(configuration));
        this.earliestOffset = this.split.getOffsetStart();
        this.latestOffset = this.split.getOffsetEnd();
        this.consumer.assign(Arrays.asList(new TopicPartition(this.topic, this.partition)));
        log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", this.split, this.topic, this.split.getBrokers(), Integer.valueOf(this.partition), Long.valueOf(this.earliestOffset), Long.valueOf(this.latestOffset));
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.key == null) {
            this.key = new LongWritable();
        }
        if (this.value == null) {
            this.value = new BytesWritable();
        }
        if (this.watermark >= this.latestOffset) {
            log.info("Reach the end offset, stop reading.");
            return false;
        }
        if (this.messages == null) {
            log.info("{} fetching offset {} ", this.topic + SystemPropertyUtils.VALUE_SEPARATOR + this.split.getBrokers() + SystemPropertyUtils.VALUE_SEPARATOR + this.partition, Long.valueOf(this.watermark));
            this.consumer.seek(new TopicPartition(this.topic, this.partition), this.watermark);
            this.messages = this.consumer.poll(this.timeOut);
            this.iterator = this.messages.iterator();
            if (!this.iterator.hasNext()) {
                log.info("No more messages, stop");
                throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", Long.valueOf(this.latestOffset), Long.valueOf(this.watermark)));
            }
        }
        if (!this.iterator.hasNext()) {
            log.error("Unexpected iterator end.");
            throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", Long.valueOf(this.latestOffset), Long.valueOf(this.watermark)));
        }
        ConsumerRecord<String, String> next = this.iterator.next();
        this.key.set(next.offset());
        byte[] bytes = Bytes.toBytes((String) next.value());
        this.value.set(bytes, 0, bytes.length);
        this.watermark = next.offset() + 1;
        this.numProcessedMessages++;
        if (this.iterator.hasNext()) {
            return true;
        }
        this.messages = null;
        this.iterator = null;
        return true;
    }

    /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
    public LongWritable m873getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
    public BytesWritable m872getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    public float getProgress() throws IOException, InterruptedException {
        if (this.watermark >= this.latestOffset || this.earliestOffset == this.latestOffset) {
            return 1.0f;
        }
        return Math.min(1.0f, ((float) (this.watermark - this.earliestOffset)) / ((float) (this.latestOffset - this.earliestOffset)));
    }

    public void close() throws IOException {
        log.info("{} num. processed messages {} ", this.topic + SystemPropertyUtils.VALUE_SEPARATOR + this.split.getBrokers() + SystemPropertyUtils.VALUE_SEPARATOR + this.partition, Long.valueOf(this.numProcessedMessages));
        this.consumer.close();
    }
}
