package com.xiaomi.mone.log.stream.job.extension.kafka;

import com.google.common.collect.Lists;
import com.xiaomi.mone.log.stream.job.LogDataTransfer;
import com.xiaomi.mone.log.stream.job.extension.SinkJob;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/stream/job/extension/kafka/KafkaSinkJob.class */
/**
 * {@link SinkJob} that subscribes a Kafka consumer to the topic named in its
 * {@link KafkaConfig} and hands the consumer, together with a
 * {@link LogDataTransfer}, to a {@link KafkaConsumerRunner} executed on a
 * virtual thread.
 */
public class KafkaSinkJob implements SinkJob {
    private static final Logger log = LoggerFactory.getLogger(KafkaSinkJob.class);

    private final KafkaConfig kafkaConfig;
    private final KafkaConsumer<String, String> consumer;
    private final LogDataTransfer dataTransfer;

    /** Created by {@link #start()}; stays {@code null} until the subscription has succeeded. */
    private KafkaConsumerRunner kafkaConsumerRunner;

    /**
     * Stores the collaborators; no Kafka interaction happens until {@link #start()}.
     *
     * @param kafkaConfig     supplies the topic name to subscribe to
     * @param kafkaConsumer   consumer that is subscribed and passed to the runner
     * @param logDataTransfer passed to the runner alongside the consumer
     */
    public KafkaSinkJob(KafkaConfig kafkaConfig, KafkaConsumer<String, String> kafkaConsumer, LogDataTransfer logDataTransfer) {
        this.kafkaConfig = kafkaConfig;
        this.consumer = kafkaConsumer;
        this.dataTransfer = logDataTransfer;
    }

    /**
     * Subscribes the consumer to the configured topic and starts a
     * {@link KafkaConsumerRunner} on a new virtual thread.
     *
     * @return {@code true} when the runner thread was started; {@code false} when any
     *         exception was raised on the way (the exception is logged, not rethrown)
     */
    @Override // com.xiaomi.mone.log.stream.job.extension.SinkJob
    public boolean start() throws Exception {
        try {
            this.consumer.subscribe(Lists.newArrayList(this.kafkaConfig.getTopicName()));
            this.kafkaConsumerRunner = new KafkaConsumerRunner(this.consumer, this.dataTransfer);
            Thread.ofVirtual().start(this.kafkaConsumerRunner);
            return true;
        } catch (Exception e) {
            // Failure is reported through the return value; callers are not expected to catch here.
            log.error("start kafka consumer error", e);
            return false;
        }
    }

    /**
     * Delegates to {@link KafkaConsumerRunner#shutdown()} when a runner exists.
     *
     * <p>If {@link #start()} was never called, or failed before the runner was created,
     * there is nothing to stop and this method returns without throwing.
     */
    @Override // com.xiaomi.mone.log.stream.job.extension.SinkJob
    public void shutdown() throws Exception {
        KafkaConsumerRunner runner = this.kafkaConsumerRunner;
        if (runner == null) {
            // Previously this path threw NullPointerException.
            log.warn("kafka consumer runner was never started, nothing to shut down");
            return;
        }
        runner.shutdown();
    }
}
