package org.apache.hugegraph.loader.reader.kafka;

import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Properties;
import java.util.Queue;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.exception.InitException;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.loader.parser.CsvLineParser;
import org.apache.hugegraph.loader.parser.JsonLineParser;
import org.apache.hugegraph.loader.parser.LineParser;
import org.apache.hugegraph.loader.parser.TextLineParser;
import org.apache.hugegraph.loader.reader.AbstractReader;
import org.apache.hugegraph.loader.reader.line.Line;
import org.apache.hugegraph.loader.source.file.FileFormat;
import org.apache.hugegraph.loader.source.kafka.KafkaSource;
import org.apache.hugegraph.util.Log;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/loader/reader/kafka/KafkaReader.class */
/**
 * Input reader that pulls string records from a Kafka topic and turns each
 * record value into a {@link Line} using the parser selected by the source's
 * configured format.
 *
 * <p>Records are fetched in batches via {@link KafkaConsumer#poll(Duration)}
 * and handed out one at a time by {@link #next()}. When the source is
 * configured with early stop, iteration ends after the first poll that
 * yields no record.
 */
public class KafkaReader extends AbstractReader {
    private static final Logger LOG = Log.logger(KafkaReader.class);
    private static final String BASE_CONSUMER_GROUP = "kafka-reader-base";
    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_DURATION_MS = 1000L;
    /** Pause, in milliseconds, applied after a poll that returned nothing. */
    private static final long POLL_GAP_INTERVAL_MS = 1000L;

    private final KafkaSource source;
    private final KafkaConsumer<String, String> dataConsumer;
    private final LineParser parser;
    private final boolean earlyStop;
    private Queue<String> batch;
    private boolean emptyPoll;

    /**
     * Creates a reader for the given Kafka source and subscribes a consumer
     * to the source's topic.
     *
     * @param kafkaSource the Kafka source description to read from
     */
    public KafkaReader(KafkaSource kafkaSource) {
        // The source must be assigned first: the consumer and the parser are
        // both built from it. Initializing them in field initializers would
        // run before this assignment and dereference a null source.
        this.source = kafkaSource;
        this.dataConsumer = createKafkaConsumer();
        this.parser = createLineParser();
        this.earlyStop = kafkaSource.isEarlyStop();
    }

    @Override // org.apache.hugegraph.loader.reader.InputReader
    public void init(LoadContext loadContext, InputStruct inputStruct) throws InitException {
        progress(loadContext, inputStruct);
    }

    /** No-op: this reader does not confirm offsets explicitly. */
    @Override // org.apache.hugegraph.loader.reader.InputReader
    public void confirmOffset() {
    }

    /** Closes the underlying Kafka consumer. */
    @Override // org.apache.hugegraph.loader.reader.InputReader, java.lang.AutoCloseable
    public void close() {
        this.dataConsumer.close();
    }

    /**
     * Reports whether reading should continue.
     *
     * @return {@code false} only when early stop is enabled and a previous
     *         call to {@link #next()} found no record; {@code true} otherwise
     */
    @Override // java.util.Iterator
    public boolean hasNext() {
        return !(this.earlyStop && this.emptyPoll);
    }

    /**
     * Returns the next parsed line, polling Kafka for a new batch when the
     * current one is exhausted.
     *
     * @return the parsed line, or {@code null} when the poll produced no
     *         record (in which case the empty-poll flag is raised)
     */
    @Override // java.util.Iterator
    public Line next() {
        if (this.batch == null || this.batch.isEmpty()) {
            this.batch = nextBatch();
        }
        String rawLine = this.batch.poll();
        if (rawLine == null) {
            this.emptyPoll = true;
            return null;
        }
        return this.parser.parse(this.source.header(), rawLine);
    }

    /**
     * Queries the number of partitions of the source topic using a
     * short-lived consumer in the base consumer group.
     *
     * @return the partition count reported for the source topic
     */
    private int getKafkaTopicPartitionCount() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.source.getBootstrapServer());
        properties.put("group.id", BASE_CONSUMER_GROUP);
        // Deserializers are required consumer configuration even when the
        // consumer is only used for metadata lookups.
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        // try-with-resources guarantees the consumer is closed even if the
        // metadata request throws.
        try (KafkaConsumer<String, String> metadataConsumer = new KafkaConsumer<>(properties)) {
            return metadataConsumer.partitionsFor(this.source.getTopic()).size();
        }
    }

    /**
     * Builds the consumer used for reading data and subscribes it to the
     * source topic.
     *
     * @return a consumer subscribed to the source topic
     */
    private KafkaConsumer<String, String> createKafkaConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.source.getBootstrapServer());
        properties.put("max.poll.records", Integer.valueOf(this.source.getBatchSize()));
        properties.put("group.id", this.source.getGroup());
        properties.put("enable.auto.commit", Constants.KAFKA_AUTO_COMMIT);
        properties.put("auto.commit.interval.ms", String.valueOf(1000L));
        properties.put("session.timeout.ms", String.valueOf(Constants.KAFKA_SESSION_TIMEOUT));
        String offsetReset = this.source.isFromBeginning()
                             ? Constants.KAFKA_EARLIEST_OFFSET
                             : Constants.KAFKA_LATEST_OFFSET;
        properties.put("auto.offset.reset", offsetReset);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(ImmutableList.of(this.source.getTopic()));
        return kafkaConsumer;
    }

    /**
     * Polls Kafka once and collects the values of the returned records.
     *
     * <p>When the poll returns nothing, the calling thread pauses briefly
     * before returning so that repeated empty polls do not spin. If the
     * thread is interrupted during that pause, the interrupt flag is
     * restored and the (empty) batch is returned immediately.
     *
     * @return the record values in iteration order; empty when the poll
     *         returned no record
     */
    private Deque<String> nextBatch() {
        ConsumerRecords<String, String> records =
                this.dataConsumer.poll(Duration.ofMillis(POLL_DURATION_MS));
        Deque<String> values = new ArrayDeque<>(records.count());
        if (records.count() == 0) {
            try {
                Thread.sleep(POLL_GAP_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interruption for callers instead of swallowing it.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for records from topic '{}'",
                         this.source.getTopic());
            }
            return values;
        }
        for (ConsumerRecord<String, String> record : records) {
            values.add(record.value());
        }
        return values;
    }

    /**
     * Selects the line parser matching the source's configured format.
     *
     * @return a parser for CSV, TEXT or JSON formatted record values
     * @throws AssertionError if the format is none of the supported ones
     */
    private LineParser createLineParser() {
        FileFormat format = this.source.getFormat();
        switch (format) {
            case CSV:
                return new CsvLineParser();
            case TEXT:
                return new TextLineParser(this.source.getDelimiter());
            case JSON:
                return new JsonLineParser();
            default:
                throw new AssertionError(String.format("Unsupported file format '%s' of source '%s'", format, this.source));
        }
    }
}
