package org.apache.pulsar.io.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/kafka/KafkaAbstractSource.class */
public abstract class KafkaAbstractSource<V> extends PushSource<V> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaAbstractSource.class);
    private Consumer<String, byte[]> consumer;
    private Properties props;
    private KafkaSourceConfig kafkaSourceConfig;
    Thread runnerThread;

    /* loaded from: input_file:org/apache/pulsar/io/kafka/KafkaAbstractSource$KafkaRecord.class */
    private static class KafkaRecord<V> implements Record<V> {
        private final ConsumerRecord<String, byte[]> record;
        private final V value;
        private final CompletableFuture<Void> completableFuture = new CompletableFuture<>();

        public KafkaRecord(ConsumerRecord<String, byte[]> consumerRecord, V v) {
            this.record = consumerRecord;
            this.value = v;
        }

        @Override // org.apache.pulsar.functions.api.Record
        public Optional<String> getPartitionId() {
            return Optional.of(Integer.toString(this.record.partition()));
        }

        @Override // org.apache.pulsar.functions.api.Record
        public Optional<Long> getRecordSequence() {
            return Optional.of(Long.valueOf(this.record.offset()));
        }

        @Override // org.apache.pulsar.functions.api.Record
        public Optional<String> getKey() {
            return Optional.ofNullable(this.record.key());
        }

        @Override // org.apache.pulsar.functions.api.Record
        public V getValue() {
            return this.value;
        }

        @Override // org.apache.pulsar.functions.api.Record
        public void ack() {
            this.completableFuture.complete(null);
        }

        public CompletableFuture<Void> getCompletableFuture() {
            return this.completableFuture;
        }
    }

    @Override // org.apache.pulsar.io.core.PushSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        this.kafkaSourceConfig = KafkaSourceConfig.load(map);
        if (this.kafkaSourceConfig.getTopic() == null || this.kafkaSourceConfig.getBootstrapServers() == null || this.kafkaSourceConfig.getGroupId() == null || this.kafkaSourceConfig.getFetchMinBytes().longValue() == 0 || this.kafkaSourceConfig.getAutoCommitIntervalMs().longValue() == 0 || this.kafkaSourceConfig.getSessionTimeoutMs().longValue() == 0) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.props = new Properties();
        this.props.put("bootstrap.servers", this.kafkaSourceConfig.getBootstrapServers());
        this.props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaSourceConfig.getGroupId());
        this.props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.kafkaSourceConfig.getFetchMinBytes().toString());
        this.props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.kafkaSourceConfig.getAutoCommitIntervalMs().toString());
        this.props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.kafkaSourceConfig.getSessionTimeoutMs().toString());
        this.props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.kafkaSourceConfig.getKeyDeserializationClass());
        this.props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.kafkaSourceConfig.getValueDeserializationClass());
        start();
    }

    protected Properties beforeCreateConsumer(Properties properties) {
        return properties;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        LOG.info("Stopping kafka source");
        if (this.runnerThread != null) {
            this.runnerThread.interrupt();
            this.runnerThread.join();
            this.runnerThread = null;
        }
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
        LOG.info("Kafka source stopped.");
    }

    public void start() {
        this.runnerThread = new Thread(() -> {
            LOG.info("Starting kafka source");
            this.consumer = new KafkaConsumer(beforeCreateConsumer(this.props));
            this.consumer.subscribe(Arrays.asList(this.kafkaSourceConfig.getTopic()));
            LOG.info("Kafka source started.");
            while (true) {
                ConsumerRecords<String, byte[]> poll = this.consumer.poll(1000L);
                CompletableFuture[] completableFutureArr = new CompletableFuture[poll.count()];
                int i = 0;
                Iterator<ConsumerRecord<String, byte[]>> it = poll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord<String, byte[]> next = it.next();
                    LOG.debug("Record received from kafka, key: {}. value: {}", next.key(), next.value());
                    KafkaRecord kafkaRecord = new KafkaRecord(next, extractValue(next));
                    consume(kafkaRecord);
                    completableFutureArr[i] = kafkaRecord.getCompletableFuture();
                    i++;
                }
                if (!this.kafkaSourceConfig.isAutoCommitEnabled()) {
                    try {
                        CompletableFuture.allOf(completableFutureArr).get();
                        this.consumer.commitSync();
                    } catch (InterruptedException | ExecutionException e) {
                        return;
                    }
                }
            }
        });
        this.runnerThread.setName("Kafka Source Thread");
        this.runnerThread.start();
    }

    public abstract V extractValue(ConsumerRecord<String, byte[]> consumerRecord);
}
