package org.apache.inlong.agent.plugin.sources.reader;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/KafkaReader.class */
/**
 * Reader that drains records from a {@link KafkaConsumer} one at a time and turns each
 * accepted record into a {@link DefaultMessage}.
 *
 * <p>Records are polled in batches; {@link #read()} hands them out individually, applies the
 * configured {@link Validator}s, and throttles itself through a simple window-based
 * record/byte rate limiter.
 *
 * <p>NOTE(review): the read counters used by the rate limiter are {@code static}, i.e. shared by
 * every {@code KafkaReader} instance in the JVM, while the measurement window start
 * ({@link #lastTimestamp}) is per instance. That is kept as found; confirm whether the limits
 * are meant to be global or per reader.
 *
 * @param <K> key type of the consumed Kafka records
 * @param <V> value type of the consumed Kafka records; {@link #read()} casts values to
 *            {@code byte[]}
 */
public class KafkaReader<K, V> extends org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader {
    public static final int NEVER_STOP_SIGN = -1;
    private static final String KAFKA_READER_TAG_NAME = "AgentKafkaMetric";
    private static final String KAFKA_SOURCE_READ_RECORD_SPEED = "job.kafkaJob.record.speed.limit";
    private static final String KAFKA_SOURCE_READ_BYTE_SPEED = "job.kafkaJob.byte.speed.limit";
    private static final String KAFKA_SOURCE_READ_MIN_INTERVAL = "kafka.min.interval.limit";
    private static final String JOB_KAFKAJOB_READ_TIMEOUT = "job.kafkaJob.read.timeout";
    /** Maximum time, in milliseconds, a single poll may block while waiting for records. */
    private static final long POLL_TIMEOUT_MS = 5000L;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaReader.class);

    // Running totals and the totals captured at the start of the current rate-limit window.
    // They are final on purpose: the window snapshot must be a copied value, never an alias
    // of the live counter.
    private static final AtomicLong currentTotalReadRecords = new AtomicLong(0);
    private static final AtomicLong lastTotalReadRecords = new AtomicLong(0);
    private static final AtomicLong currentTotalReadBytes = new AtomicLong(0);
    private static final AtomicLong lastTotalReadBytes = new AtomicLong(0);

    KafkaConsumer<K, V> consumer;
    /** Record rate limit in records per second; values {@code <= 0} disable the limit. */
    long recordSpeed;
    /** Byte rate limit in bytes per second; values {@code <= 0} disable the limit. */
    long byteSpeed;
    /** Minimum length, in milliseconds, of a rate-limit measurement window. */
    long flowControlInterval;
    /** Start of the current rate-limit measurement window (epoch milliseconds). */
    long lastTimestamp = System.currentTimeMillis();

    private Iterator<ConsumerRecord<K, V>> iterator;
    private long timeout;
    // NOTE(review): these two fields are never assigned anywhere in this class, so the audit
    // call in read() passes null for both. Confirm whether the superclass values were intended.
    private String inlongGroupId;
    private String inlongStreamId;
    private String snapshot;
    private String topic;
    private List<Validator> validators = new ArrayList<>();
    private long waitTimeout = 1000;
    private long lastTime = 0;
    private boolean isFinished = false;
    private boolean destroyed = false;

    /**
     * Creates a reader on top of an already configured consumer.
     *
     * @param kafkaConsumer consumer the records are polled from
     * @param map           job parameters; supplies the record/byte speed limits, the flow
     *                      control interval and the topic name, with defaults for the limits
     * @throws NumberFormatException if one of the numeric parameters is not a valid long
     */
    public KafkaReader(KafkaConsumer<K, V> kafkaConsumer, Map<String, String> map) {
        this.consumer = kafkaConsumer;
        this.recordSpeed = Long.parseLong(map.getOrDefault("job.kafkaJob.recordSpeed.limit", "10000"));
        this.byteSpeed = Long.parseLong(map.getOrDefault("job.kafkaJob.byteSpeed.limit", String.valueOf(1048576)));
        this.flowControlInterval = Long.parseLong(map.getOrDefault(KAFKA_SOURCE_READ_MIN_INTERVAL, "1000"));
        this.topic = map.get("job.kafkaJob.topic");
        LOGGER.info("KAFKA_SOURCE_READ_RECORD_SPEED = {}", this.recordSpeed);
        LOGGER.info("KAFKA_SOURCE_READ_BYTE_SPEED = {}", this.byteSpeed);
    }

    /**
     * Returns the next accepted record as a message.
     *
     * <p>When the current batch is exhausted, the offsets are committed asynchronously (if the
     * topic still reports partitions) and a new batch is polled; that call then sleeps for the
     * configured wait time and returns {@code null}. The same sleep-and-null path is taken when
     * a record is rejected by a validator or carries no value.
     *
     * @return the message built from the next record, or {@code null} if none was produced by
     *         this call
     */
    public Message read() {
        if (this.iterator != null && this.iterator.hasNext()) {
            ConsumerRecord<K, V> kafkaRecord = this.iterator.next();
            byte[] body = (byte[]) kafkaRecord.value();
            // Kafka records may carry a null value (tombstones); skip them instead of failing
            // with a NullPointerException further down.
            if (body != null && validateMessage(body)) {
                AuditUtils.add(3, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis(), 1,
                        body.length);
                Map<String, String> header = new HashMap<>();
                header.put("record.offset", String.valueOf(kafkaRecord.offset()));
                header.put("record.key", String.valueOf(kafkaRecord.key()));
                // Guarded so the payload is only decoded when debug output is actually enabled.
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("partition:{}, value:{}, offset:{}", kafkaRecord.partition(), new String(body),
                            kafkaRecord.offset());
                }
                this.readerMetric.pluginReadSuccessCount.incrementAndGet();
                this.readerMetric.pluginReadCount.incrementAndGet();
                this.snapshot = kafkaRecord.partition() + "#" + kafkaRecord.offset();
                DefaultMessage message = new DefaultMessage(body, header);
                recordReadLimit(1L, message.getBody().length);
                return message;
            }
        } else {
            if (isSourceExist()) {
                this.consumer.commitAsync();
            }
            fetchData(POLL_TIMEOUT_MS);
        }
        AgentUtils.silenceSleepInMs(this.waitTimeout);
        return null;
    }

    /**
     * @return {@code true} once {@link #finishRead()} has been called
     */
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Describes the source as {@code <topic>_<partition>} of the first partition returned by
     * the consumer's assignment.
     *
     * @return the source description, or an empty string if nothing is assigned
     */
    public String getReadSource() {
        Iterator<TopicPartition> partitions = this.consumer.assignment().iterator();
        if (!partitions.hasNext()) {
            return "";
        }
        TopicPartition first = partitions.next();
        return first.topic() + "_" + first.partition();
    }

    /**
     * @param j read timeout in milliseconds
     */
    public void setReadTimeout(long j) {
        this.timeout = j;
    }

    /**
     * @param j time in milliseconds {@link #read()} sleeps before returning {@code null}
     */
    public void setWaitMillisecond(long j) {
        this.waitTimeout = j;
    }

    /**
     * Initializes the reader from the instance profile: restores the stored partition/offset
     * snapshot, sets up the read timeout and polls the first batch.
     *
     * @param instanceProfile profile of the instance this reader belongs to
     */
    @Override // org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader
    public void init(InstanceProfile instanceProfile) {
        super.init(instanceProfile);
        this.snapshot = instanceProfile.get("job.kafkaJob.partition.offset", (String) null);
        initReadTimeout(instanceProfile);
        fetchData(POLL_TIMEOUT_MS);
    }

    /**
     * Closes the underlying consumer. Repeated calls are no-ops.
     */
    public void destroy() {
        synchronized (this) {
            if (!this.destroyed) {
                this.consumer.close();
                this.destroyed = true;
            }
        }
    }

    /**
     * Reads the timeout (configured in minutes) from the profile and stores it in milliseconds;
     * {@link #NEVER_STOP_SIGN} is stored unchanged.
     */
    private void initReadTimeout(InstanceProfile instanceProfile) {
        int minutes = instanceProfile.getInt(JOB_KAFKAJOB_READ_TIMEOUT, NEVER_STOP_SIGN);
        if (minutes == NEVER_STOP_SIGN) {
            this.timeout = NEVER_STOP_SIGN;
        } else {
            this.timeout = TimeUnit.MINUTES.toMillis(minutes);
        }
    }

    /**
     * Checks the payload against every registered validator.
     *
     * @param bArr record payload, must not be {@code null}
     * @return {@code true} if there are no validators or all of them accept the payload
     */
    private boolean validateMessage(byte[] bArr) {
        if (this.validators.isEmpty()) {
            return true;
        }
        // Decode once for all validators. NOTE(review): this uses the platform default charset,
        // as the original code did; confirm whether a fixed charset is wanted.
        String text = new String(bArr);
        return this.validators.stream().allMatch(validator -> validator.validate(text));
    }

    /**
     * Registers a pattern validator; {@code null} and empty patterns are ignored.
     *
     * @param str pattern handed to {@link PatternValidator}
     */
    public void addPatternValidator(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        this.validators.add(new PatternValidator(str));
    }

    /**
     * @return {@code <partition>#<offset>} of the last record returned by {@link #read()}, or
     *         the value restored in {@link #init(InstanceProfile)} if nothing was read yet
     */
    public String getSnapshot() {
        return this.snapshot;
    }

    /**
     * Marks the reader as finished.
     */
    public void finishRead() {
        this.isFinished = true;
    }

    /**
     * @return {@code true} if the consumer reports at least one partition for the topic
     */
    public boolean isSourceExist() {
        return !CollectionUtils.isEmpty(this.consumer.partitionsFor(this.topic));
    }

    /**
     * Polls the consumer and replaces the current batch iterator.
     *
     * @param j maximum time in milliseconds the poll may block
     * @return {@code true} if an iterator is available afterwards
     */
    private boolean fetchData(long j) {
        this.iterator = this.consumer.poll(Duration.ofMillis(j)).iterator();
        return this.iterator != null;
    }

    /**
     * Accounts for freshly read data and, once per measurement window, sleeps long enough to
     * bring the observed record/byte rate back down to the configured limits.
     *
     * <p>A window is evaluated when at least {@link #flowControlInterval} milliseconds have
     * passed since it started. After evaluation the counter snapshots and the window start are
     * reset, so each window is measured against its own traffic only.
     *
     * @param j  number of records just read
     * @param j2 number of bytes just read
     */
    private void recordReadLimit(long j, long j2) {
        boolean limitBytes = this.byteSpeed > 0;
        boolean limitRecords = this.recordSpeed > 0;
        if (!limitBytes && !limitRecords) {
            return;
        }
        long totalRecords = currentTotalReadRecords.addAndGet(j);
        long totalBytes = currentTotalReadBytes.addAndGet(j2);

        long now = System.currentTimeMillis();
        long interval = now - this.lastTimestamp;
        if (interval < 0) {
            // Wall clock moved backwards; restart the window rather than stalling the limiter.
            this.lastTimestamp = now;
            return;
        }
        // interval == 0 must be excluded explicitly: with a non-positive flowControlInterval it
        // would otherwise reach the divisions below.
        if (interval == 0 || interval < this.flowControlInterval) {
            return;
        }

        long byteLimitSleep = 0;
        long recordLimitSleep = 0;
        if (limitBytes) {
            long byteRate = ((totalBytes - lastTotalReadBytes.get()) * 1000) / interval;
            LOGGER.info("current produce byte speed bytes/s:{}", byteRate);
            if (byteRate > this.byteSpeed) {
                // Extra time needed so that the window's traffic averages out to the limit.
                byteLimitSleep = ((byteRate * interval) / this.byteSpeed) - interval;
            }
        }
        if (limitRecords) {
            long recordRate = ((totalRecords - lastTotalReadRecords.get()) * 1000) / interval;
            LOGGER.info("current read speed records/s:{}", recordRate);
            if (recordRate > this.recordSpeed) {
                recordLimitSleep = ((recordRate * interval) / this.recordSpeed) - interval;
            }
        }

        // Snapshot by value. Assigning the AtomicLong reference instead would alias the live
        // counter and make every later delta zero.
        lastTotalReadRecords.set(totalRecords);
        lastTotalReadBytes.set(totalBytes);

        long sleepTime = Math.max(byteLimitSleep, recordLimitSleep);
        if (sleepTime > 0) {
            LOGGER.info("sleep seconds:{}", sleepTime / 1000);
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Start the next window after the throttle sleep so the pause is not counted as idle
        // time of the following window.
        this.lastTimestamp = System.currentTimeMillis();
    }
}
