package org.apache.inlong.agent.plugin.sources;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.reader.KafkaReader;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/KafkaSource.class */
/**
 * Source that splits a Kafka task into one {@link KafkaReader} per partition of the configured topic.
 *
 * <p>Only {@link #split(TaskProfile)} carries logic; {@link #read()}, {@link #sourceFinish()} and
 * {@link #sourceExist()} return fixed values in this class.
 */
public class KafkaSource extends org.apache.inlong.agent.plugin.sources.file.AbstractSource {
    /** Kafka consumer property name that receives the task's {@code job.kafkaJob.autoOffsetReset} value. */
    public static final String JOB_KAFKA_AUTO_RESETE = "auto.offset.reset";
    /** Prefix stripped from task keys to obtain the corresponding Kafka consumer property name. */
    private static final String JOB_KAFKAJOB_PARAM_PREFIX = "job.kafkaJob.";
    /** Not referenced inside this class. */
    private static final String JOB_KAFKAJOB_WAIT_TIMEOUT = "job.kafkajob.wait.timeout";
    private static final String KAFKA_COMMIT_AUTO = "enable.auto.commit";
    private static final String KAFKA_DESERIALIZER_METHOD = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
    private static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
    private static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
    private static final String KAFKA_SESSION_TIMEOUT = "session.timeout.ms";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);
    private static final Gson gson = new Gson();
    /** Not referenced inside this class. */
    private static AtomicLong metricsIndex = new AtomicLong(0);

    // Task-profile keys read by split().
    private static final String JOB_LINE_FILTER_PATTERN = "job.pattern";
    private static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
    private static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
    private static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
    private static final String JOB_KAFKA_PARTITION_OFFSET = "job.kafkaJob.partition.offset";
    private static final String JOB_KAFKA_AUTO_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
    private static final String TASK_ID = "task.id";

    /** Separates the individual {@code partition#offset} entries, e.g. {@code 0#110_1#666}. */
    private static final String OFFSET_ENTRY_DELIMITER = "_";
    /** Separates the partition number from its offset inside one entry. */
    private static final String PARTITION_OFFSET_DELIMITER = "#";
    private static final int SESSION_TIMEOUT_MS = 30000;

    /**
     * Creates one {@link KafkaReader} for every partition of the topic named by
     * {@code job.kafkaJob.topic}.
     *
     * <p>Each reader gets its own consumer, manually assigned to a single partition. When
     * {@code job.kafkaJob.partition.offset} contains an entry for that partition
     * (format {@code partition#offset}, entries joined by {@code _}), the consumer is positioned
     * at that offset; otherwise no seek is performed. The offset is resolved independently for
     * every partition, so a value configured for one partition is never applied to another.
     *
     * <p>The success metric is incremented when partition metadata was returned, the failure
     * metric when the metadata lookup returned {@code null}.
     *
     * @param taskProfile task configuration; must contain {@code job.kafkaJob.bootstrap.servers}
     *                    ({@link Properties} rejects a {@code null} value)
     * @return the readers created, one per partition; empty when no partition metadata is available
     * @throws NumberFormatException if the offset configured for an existing partition is not a number
     */
    // Raw Map/KafkaConsumer/KafkaReader are kept on purpose: the generic signature of the
    // KafkaReader constructor is not visible from this file, and Gson produces an untyped Map.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public List<Reader> split(TaskProfile taskProfile) {
        List<Reader> readers = new ArrayList<>();
        String filterPattern = taskProfile.get(JOB_LINE_FILTER_PATTERN, "");
        Map taskParams = gson.fromJson(taskProfile.toJsonStr(), Map.class);
        Properties properties = buildBaseProperties(taskParams);

        // The metadata consumer is only needed for the partition lookup; close it right away
        // instead of leaking its sockets and buffers.
        List<PartitionInfo> partitions;
        try (KafkaConsumer<byte[], byte[]> metadataConsumer = new KafkaConsumer<>(properties)) {
            partitions = metadataConsumer.partitionsFor(taskProfile.get(JOB_KAFKA_TOPIC));
        }

        String configuredOffsets = (String) taskParams.get(JOB_KAFKA_PARTITION_OFFSET);
        String[] offsetEntries = StringUtils.isNotBlank(configuredOffsets)
                ? configuredOffsets.split(OFFSET_ENTRY_DELIMITER)
                : null;
        properties.put(KAFKA_SESSION_TIMEOUT, SESSION_TIMEOUT_MS);

        if (partitions == null) {
            this.sourceMetric.sourceFailCount.incrementAndGet();
            return readers;
        }

        for (PartitionInfo partitionInfo : partitions) {
            String defaultGroupId = ((String) taskParams.get(TASK_ID)) + "_group" + partitionInfo.partition();
            properties.put(toKafkaProperty(JOB_KAFKA_GROUP_ID),
                    taskParams.getOrDefault(JOB_KAFKA_GROUP_ID, defaultGroupId));

            KafkaConsumer partitionConsumer = new KafkaConsumer(properties);
            try {
                TopicPartition topicPartition =
                        new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                partitionConsumer.assign(Collections.singletonList(topicPartition));

                Long startOffset = findConfiguredOffset(offsetEntries, partitionInfo.partition());
                LOGGER.info("kafka topic partition offset:{}", startOffset);
                if (startOffset != null) {
                    partitionConsumer.seek(topicPartition, startOffset.longValue());
                }

                KafkaReader kafkaReader = new KafkaReader(partitionConsumer, taskParams);
                addValidator(filterPattern, kafkaReader);
                readers.add(kafkaReader);
            } catch (RuntimeException e) {
                // The consumer was never handed to a caller, so nobody else can close it.
                closeAfterFailure(partitionConsumer, e);
                throw e;
            }
        }
        this.sourceMetric.sourceSuccessCount.incrementAndGet();
        return readers;
    }

    /**
     * Always returns {@code null}; this class does not produce messages itself.
     *
     * @return {@code null}
     */
    public Message read() {
        return null;
    }

    /**
     * Always returns {@code false}.
     *
     * @return {@code false}
     */
    public boolean sourceFinish() {
        return false;
    }

    /**
     * Always returns {@code false}.
     *
     * @return {@code false}
     */
    public boolean sourceExist() {
        return false;
    }

    /** Registers the line filter pattern on the given reader. */
    @SuppressWarnings("rawtypes")
    private void addValidator(String str, KafkaReader kafkaReader) {
        kafkaReader.addPatternValidator(str);
    }

    /** Builds the consumer properties shared by the metadata consumer and all partition consumers. */
    @SuppressWarnings("rawtypes")
    private static Properties buildBaseProperties(Map taskParams) {
        Properties properties = new Properties();
        properties.put(toKafkaProperty(JOB_KAFKA_BOOTSTRAP_SERVERS), taskParams.get(JOB_KAFKA_BOOTSTRAP_SERVERS));
        properties.put(KAFKA_KEY_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
        properties.put(KAFKA_VALUE_DESERIALIZER, KAFKA_DESERIALIZER_METHOD);
        properties.put(KAFKA_COMMIT_AUTO, false);
        Object autoOffsetReset = taskParams.get(JOB_KAFKA_AUTO_OFFSET_RESET);
        if (ObjectUtils.isNotEmpty(autoOffsetReset)) {
            properties.put(JOB_KAFKA_AUTO_RESETE, autoOffsetReset);
        }
        return properties;
    }

    /** Turns a task key such as {@code job.kafkaJob.group.id} into the Kafka property {@code group.id}. */
    private static String toKafkaProperty(String taskKey) {
        return taskKey.replace(JOB_KAFKAJOB_PARAM_PREFIX, "");
    }

    /**
     * Looks up the start offset configured for one partition.
     *
     * @param offsetEntries the {@code partition#offset} entries, or {@code null} when none are configured
     * @param partition     the partition number to look up
     * @return the configured offset (the last matching entry wins), or {@code null} when the
     *         partition has no usable entry
     */
    private static Long findConfiguredOffset(String[] offsetEntries, int partition) {
        if (offsetEntries == null) {
            return null;
        }
        String partitionId = String.valueOf(partition);
        Long offset = null;
        for (String entry : offsetEntries) {
            if (!entry.contains(PARTITION_OFFSET_DELIMITER)) {
                continue;
            }
            String[] parts = entry.split(PARTITION_OFFSET_DELIMITER);
            // Only entries addressed to this partition are inspected further, so a broken entry
            // for some other partition cannot affect this one.
            if (parts.length == 0 || !parts[0].equals(partitionId)) {
                continue;
            }
            if (parts.length < 2) {
                LOGGER.warn("ignore kafka partition offset entry without offset value: {}", entry);
                continue;
            }
            offset = Long.valueOf(parts[1]);
        }
        return offset;
    }

    /** Closes a consumer while an exception is in flight, keeping that exception as the primary failure. */
    private static void closeAfterFailure(KafkaConsumer<?, ?> consumer, RuntimeException failure) {
        try {
            consumer.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}
