/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.connect.kafka;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.pulsar.connect.core.PushSource;
import org.apache.pulsar.connect.core.Record;
import org.apache.pulsar.connect.kafka.KafkaSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PushSource} that polls a Kafka topic on a dedicated thread and hands every polled record
 * to the function registered through {@link #setConsumer(Function)}.
 *
 * <p>When auto-commit is disabled in the configuration, offsets are committed synchronously only
 * after every future returned by that function for the polled batch has completed.
 *
 * @param <V> type of the Kafka record values
 */
public class KafkaSource<V>
implements PushSource<V> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    private Consumer<String, V> consumer;
    private Properties props;
    private KafkaSourceConfig kafkaSourceConfig;
    Thread runnerThread;
    private Function<Record<V>, CompletableFuture<Void>> consumeFunction;

    /**
     * Loads the source configuration, validates it, builds the Kafka consumer properties and
     * starts the polling thread.
     *
     * @param config raw configuration map, parsed by {@link KafkaSourceConfig#load}
     * @throws IllegalArgumentException if a required property is missing (null) or, for the
     *     numeric settings, zero
     * @throws Exception if the configuration cannot be loaded
     */
    public void open(Map<String, Object> config) throws Exception {
        this.kafkaSourceConfig = KafkaSourceConfig.load(config);
        if (this.kafkaSourceConfig.getTopic() == null
                || this.kafkaSourceConfig.getBootstrapServers() == null
                || this.kafkaSourceConfig.getGroupId() == null
                // Properties rejects null values, so fail with the descriptive error instead.
                || this.kafkaSourceConfig.getKeyDeserializationClass() == null
                || this.kafkaSourceConfig.getValueDeserializationClass() == null
                || isUnset(this.kafkaSourceConfig.getFetchMinBytes())
                || isUnset(this.kafkaSourceConfig.getAutoCommitIntervalMs())
                || isUnset(this.kafkaSourceConfig.getSessionTimeoutMs())) {
            throw new IllegalArgumentException("Required property not set.");
        }
        this.props = new Properties();
        this.props.put("bootstrap.servers", this.kafkaSourceConfig.getBootstrapServers());
        this.props.put("group.id", this.kafkaSourceConfig.getGroupId());
        this.props.put("fetch.min.bytes", this.kafkaSourceConfig.getFetchMinBytes().toString());
        // Kafka enables auto-commit by default; pass the configured flag through so that the
        // manual commitSync() path in the polling loop is the only committer when it is disabled.
        this.props.put("enable.auto.commit", String.valueOf(this.kafkaSourceConfig.isAutoCommitEnabled()));
        this.props.put("auto.commit.interval.ms", this.kafkaSourceConfig.getAutoCommitIntervalMs().toString());
        this.props.put("session.timeout.ms", this.kafkaSourceConfig.getSessionTimeoutMs().toString());
        this.props.put("key.deserializer", this.kafkaSourceConfig.getKeyDeserializationClass());
        this.props.put("value.deserializer", this.kafkaSourceConfig.getValueDeserializationClass());
        this.start();
    }

    /**
     * Stops the polling thread (by interrupting it and waiting for it to finish) and then closes
     * the Kafka consumer, if either exists.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     polling thread to terminate; in that case the consumer is left open
     */
    public void close() throws InterruptedException {
        LOG.info("Stopping kafka source");
        if (this.runnerThread != null) {
            this.runnerThread.interrupt();
            this.runnerThread.join();
            this.runnerThread = null;
        }
        // Only touch the consumer after the polling thread has terminated.
        if (this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
        LOG.info("Kafka source stopped.");
    }

    /**
     * Creates and starts the thread that runs the polling loop.
     */
    public void start() {
        this.runnerThread = new Thread(this::runLoop);
        this.runnerThread.setName("Kafka Source Thread");
        this.runnerThread.start();
    }

    /**
     * Registers the function that receives every record read from Kafka.
     *
     * @param consumeFunction function applied to each record; the returned future is awaited
     *     before offsets are committed when auto-commit is disabled
     */
    public void setConsumer(Function<Record<V>, CompletableFuture<Void>> consumeFunction) {
        this.consumeFunction = consumeFunction;
    }

    /**
     * Body of the polling thread: creates the consumer, subscribes to the configured topic and
     * forwards polled records to the registered function until the thread is interrupted or
     * processing of a batch fails.
     */
    private void runLoop() {
        LOG.info("Starting kafka source");
        this.consumer = new KafkaConsumer<>(this.props);
        this.consumer.subscribe(Arrays.asList(this.kafkaSourceConfig.getTopic()));
        LOG.info("Kafka source started.");
        // close() stops this thread by interrupting it, so honour the flag between batches.
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, V> records = this.consumer.poll(1000L);
            CompletableFuture<?>[] futures = new CompletableFuture<?>[records.count()];
            int index = 0;
            for (ConsumerRecord<String, V> record : records) {
                LOG.debug("Record received from kafka, key: {}. value: {}", record.key(), record.value());
                futures[index] = this.consumeFunction.apply(new KafkaRecord<>(record));
                ++index;
            }
            if (this.kafkaSourceConfig.isAutoCommitEnabled()) {
                continue;
            }
            try {
                // Commit only once the whole batch has been handled downstream.
                CompletableFuture.allOf(futures).get();
                this.consumer.commitSync();
            }
            catch (InterruptedException ex) {
                // Keep the interrupt visible to anything else running on this thread.
                Thread.currentThread().interrupt();
                return;
            }
            catch (ExecutionException ex) {
                // The batch is not committed; report the failure instead of dying silently.
                LOG.error("Failed to process records received from kafka, stopping kafka source", ex);
                return;
            }
        }
    }

    /**
     * Tells whether a numeric setting is missing: either null or zero.
     */
    private static boolean isUnset(Number value) {
        return value == null || value.longValue() == 0L;
    }

    /**
     * {@link Record} view over a single Kafka {@link ConsumerRecord}.
     *
     * @param <V> type of the record value
     */
    private static class KafkaRecord<V>
    implements Record<V> {
        private final ConsumerRecord<String, V> record;

        public KafkaRecord(ConsumerRecord<String, V> record) {
            this.record = record;
        }

        /** Returns the Kafka partition number of the record, rendered as a string. */
        public String getPartitionId() {
            return Integer.toString(this.record.partition());
        }

        /** Returns the Kafka offset of the record. */
        public long getRecordSequence() {
            return this.record.offset();
        }

        /** Returns the deserialized value of the record. */
        public V getValue() {
            return this.record.value();
        }
    }
}

