package org.apache.inlong.agent.plugin.sources;

import com.google.gson.Gson;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.file.MemoryManager;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/LogFileSource.class */
public class LogFileSource extends org.apache.inlong.agent.plugin.sources.file.AbstractSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogFileSource.class);
    // Shared pool that runs every source's core thread (see init): no core threads, unbounded maximum,
    // idle threads kept for 1 second, direct hand-off through a SynchronousQueue.
    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("log-file-source"));
    // Instance profile handed to init(); also read later by createMessage().
    public InstanceProfile profile;
    private String taskId;
    private String instanceId;
    // Upper bound (bytes of the UTF-8 body) for a message; larger messages are dropped in createMessage().
    private int maxPackSize;
    // Path of the collected file; init() sets it to the instance id.
    private String fileName;
    private File file;
    // Allocated in init() with SIZE_OF_BUFFER_TO_READ_FILE bytes; not referenced by any decompiled method here,
    // presumably used by readLines() — TODO confirm against the original source.
    private byte[] bufferToReadFile;
    // Extra key/value pairs merged into each record by fillMetaData(). NOTE(review): never assigned in this class.
    public Map<String, String> metadata;
    // Bounded hand-off queue: filled by the core thread, drained by read() and clearQueue().
    private BlockingQueue<SourceData> queue;
    // Inode info taken from the profile key "inodeInfo"; compared with the file's current inode in isInodeChanged().
    private String inodeInfo;
    // Optional header post-processor; only used in createMessage() when non-null.
    private ExtendedHandler extendedHandler;
    // NOTE(review): the three LogFileCollectTask constant names below look unrelated to their use here;
    // the decompiler may have matched constants by value — verify against the original source.
    private final Integer BATCH_READ_LINE_COUNT = Integer.valueOf(LogFileCollectTask.CORE_THREAD_PRINT_TIME);
    private final Integer BATCH_READ_LINE_TOTAL_LEN = 1048576;
    private final Integer CORE_THREAD_PRINT_INTERVAL_MS = Integer.valueOf(LogFileCollectTask.CORE_THREAD_SLEEP_TIME);
    // Queue capacity: ten read batches.
    private final Integer CACHE_QUEUE_SIZE = Integer.valueOf(10 * this.BATCH_READ_LINE_COUNT.intValue());
    private final Integer SIZE_OF_BUFFER_TO_READ_FILE = 65536;
    // sourceFinish() reports completion once emptyCount exceeds this value (non real-time tasks only).
    private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 30;
    private final Long INODE_UPDATE_INTERVAL_MS = 1000L;
    // Timeout, in milliseconds, for polling the queue in read() and clearQueue().
    private final Integer READ_WAIT_TIMEOUT_MS = 10;
    // Format of the "__LogTime__" value written by fillMetaData().
    private final SimpleDateFormat RECORD_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    // Number of lines already handed out; incremented once per line in readFromPos().
    public volatile long linePosition = 0;
    // Byte offset at which the next readFromPos() call starts.
    public volatile long bytePosition = 0;
    // Set by registerMeta(); when false fillMetaData() returns the line unchanged.
    private boolean needMetadata = false;
    private boolean isIncrement = false;
    private final Gson GSON = new Gson();
    // Cleared by stopRunning(); checked by the core thread and the wait loops.
    private volatile boolean runnable = true;
    // Cleared by the core thread on inode change, file shrink or FileNotFoundException.
    private volatile boolean fileExist = true;
    // Assigned once in init(); isInodeChanged() skips the inode check while less than
    // INODE_UPDATE_INTERVAL_MS has elapsed since this timestamp.
    private volatile long lastInodeUpdateTime = 0;
    // True while the core thread's loop is active; destroy() waits for it to become false.
    private volatile boolean running = false;
    // Computed in init() from the profile's source data time; not read anywhere else in this class.
    private long dataTime = 0;
    // Count of consecutive empty reads observed while the queue was also empty.
    private volatile long emptyCount = 0;
    // True when the profile's cycle unit is "R" (case-insensitive), see init().
    private boolean isRealTime = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sources/LogFileSource$SourceData.class */
    public class SourceData {
        private String data;
        private Long offset;

        public String getData() {
            return this.data;
        }

        public Long getOffset() {
            return this.offset;
        }

        public void setData(String str) {
            this.data = str;
        }

        public void setOffset(Long l) {
            this.offset = l;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof SourceData)) {
                return false;
            }
            SourceData sourceData = (SourceData) obj;
            if (!sourceData.canEqual(this)) {
                return false;
            }
            Long offset = getOffset();
            Long offset2 = sourceData.getOffset();
            if (offset == null) {
                if (offset2 != null) {
                    return false;
                }
            } else if (!offset.equals(offset2)) {
                return false;
            }
            String data = getData();
            String data2 = sourceData.getData();
            return data == null ? data2 == null : data.equals(data2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof SourceData;
        }

        public int hashCode() {
            Long offset = getOffset();
            int hashCode = (1 * 59) + (offset == null ? 43 : offset.hashCode());
            String data = getData();
            return (hashCode * 59) + (data == null ? 43 : data.hashCode());
        }

        public String toString() {
            return "LogFileSource.SourceData(data=" + getData() + ", offset=" + getOffset() + ")";
        }

        public SourceData(String str, Long l) {
            this.data = str;
            this.offset = l;
        }

        public SourceData() {
        }
    }

    /**
     * Initialises this source from the given profile and starts its core reading thread.
     *
     * <p>The method reads the cycle unit ("R" marks a real-time task and is treated as hourly
     * for the data-time conversion), resolves task/instance ids, the file to read, the maximum
     * package size, the stored inode info and the starting line/byte positions, creates the
     * bounded hand-off queue, registers metadata (failures there are logged, not fatal) and
     * finally submits {@link #coreThread()} to the shared executor.
     *
     * @param instanceProfile profile describing the instance to collect
     * @throws FileException if any step other than metadata registration fails; the source is
     *         marked as not runnable before the exception is thrown
     */
    @Override // org.apache.inlong.agent.plugin.sources.file.AbstractSource
    public void init(InstanceProfile instanceProfile) {
        try {
            LOGGER.info("LogFileSource init: {}", instanceProfile.toJsonStr());
            this.profile = instanceProfile;
            super.init(instanceProfile);
            String str = instanceProfile.get("task.fileTask.cycleUnit");
            if (str.compareToIgnoreCase("R") == 0) {
                this.isRealTime = true;
                str = "h";
            }
            this.taskId = instanceProfile.getTaskId();
            this.instanceId = instanceProfile.getInstanceId();
            // The instance id doubles as the path of the file to collect.
            this.fileName = instanceProfile.getInstanceId();
            this.maxPackSize = instanceProfile.getInt("proxy.package.maxSize", 512000);
            this.bufferToReadFile = new byte[this.SIZE_OF_BUFFER_TO_READ_FILE.intValue()];
            this.isIncrement = isIncrement(instanceProfile);
            this.file = new File(this.fileName);
            this.inodeInfo = instanceProfile.get("inodeInfo");
            this.lastInodeUpdateTime = AgentUtils.getCurrentTime();
            this.linePosition = getInitLineOffset(this.isIncrement, this.taskId, this.instanceId, this.inodeInfo);
            this.bytePosition = getBytePositionByLine(this.linePosition);
            this.queue = new LinkedBlockingQueue(this.CACHE_QUEUE_SIZE.intValue());
            this.dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), str);
            // NOTE(review): this compares a literal with the canonical name of the imported
            // ExtendedHandler class, which are the same string in this file, so the branch is
            // never taken as written — confirm the intended condition against the original source.
            if ("org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler".compareTo(ExtendedHandler.class.getCanonicalName()) != 0) {
                Constructor<?> declaredConstructor = Class.forName(instanceProfile.get("task.fileTask.extendedClass", "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler")).getDeclaredConstructor(InstanceProfile.class);
                declaredConstructor.setAccessible(true);
                this.extendedHandler = (ExtendedHandler) declaredConstructor.newInstance(instanceProfile);
            }
            try {
                registerMeta(instanceProfile);
            } catch (Exception e) {
                // Metadata is best-effort: a failure here must not prevent collection.
                LOGGER.error("init metadata error", e);
            }
            EXECUTOR_SERVICE.execute(coreThread());
        } catch (Exception e2) {
            stopRunning();
            // this.file is still null when the failure happened before it was assigned;
            // dereferencing it here would replace the real cause with a NullPointerException.
            String path = this.file != null ? this.file.getPath() : String.valueOf(this.fileName);
            throw new FileException("error init stream for " + path, e2);
        }
    }

    /**
     * Counts the lines currently present in the file named by {@code instanceId}.
     *
     * <p>The reader is opened in a try-with-resources statement so it is closed on every path,
     * including when skipping through the file fails.
     *
     * @param str file name used only in the error log message; the file actually read is the
     *        one named by the {@code instanceId} field
     * @return the line number reached after skipping to the end of the file, or 0 if the file
     *         cannot be read
     */
    private int getRealLineCount(String str) {
        try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(this.instanceId))) {
            // Skipping "everything" makes the reader consume the whole file while tracking line numbers.
            lineNumberReader.skip(Long.MAX_VALUE);
            return lineNumberReader.getLineNumber();
        } catch (IOException e) {
            LOGGER.error("getRealLineCount error {} file {}", e.getMessage(), str);
            return 0;
        }
    }

    /**
     * Determines the line offset at which reading should start.
     *
     * <ul>
     *   <li>If a stored offset exists and its inode info equals {@code str3}, the stored offset is
     *       used, unless the file now has fewer lines than that offset, in which case reading
     *       restarts from 0.</li>
     *   <li>Otherwise, in increment mode, reading starts after the current last line.</li>
     *   <li>Otherwise reading starts from 0.</li>
     * </ul>
     *
     * <p>The file is scanned for its line count at most once, and only in the branches that
     * actually need the count.
     *
     * @param z whether the instance collects incrementally
     * @param str task id
     * @param str2 instance id, also passed on as the file to count lines in
     * @param str3 inode info the stored offset must match
     * @return the initial line offset
     */
    private long getInitLineOffset(boolean z, String str, String str2, String str3) {
        OffsetProfile offset = OffsetManager.getInstance().getOffset(str, str2);
        if (offset != null && offset.getInodeInfo().compareTo(str3) == 0) {
            long stored = offset.getOffset().longValue();
            if (getRealLineCount(str2) < stored) {
                LOGGER.info("getInitLineOffset inode no change taskId {} file rotate, offset set to 0, file {}", str, this.fileName);
                return 0L;
            }
            LOGGER.info("getInitLineOffset inode no change taskId {} from db {}, file {}", new Object[]{str, Long.valueOf(stored), this.fileName});
            return stored;
        }
        if (z) {
            long current = getRealLineCount(str2);
            LOGGER.info("getInitLineOffset taskId {} for new increment read from {} file {}", new Object[]{str, Long.valueOf(current), this.fileName});
            return current;
        }
        LOGGER.info("getInitLineOffset taskId {} for new all read from 0 file {}", str, this.fileName);
        return 0L;
    }

    /**
     * @return the file this source reads from, as created in {@code init}
     */
    public File getFile() {
        return file;
    }

    /**
     * Enables metadata enrichment according to the comma-separated environment list stored
     * under the profile key {@code job.fileTask.envList}.
     *
     * <p>For "kubernetes" a {@link KubernetesMetadataProvider} is asked for its data; for "cvm"
     * the host name, source IP and file name are stored in {@code metadata}. Either entry turns
     * on {@code needMetadata}. Nothing happens when the key is absent.
     *
     * @param instanceProfile profile that may carry the environment list
     */
    public void registerMeta(InstanceProfile instanceProfile) {
        if (!instanceProfile.hasKey("job.fileTask.envList")) {
            return;
        }
        for (String env : instanceProfile.get("job.fileTask.envList").split(",")) {
            if (env.equalsIgnoreCase("kubernetes")) {
                this.needMetadata = true;
                new KubernetesMetadataProvider(this).getData();
            } else if (env.equalsIgnoreCase("cvm")) {
                this.needMetadata = true;
                // The metadata map is not initialised anywhere in this class; create it on
                // first use so the puts below cannot fail with a NullPointerException.
                if (this.metadata == null) {
                    this.metadata = new HashMap<>();
                }
                this.metadata.put("__HostName__", AgentUtils.getLocalHost());
                this.metadata.put("__SourceIP__", AgentUtils.fetchLocalIp());
                this.metadata.put("__FileName__", this.file.getName());
            }
        }
    }

    /**
     * Tells whether the profile asks for incremental collection.
     *
     * @param instanceProfile profile to inspect
     * @return true only when the key {@code job.fileTask.contentCollectType} is present and its
     *         value equals "INCREMENT" ignoring case
     */
    private boolean isIncrement(InstanceProfile instanceProfile) {
        if (!instanceProfile.hasKey("job.fileTask.contentCollectType")) {
            return false;
        }
        String collectType = instanceProfile.get("job.fileTask.contentCollectType");
        return "INCREMENT".equalsIgnoreCase(collectType);
    }

    /**
     * Translates a line offset into the byte position at which that line ends.
     *
     * <p>Reconstructed from the decompiler's instruction dump (the automatic decompilation of
     * this method failed and left a body that always threw). The file is read in batches through
     * {@code readLines} until {@code linePosition} lines have been consumed or a batch comes back
     * empty, which means the file has fewer lines than requested.
     *
     * <p>Any exception raised while opening or reading the file is logged and the position
     * reached so far is returned; only a failure to close the file propagates.
     *
     * @param linePosition number of lines to skip from the start of the file
     * @return the byte position returned by the last successful {@code readLines} call, or 0 if
     *         nothing was read
     * @throws IOException if closing the file fails
     */
    private long getBytePositionByLine(long linePosition) throws IOException {
        long pos = 0L;
        long linesRead = 0L;
        RandomAccessFile input = null;
        try {
            input = new RandomAccessFile(this.file, "r");
            while (linesRead < linePosition) {
                List<String> batch = new ArrayList<>();
                // Clamp in long arithmetic first so a remaining count above Integer.MAX_VALUE
                // cannot wrap to a negative int.
                int wanted = (int) Math.min(linePosition - linesRead, (long) this.BATCH_READ_LINE_COUNT.intValue());
                // The last argument is true here and false in readFromPos; its meaning is defined
                // by readLines — presumably a count-only mode, TODO confirm.
                pos = readLines(input, pos, batch, wanted, this.BATCH_READ_LINE_TOTAL_LEN.intValue(), true);
                linesRead += batch.size();
                if (batch.isEmpty()) {
                    // The original call had a placeholder but passed no argument for it.
                    LOGGER.error("getBytePositionByLine LineNum {} larger than the real file", Long.valueOf(linePosition));
                    break;
                }
            }
        } catch (Exception e) {
            LOGGER.error("getBytePositionByLine error: ", e);
        } finally {
            if (input != null) {
                input.close();
            }
        }
        LOGGER.info("getBytePositionByLine {} LineNum {} position {}", new Object[]{this.fileName, Long.valueOf(linePosition), Long.valueOf(pos)});
        return pos;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:13:0x0051. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:38:0x014b A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:42:0x001e A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for a method the decompiler could not recover: as written it always throws
     * {@link UnsupportedOperationException} and never touches its arguments.
     *
     * <p>What the callers in this file expect from the real implementation: it is given an open
     * file ({@code r10}), a start byte position ({@code r11}), a list to receive lines
     * ({@code r13}), two int limits ({@code r14}, {@code r15}; callers pass a line-count limit and
     * a total-length limit) and a boolean flag ({@code r16}; true from
     * {@code getBytePositionByLine}, false from {@code readFromPos}), and its return value is
     * used as the next byte position.
     *
     * <p>NOTE(review): the actual algorithm is not visible here — restore it from the original
     * source or the bytecode before relying on this class.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private long readLines(java.io.RandomAccessFile r10, long r11, java.util.List<java.lang.String> r13, int r14, int r15, boolean r16) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 353
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.inlong.agent.plugin.sources.LogFileSource.readLines(java.io.RandomAccessFile, long, java.util.List, int, int, boolean):long");
    }

    /**
     * Takes the next queued line, waiting up to {@code READ_WAIT_TIMEOUT_MS} milliseconds, and
     * turns it into a message.
     *
     * <p>The queue permit held for the line is released before the message is built.
     *
     * @return the message built by {@code createMessage}, or null when nothing was available in
     *         time or the wait was interrupted
     */
    public Message read() {
        SourceData polled;
        try {
            polled = queue.poll(READ_WAIT_TIMEOUT_MS.intValue(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            LOGGER.warn("poll {} data get interrupted.", file.getPath(), interrupted);
            polled = null;
        }
        if (polled != null) {
            MemoryManager.getInstance().release("agent.global.reader.queue.permit", polled.data.length());
            return createMessage(polled);
        }
        return null;
    }

    /**
     * Builds the outgoing message for one line.
     *
     * <p>The line is passed through {@code fillMetaData}; a header with "dataKey", "offset" and
     * "inlongStreamId" is assembled and, when an extended handler is set, handed to it together
     * with the raw line bytes. Two audit entries are recorded before the size check.
     *
     * @param sourceData line and offset taken from the queue
     * @return the message, or null when its UTF-8 body is larger than {@code maxPackSize}
     */
    private Message createMessage(SourceData sourceData) {
        String body = fillMetaData(sourceData.data);
        String partitionKey = profile.get("proxy.partitionKey", DigestUtils.md5Hex(inlongGroupId));

        HashMap header = new HashMap();
        header.put("dataKey", partitionKey);
        header.put("offset", sourceData.offset.toString());
        header.put("inlongStreamId", inlongStreamId);
        if (extendedHandler != null) {
            byte[] rawLine = sourceData.getData().getBytes(StandardCharsets.UTF_8);
            extendedHandler.dealWithHeader(header, rawLine);
        }

        // The stream id is read back from the header because the handler above received the map.
        String auditStreamId = (String) header.get("inlongStreamId");
        long auditTime;
        if (isRealTime) {
            auditTime = AgentUtils.getCurrentTime();
        } else {
            auditTime = profile.getSinkDataTime().longValue();
        }
        AuditUtils.add(3, inlongGroupId, auditStreamId, auditTime, 1, body.length());
        AuditUtils.add(30001, inlongGroupId, (String) header.get("inlongStreamId"), AgentUtils.getCurrentTime(), 1, body.length());

        DefaultMessage message = new DefaultMessage(body.getBytes(StandardCharsets.UTF_8), header);
        if (message.getBody().length > maxPackSize) {
            LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", Integer.valueOf(message.getBody().length), Integer.valueOf(maxPackSize));
            return null;
        }
        return message;
    }

    /**
     * Wraps a line in a JSON object carrying the registered metadata, when metadata is enabled.
     *
     * <p>The object contains a copy of {@code metadata} plus "__content__" (the value returned by
     * {@code FileDataUtils.getK8sJsonLog} for the line) and "__LogTime__" (the current time in
     * {@code RECORD_TIME_FORMAT}).
     *
     * @param str the raw line
     * @return {@code str} itself when {@code needMetadata} is false, otherwise the JSON text
     */
    public String fillMetaData(String str) {
        if (needMetadata) {
            long now = System.currentTimeMillis();
            boolean json = FileDataUtils.isJSON(str);
            Map<String, Object> enriched = new HashMap<>(metadata);
            enriched.put("__content__", FileDataUtils.getK8sJsonLog(str, Boolean.valueOf(json)));
            String logTime = RECORD_TIME_FORMAT.format(new Date(now));
            enriched.put("__LogTime__", logTime);
            return GSON.toJson(enriched);
        }
        return str;
    }

    /**
     * Blocks until the requested permits are acquired from the memory manager.
     *
     * <p>After each failed attempt the manager's details are printed and the wait is abandoned
     * if the inode check reports a change or the source is no longer runnable; otherwise the
     * thread sleeps one second and tries again.
     *
     * @param str name of the permit pool
     * @param i number of permits to acquire
     * @return true once the permits are held, false if the wait was abandoned
     */
    private boolean waitForPermit(String str, int i) {
        while (!MemoryManager.getInstance().tryAcquire(str, i)) {
            MemoryManager.getInstance().printDetail(str, "log file source");
            boolean giveUp = isInodeChanged() || !isRunnable();
            if (giveUp) {
                return false;
            }
            AgentUtils.silenceSleepInSeconds(1L);
        }
        return true;
    }

    /**
     * Checks whether the file's current inode info differs from the one recorded at init.
     *
     * <p>No check is made (and false is returned) while at most {@code INODE_UPDATE_INTERVAL_MS}
     * has elapsed since {@code lastInodeUpdateTime}. NOTE(review): that timestamp is only
     * assigned in {@code init}, so after the first interval every call performs the check —
     * confirm whether it was meant to be refreshed here.
     *
     * @return true when the inode info differs or cannot be read, false otherwise
     */
    private boolean isInodeChanged() {
        long elapsed = AgentUtils.getCurrentTime() - lastInodeUpdateTime;
        if (elapsed > INODE_UPDATE_INTERVAL_MS.longValue()) {
            try {
                String currentInode = FileDataUtils.getInodeInfo(fileName);
                return currentInode.compareTo(inodeInfo) != 0;
            } catch (IOException ioError) {
                // An unreadable inode is treated the same as a changed one.
                LOGGER.error("check inode change file {} error {}", fileName, ioError.getMessage());
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the task that continuously reads the file and feeds the hand-off queue.
     *
     * <p>Each iteration stops the loop when the source is no longer runnable, the inode info
     * changed, the file became shorter than the current byte position, or a permit wait was
     * abandoned. Otherwise one batch is read from {@code bytePosition}; an empty batch sleeps
     * one second and, if the queue is empty too, increments {@code emptyCount}; a non-empty
     * batch resets {@code emptyCount} and is queued line by line, each line first acquiring
     * queue permits for its length.
     *
     * <p>A failed read leaves no batch to process: the iteration is skipped (after a one second
     * pause when the file still exists) instead of dereferencing a null list. The source permit
     * is released and {@code running} is reset on every exit path, so {@code destroy()} cannot
     * wait forever on a thread that died.
     *
     * @return the runnable to submit to the executor
     */
    public Runnable coreThread() {
        return () -> {
            AgentThreadFactory.nameThread("log-file-source-" + this.taskId + "-" + this.file);
            this.running = true;
            try {
                long lastPrintTime = 0;
                while (isRunnable() && this.fileExist) {
                    if (isInodeChanged()) {
                        this.fileExist = false;
                        LOGGER.info("inode changed, instance will restart and offset will be clean, file {}", this.fileName);
                        break;
                    }
                    if (this.file.length() < this.bytePosition) {
                        this.fileExist = false;
                        LOGGER.info("file rotate, instance will restart and offset will be clean, file {}", this.fileName);
                        break;
                    }
                    if (!waitForPermit("agent.global.reader.source.permit", this.BATCH_READ_LINE_TOTAL_LEN.intValue())) {
                        break;
                    }
                    List<SourceData> batch = null;
                    try {
                        batch = readFromPos(this.bytePosition);
                    } catch (FileNotFoundException e) {
                        this.fileExist = false;
                        LOGGER.error("readFromPos file deleted {}", e.getMessage());
                    } catch (IOException e2) {
                        LOGGER.error("readFromPos error {}", e2.getMessage());
                    } finally {
                        // Released in finally so an unexpected runtime exception cannot leak the permit.
                        MemoryManager.getInstance().release("agent.global.reader.source.permit", this.BATCH_READ_LINE_TOTAL_LEN.intValue());
                    }
                    if (batch == null) {
                        // The read failed. Do not count it as an "empty" read (that would push the
                        // source towards sourceFinish()); just back off unless the file is gone,
                        // in which case the loop condition ends the thread.
                        if (this.fileExist) {
                            AgentUtils.silenceSleepInSeconds(1L);
                        }
                        continue;
                    }
                    if (batch.isEmpty()) {
                        if (this.queue.isEmpty()) {
                            this.emptyCount++;
                        } else {
                            this.emptyCount = 0L;
                        }
                        AgentUtils.silenceSleepInSeconds(1L);
                    } else {
                        this.emptyCount = 0L;
                        // Stop queuing the batch as soon as a permit wait is abandoned.
                        for (int i = 0; i < batch.size() && waitForPermit("agent.global.reader.queue.permit", batch.get(i).data.length()); i++) {
                            putIntoQueue(batch.get(i));
                        }
                        if (AgentUtils.getCurrentTime() - lastPrintTime > this.CORE_THREAD_PRINT_INTERVAL_MS.intValue()) {
                            lastPrintTime = AgentUtils.getCurrentTime();
                            LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", new Object[]{this.file.getName(), Long.valueOf(this.linePosition), Long.valueOf(this.bytePosition), Long.valueOf(this.file.length()), Integer.valueOf(batch.size())});
                        }
                    }
                }
            } finally {
                this.running = false;
            }
        };
    }

    /**
     * Offers one line to the hand-off queue, retrying in one second steps while the source is
     * runnable.
     *
     * <p>If the line never makes it into the queue (interrupted, or the source stopped), the
     * queue permit held for it is released here. A null argument is ignored.
     *
     * @param sourceData the line to queue, may be null
     */
    private void putIntoQueue(SourceData sourceData) {
        if (sourceData == null) {
            return;
        }
        boolean accepted = false;
        try {
            while (!accepted && isRunnable()) {
                accepted = queue.offer(sourceData, 1L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException interrupted) {
            MemoryManager.getInstance().release("agent.global.reader.queue.permit", sourceData.data.length());
            LOGGER.error("fetchData offer failed {}", interrupted.getMessage());
            return;
        }
        if (!accepted) {
            // The loop ended because the source stopped; give the permit back.
            MemoryManager.getInstance().release("agent.global.reader.queue.permit", sourceData.data.length());
        }
        LOGGER.debug("Read {} from file {}", sourceData.getData(), fileName);
    }

    /**
     * @return false once {@code stopRunning()} has been called, true before that
     */
    public boolean isRunnable() {
        return runnable;
    }

    /**
     * Clears the runnable flag, which the core thread and the wait loops check.
     */
    public void stopRunning() {
        runnable = false;
    }

    /**
     * Reads one batch of lines starting at the given byte position.
     *
     * <p>{@code bytePosition} is updated to the value returned by {@code readLines}, and
     * {@code linePosition} is incremented once per line; each line is tagged with the line
     * position it received. The file is opened in a try-with-resources statement so it is
     * closed even when reading fails.
     *
     * @param j byte position to start reading from
     * @return the lines read, in file order; empty when nothing new was available
     * @throws IOException if the file cannot be opened, read or closed
     */
    private List<SourceData> readFromPos(long j) throws IOException {
        List<String> lines = new ArrayList<>();
        List<SourceData> result = new ArrayList<>();
        try (RandomAccessFile input = new RandomAccessFile(this.file, "r")) {
            this.bytePosition = readLines(input, j, lines, this.BATCH_READ_LINE_COUNT.intValue(), this.BATCH_READ_LINE_TOTAL_LEN.intValue(), false);
            for (String line : lines) {
                this.linePosition++;
                result.add(new SourceData(line, Long.valueOf(this.linePosition)));
            }
        }
        return result;
    }

    /**
     * Stops the source: clears the runnable flag, waits in 1 ms steps until the core thread has
     * reset {@code running}, then drains the queue and releases the permits of the drained lines.
     */
    @Override // org.apache.inlong.agent.plugin.sources.file.AbstractSource
    public void destroy() {
        LOGGER.info("destroy read source name {}", fileName);
        stopRunning();
        // Busy-wait for the core thread to leave its loop.
        while (running) {
            AgentUtils.silenceSleepInMs(1L);
        }
        clearQueue(queue);
        LOGGER.info("destroy read source name {} end", fileName);
    }

    /**
     * Empties the given queue, releasing the queue permit of every line taken out of it.
     *
     * @param blockingQueue the queue to drain; null is ignored
     */
    private void clearQueue(BlockingQueue<SourceData> blockingQueue) {
        if (blockingQueue != null) {
            while (!blockingQueue.isEmpty()) {
                SourceData drained;
                try {
                    drained = blockingQueue.poll(READ_WAIT_TIMEOUT_MS.intValue(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException interrupted) {
                    LOGGER.warn("poll {} data get interrupted.", file.getPath(), interrupted);
                    drained = null;
                }
                if (drained != null) {
                    MemoryManager.getInstance().release("agent.global.reader.queue.permit", drained.data.length());
                }
            }
            blockingQueue.clear();
        }
    }

    /**
     * Reports whether this source has run out of data.
     *
     * @return always false for a real-time task; otherwise true once {@code emptyCount} exceeds
     *         {@code EMPTY_CHECK_COUNT_AT_LEAST}
     */
    public boolean sourceFinish() {
        if (isRealTime) {
            return false;
        }
        long threshold = EMPTY_CHECK_COUNT_AT_LEAST.intValue();
        return emptyCount > threshold;
    }

    /**
     * @return false once the core thread has seen an inode change, a shrunken file or a missing
     *         file; true otherwise
     */
    public boolean sourceExist() {
        return fileExist;
    }

    /**
     * Not implemented for this source: the profile is ignored.
     *
     * @param taskProfile unused
     * @return always null
     */
    public List<Reader> split(TaskProfile taskProfile) {
        return null;
    }
}
