/*
 * Decompiled with CFR 0.152.
 */
package jmind.core.kafka;

import java.util.Properties;
import jmind.base.lang.SourceProperties;
import jmind.base.util.CollectionsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractKafkaConsumer {
    private boolean started = true;
    private String topicName;
    private String groupName;
    protected KafkaConsumer<String, String> consumer;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public AbstractKafkaConsumer(String topicName, String groupName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", SourceProperties.getDataSource().getProperty("bootstrap.servers"));
        props.put("group.id", groupName);
        props.put("enable.auto.commit", (Object)false);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer(props);
        this.topicName = topicName;
        this.groupName = groupName;
        this.listen();
    }

    private void listen() {
        this.consumer.subscribe(CollectionsUtil.asList((String)this.topicName, (String)","));
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                while (AbstractKafkaConsumer.this.isStarted()) {
                    ConsumerRecords records = AbstractKafkaConsumer.this.consumer.poll(10000L);
                    try {
                        AbstractKafkaConsumer.this.handleMessage((ConsumerRecords<String, String>)records);
                        AbstractKafkaConsumer.this.consumer.commitAsync();
                    }
                    catch (Exception e) {
                        AbstractKafkaConsumer.this.logger.error("consumer error, topic={},groupName={}", new Object[]{AbstractKafkaConsumer.this.topicName, AbstractKafkaConsumer.this.groupName, e});
                    }
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        thread.setName(this.groupName + "-" + this.groupName + thread.getName());
        thread.start();
        this.logger.info("topic=" + this.topicName + ", groupName=" + this.groupName + " started");
    }

    public abstract void handleMessage(ConsumerRecords<String, String> var1) throws Exception;

    public String getTopicName() {
        return this.topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getGroupName() {
        return this.groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public boolean isStarted() {
        return this.started;
    }

    public void setStarted(boolean started) {
        this.started = started;
    }
}

