package org.apache.inlong.agent.plugin.sources.reader;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.Validator;
import org.apache.inlong.agent.plugin.except.FileException;
import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
import org.apache.inlong.agent.plugin.metrics.PluginMetric;
import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/TextFileReader.class */
public class TextFileReader implements Reader {
    private static final String TEXT_FILE_READER_TAG_NAME = "AgentTextMetric";
    public static final int NEVER_STOP_SIGN = -1;
    private String inlongGroupId;
    private String inlongStreamId;
    private final File file;
    private final int position;
    private final String md5;
    private Iterator<String> iterator;
    private Stream<String> stream;
    private long timeout;
    private long waitTimeout;
    private long lastTime;
    private final PluginMetric textFileMetric;
    private List<Validator> validators;
    private static final Logger LOGGER = LoggerFactory.getLogger(TextFileReader.class);
    private static AtomicLong metricsIndex = new AtomicLong(0);

    public TextFileReader(File file, int i) {
        this(file, i, "");
    }

    public TextFileReader(File file, int i, String str) {
        this.lastTime = 0L;
        this.validators = new ArrayList();
        this.file = file;
        this.position = i;
        this.md5 = str;
        if (ConfigUtil.isPrometheusEnabled()) {
            this.textFileMetric = new PluginPrometheusMetric(AgentUtils.getUniqId(TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
        } else {
            this.textFileMetric = new PluginJmxMetric(AgentUtils.getUniqId(TEXT_FILE_READER_TAG_NAME, metricsIndex.incrementAndGet()));
        }
    }

    public TextFileReader(File file) {
        this(file, 0);
    }

    public Message read() {
        if (this.iterator != null && this.iterator.hasNext()) {
            String next = this.iterator.next();
            if (validateMessage(next)) {
                AuditUtils.add(3, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis());
                this.textFileMetric.incReadNum();
                return new DefaultMessage(next.getBytes(StandardCharsets.UTF_8));
            }
        }
        AgentUtils.silenceSleepInMs(this.waitTimeout);
        return null;
    }

    private boolean validateMessage(String str) {
        if (this.validators.isEmpty()) {
            return true;
        }
        return this.validators.stream().allMatch(validator -> {
            return validator.validate(str);
        });
    }

    public boolean isFinished() {
        if (this.timeout == -1) {
            return false;
        }
        if (this.iterator == null) {
            return true;
        }
        if (this.iterator.hasNext()) {
            this.lastTime = 0L;
            return false;
        }
        if (this.lastTime == 0) {
            this.lastTime = System.currentTimeMillis();
        }
        return System.currentTimeMillis() - this.lastTime > this.timeout;
    }

    public String getReadSource() {
        return this.file.getAbsolutePath();
    }

    public void setReadTimeout(long j) {
        this.timeout = j;
    }

    public void setWaitMillisecs(long j) {
        this.waitTimeout = j;
    }

    public void addPatternValidator(String str) {
        if (str.isEmpty()) {
            return;
        }
        this.validators.add(new PatternValidator(str));
    }

    public void init(JobProfile jobProfile) {
        try {
            initReadTimeout(jobProfile);
            String fileMd5 = AgentUtils.getFileMd5(this.file);
            if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(fileMd5)) {
                LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, fileMd5);
            }
            LOGGER.info("file name for task is {}, md5 is {}", this.file, fileMd5);
            this.stream = Files.newBufferedReader(this.file.toPath()).lines().skip(this.position);
            this.iterator = this.stream.iterator();
            this.inlongGroupId = jobProfile.get("proxy.inlongGroupId", "default_inlong_group_id");
            this.inlongStreamId = jobProfile.get("proxy.inlongStreamId", "default_inlong_stream_id");
        } catch (Exception e) {
            throw new FileException("error init stream for " + this.file.getPath(), e);
        }
    }

    private void initReadTimeout(JobProfile jobProfile) {
        int i = jobProfile.getInt("job.file.max.wait", 1);
        if (i == -1) {
            this.timeout = -1L;
        } else {
            this.timeout = TimeUnit.MINUTES.toMillis(i);
        }
    }

    public void destroy() {
        AgentUtils.finallyClose(this.stream);
        LOGGER.info("destroy reader with read {} num {}", this.textFileMetric.getTagName(), Long.valueOf(this.textFileMetric.getReadNum()));
    }
}
