package org.apache.inlong.audit.service.consume;

import com.google.common.base.Preconditions;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.config.MessageQueueConfig;
import org.apache.inlong.audit.config.StoreConfig;
import org.apache.inlong.audit.service.InsertData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/consume/KafkaConsume.class */
/**
 * Kafka-backed audit consumer.
 *
 * <p>{@link #start()} reads the Kafka settings from the message-queue configuration, validates
 * them, creates a {@link KafkaConsumer} subscribed to the configured topic and launches a
 * {@link Fetcher} on a dedicated thread. The fetcher polls the topic and passes every record
 * value, decoded as UTF-8 text, to the inherited {@code handleMessage}.
 */
public class KafkaConsume extends BaseConsume {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsume.class);
    private KafkaConsumer<String, byte[]> consumer;
    private String serverUrl;
    private String topic;

    /**
     * Polling loop that drains records from the Kafka consumer and dispatches them to the
     * enclosing {@link KafkaConsume}.
     */
    public class Fetcher implements Runnable {
        private final KafkaConsumer<String, byte[]> consumer;
        private final String topic;
        private final boolean isAutoCommit;
        private final long fetchWaitMs;

        /**
         * @param kafkaConsumer consumer to poll; it is only used from the thread running this fetcher
         * @param topic         topic whose records are dispatched; records of other topics are ignored
         * @param isAutoCommit  {@code true} if the consumer commits offsets itself, in which case
         *                      no explicit commit is issued after a batch
         * @param fetchWaitMs   maximum time, in milliseconds, a single poll may block
         */
        public Fetcher(KafkaConsumer<String, byte[]> kafkaConsumer, String topic, boolean isAutoCommit,
                long fetchWaitMs) {
            this.consumer = kafkaConsumer;
            this.topic = topic;
            this.isAutoCommit = isAutoCommit;
            this.fetchWaitMs = fetchWaitMs;
        }

        /**
         * Polls forever. Any exception raised while polling, dispatching or committing is logged
         * and the loop continues with the next poll.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    ConsumerRecords<String, byte[]> records = this.consumer.poll(Duration.ofMillis(this.fetchWaitMs));
                    if (records == null || records.isEmpty()) {
                        continue;
                    }
                    for (ConsumerRecord<String, byte[]> consumerRecord : records) {
                        dispatch(consumerRecord);
                    }
                    // Offsets are committed explicitly only when the consumer does not auto-commit.
                    if (!this.isAutoCommit) {
                        this.consumer.commitAsync();
                    }
                } catch (Exception e) {
                    // The trailing throwable makes SLF4J print the stack trace in addition to the message.
                    KafkaConsume.LOG.error("kafka consumer get message error {}", e.getMessage(), e);
                }
            }
        }

        /**
         * Forwards one record to the enclosing consumer's message handler.
         *
         * <p>Records from a different topic are ignored. Records without a value are skipped with
         * a warning: decoding a {@code null} payload would throw and abort the rest of the batch.
         */
        private void dispatch(ConsumerRecord<String, byte[]> consumerRecord) {
            if (!StringUtils.equals(consumerRecord.topic(), this.topic)) {
                return;
            }
            byte[] payload = consumerRecord.value();
            if (payload == null) {
                KafkaConsume.LOG.warn("skip kafka record without value, topic:{}, partition:{}, offset:{}",
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                return;
            }
            KafkaConsume.this.handleMessage(new String(payload, StandardCharsets.UTF_8));
        }
    }

    /**
     * Creates the consumer; all arguments are handed to the {@link BaseConsume} constructor unchanged.
     *
     * @param insertDataList     passed through to the superclass
     * @param storeConfig        passed through to the superclass
     * @param messageQueueConfig passed through to the superclass
     */
    public KafkaConsume(List<InsertData> insertDataList, StoreConfig storeConfig,
            MessageQueueConfig messageQueueConfig) {
        super(insertDataList, storeConfig, messageQueueConfig);
    }

    /**
     * Validates the Kafka configuration, builds the consumer and starts the fetcher thread.
     *
     * @throws IllegalArgumentException if the server URL, topic or consumer name is empty
     */
    @Override // org.apache.inlong.audit.service.consume.BaseConsume
    public void start() {
        this.serverUrl = this.mqConfig.getKafkaServerUrl();
        this.topic = this.mqConfig.getKafkaTopic();
        // Boolean.getBoolean(name) reads a JVM system property called `name`; the configured
        // value itself has to be parsed with Boolean.parseBoolean.
        boolean autoCommit = Boolean.parseBoolean(this.mqConfig.getEnableAutoCommit());
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.serverUrl), "no kafka server url specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.topic), "no kafka topic topic specified");
        Preconditions.checkArgument(StringUtils.isNotEmpty(this.mqConfig.getKafkaConsumerName()), "no kafka consume name specified");
        initConsumer(this.mqConfig);
        Fetcher fetcher = new Fetcher(this.consumer, this.topic, autoCommit, this.mqConfig.getFetchWaitMs());
        new Thread(fetcher, "KafkaConsume_Fetcher_Thread").start();
    }

    /**
     * Builds the {@link KafkaConsumer} from the given configuration and subscribes it to the topic.
     *
     * <p>Reads {@code serverUrl} and {@code topic}, so both fields must be set before this is called.
     *
     * @param messageQueueConfig source of the group id and the offset-commit/offset-reset settings
     */
    protected void initConsumer(MessageQueueConfig messageQueueConfig) {
        LOG.info("init kafka consumer, topic:{}, serverUrl:{}", this.topic, this.serverUrl);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.serverUrl);
        properties.put("group.id", messageQueueConfig.getKafkaGroupId());
        properties.put("enable.auto.commit", messageQueueConfig.getEnableAutoCommit());
        properties.put("auto.commit.interval.ms", messageQueueConfig.getAutoCommitIntervalMs());
        properties.put("auto.offset.reset", messageQueueConfig.getAutoOffsetReset());
        // Keys are read as strings; values stay raw bytes and are decoded by the fetcher.
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Collections.singleton(this.topic));
    }
}
