package com.xiaomi.mone.log.stream.job.extension.kafka;

import com.xiaomi.mone.log.api.enums.MQSourceEnum;
import com.xiaomi.mone.log.stream.job.LogDataTransfer;
import com.xiaomi.mone.log.utils.DateUtils;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/stream/job/extension/kafka/KafkaConsumerRunner.class */
public class KafkaConsumerRunner implements Runnable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaConsumerRunner.class);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;
    private final LogDataTransfer handleMessage;

    public KafkaConsumerRunner(KafkaConsumer kafkaConsumer, LogDataTransfer logDataTransfer) {
        this.consumer = kafkaConsumer;
        this.handleMessage = logDataTransfer;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.closed.get()) {
            try {
                try {
                    try {
                        Iterator it = this.consumer.poll(1000L).iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            if (StringUtils.equals((CharSequence) consumerRecord.key(), this.handleMessage.getSinkJobConfig().getTag())) {
                                log.debug("Thread:{} Consume partition:{} offset:{},message:{}", Thread.currentThread().getName(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.value());
                                this.handleMessage.handleMessage(MQSourceEnum.KAFKA.getName(), (String) consumerRecord.value(), DateUtils.getTime());
                            }
                        }
                    } catch (Exception e) {
                        log.error("kafka consumer error", (Throwable) e);
                    }
                } catch (Exception e2) {
                    log.error("KafkaConsumerRunner send exception", (Throwable) e2);
                    this.consumer.close();
                    return;
                }
            } finally {
                this.consumer.close();
            }
        }
    }

    public void shutdown() {
        this.closed.set(true);
        this.consumer.close(20L, TimeUnit.SECONDS);
    }
}
