package com.xiaomi.mone.log.agent.channel;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Pair;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.mone.file.DefaultReadListener;
import com.xiaomi.mone.file.ILogFile;
import com.xiaomi.mone.file.LogFile;
import com.xiaomi.mone.file.MLog;
import com.xiaomi.mone.file.ReadListener;
import com.xiaomi.mone.file.ReadResult;
import com.xiaomi.mone.log.agent.channel.file.InodeFileComparator;
import com.xiaomi.mone.log.agent.channel.file.MonitorFile;
import com.xiaomi.mone.log.agent.channel.memory.AgentMemoryService;
import com.xiaomi.mone.log.agent.channel.memory.ChannelMemory;
import com.xiaomi.mone.log.agent.common.ChannelUtil;
import com.xiaomi.mone.log.agent.common.ExecutorUtil;
import com.xiaomi.mone.log.agent.export.MsgExporter;
import com.xiaomi.mone.log.agent.filter.FilterChain;
import com.xiaomi.mone.log.agent.input.Input;
import com.xiaomi.mone.log.api.enums.K8sPodTypeEnum;
import com.xiaomi.mone.log.api.enums.LogTypeEnum;
import com.xiaomi.mone.log.api.model.meta.FilterConf;
import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.common.PathUtils;
import com.xiaomi.mone.log.utils.NetUtil;
import com.xiaomi.youpin.docean.Ioc;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.class */
public class ChannelServiceImpl extends AbstractChannelService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ChannelServiceImpl.class);
    private AgentMemoryService memoryService;
    private MsgExporter msgExporter;
    private ChannelDefine channelDefine;
    private ChannelMemory channelMemory;
    private ScheduledFuture<?> lastFileLineScheduledFuture;
    private ScheduledFuture<?> scheduledFuture;
    private boolean collectOnce;
    private FilterChain chain;
    private LogTypeEnum logTypeEnum;
    private String logPattern;
    private String logSplitExpress;
    private String linePrefix;
    private final ConcurrentHashMap<String, ILogFile> logFileMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Future> futureMap = new ConcurrentHashMap<>();
    private Set<String> delFileCollList = new CopyOnWriteArraySet();
    private final Map<String, Long> reOpenMap = new HashMap();
    private final Map<String, Long> fileReadMap = new ConcurrentHashMap();
    private final Map<String, Pair<MLog, AtomicReference<ReadResult>>> resultMap = new ConcurrentHashMap();
    private Gson gson = Constant.GSON;
    private List<LineMessage> lineMessageList = new ArrayList();
    private ReentrantLock fileColLock = new ReentrantLock();
    private ReentrantLock fileReopenLock = new ReentrantLock();
    private long lastSendTime = System.currentTimeMillis();
    private long logCounts = 0;
    private List<MonitorFile> monitorFileList = Lists.newArrayList();

    public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService agentMemoryService, ChannelDefine channelDefine, FilterChain filterChain) {
        this.memoryService = agentMemoryService;
        this.msgExporter = msgExporter;
        this.channelDefine = channelDefine;
        this.chain = filterChain;
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void refresh(ChannelDefine channelDefine, MsgExporter msgExporter) {
        this.channelDefine = channelDefine;
        if (null != msgExporter) {
            this.msgExporter.close();
            this.msgExporter = msgExporter;
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void stopFile(List<String> list) {
        Map<String, ChannelMemory.FileProgress> fileProgressMap = this.channelMemory.getFileProgressMap();
        if (null == fileProgressMap) {
            fileProgressMap = new HashMap();
        }
        Iterator<Map.Entry<String, ILogFile>> it = this.logFileMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ILogFile> next = it.next();
            String key = next.getKey();
            Iterator<String> it2 = list.iterator();
            while (it2.hasNext()) {
                if (key.startsWith(it2.next())) {
                    next.getValue().setStop(true);
                    this.futureMap.get(key).cancel(false);
                    log.warn("channel:{} stop file:{} success", this.channelDefine.getChannelId(), key);
                    ChannelMemory.FileProgress fileProgress = fileProgressMap.get(key);
                    if (null != fileProgress) {
                        fileProgress.setFinished(true);
                    }
                    it.remove();
                }
            }
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void start() {
        Long channelId = this.channelDefine.getChannelId();
        Input input = this.channelDefine.getInput();
        this.logPattern = input.getLogPattern();
        this.logSplitExpress = input.getLogSplitExpress();
        this.linePrefix = input.getLinePrefix();
        this.logTypeEnum = LogTypeEnum.name2enum(this.channelDefine.getInput().getType());
        this.collectOnce = StringUtils.substringAfterLast(this.logPattern, "/").contains("*");
        List<String> parseLevel5Directory = PathUtils.parseLevel5Directory(this.logPattern);
        if (CollectionUtils.isEmpty(parseLevel5Directory)) {
            log.info("config pattern:{},current files not exist", this.logPattern);
        }
        log.info("channel start, logPattern:{}，fileList:{}, channelId:{}, instanceId:{}", this.logPattern, parseLevel5Directory, channelId, instanceId());
        logMonitorPathDisassembled(this.logSplitExpress, parseLevel5Directory, this.logPattern);
        this.channelMemory = this.memoryService.getMemory(channelId);
        if (null == this.channelMemory) {
            this.channelMemory = initChannelMemory(channelId, input, parseLevel5Directory, this.channelDefine);
        }
        this.memoryService.cleanChannelMemoryContent(channelId, parseLevel5Directory);
        startCollectFile(channelId, input, parseLevel5Directory);
        startExportQueueDataThread();
        this.memoryService.refreshMemory(this.channelMemory);
        log.warn("channelId:{}, channelInstanceId:{} start success! channelDefine:{}", channelId, instanceId(), this.gson.toJson(this.channelDefine));
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void cleanCollectFiles() {
        Iterator<String> it = this.delFileCollList.iterator();
        while (it.hasNext()) {
            delCollFile(it.next());
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void deleteCollFile(String str) {
        log.info("channelId:{},deleteCollFile,directory:{}", this.channelDefine.getChannelId(), str);
        for (Map.Entry<String, ILogFile> entry : this.logFileMap.entrySet()) {
            if (entry.getKey().contains(str)) {
                this.delFileCollList.add(entry.getKey());
                log.info("channelId:{},delFileCollList:{}", this.channelDefine.getChannelId(), this.gson.toJson(this.delFileCollList));
            }
        }
    }

    private void startExportQueueDataThread() {
        this.scheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
            if (System.currentTimeMillis() - this.lastSendTime < AbstractComponentTracker.LINGERING_TIMEOUT || CollectionUtils.isEmpty(this.lineMessageList) || !CollectionUtils.isNotEmpty(this.lineMessageList) || !this.fileColLock.tryLock()) {
                return;
            }
            try {
                doExport(this.lineMessageList);
            } finally {
                this.fileColLock.unlock();
            }
        }, 10L, 7L, TimeUnit.SECONDS);
    }

    private void startCollectFile(Long l, Input input, List<String> list) {
        for (int i = 0; i < list.size(); i++) {
            log.info("startCollectFile,total file:{},start:{},remain:{}", Integer.valueOf(list.size()), Integer.valueOf(i + 1), Integer.valueOf(list.size() - (i + 1)));
            readFile(input.getPatternCode(), getTailPodIp(list.get(i)), list.get(i), l);
            InodeFileComparator.addFile(list.get(i));
        }
        lastLineRemainSendSchedule(input.getPatternCode());
    }

    private void handleAllFileCollectMonitor(String str, String str2, Long l) {
        String tailPodIp = getTailPodIp(str2);
        if (!this.logFileMap.keySet().stream().anyMatch(str3 -> {
            return Objects.equals(str2, str3);
        })) {
            readFile(str, tailPodIp, str2, l);
        } else {
            log.info("collectOnce open file:{}", str2);
            this.logFileMap.get(str2).setReOpen(true);
        }
    }

    private void logMonitorPathDisassembled(String str, List<String> list, String str2) {
        String format;
        ArrayList newArrayList = Lists.newArrayList();
        if (StringUtils.isNotBlank(str)) {
            PathUtils.dismantlingStrWithSymbol(str, newArrayList);
        }
        if (LogTypeEnum.OPENTELEMETRY == this.logTypeEnum) {
            opentelemetryMonitor(str2);
            return;
        }
        if (this.collectOnce) {
            collectOnceFileMonitor(str2);
            return;
        }
        for (int i = 0; i < list.size(); i++) {
            try {
                format = String.format("(%s|%s)", (String) newArrayList.get(i), String.format("%s.*", list.get(i)));
            } catch (Exception e) {
                format = String.format("%s.*", list.get(i));
            }
            this.monitorFileList.add(MonitorFile.of(list.get(i), format, this.logTypeEnum, this.collectOnce));
        }
    }

    private void collectOnceFileMonitor(String str) {
        this.monitorFileList.add(MonitorFile.of(str, ChannelUtil.buildSingleTimeExpress(str), this.logTypeEnum, this.collectOnce));
    }

    private void opentelemetryMonitor(String str) {
        this.monitorFileList.add(MonitorFile.of(str, ChannelUtil.buildLogExpressList(str).get(0), this.logTypeEnum, this.collectOnce));
    }

    private ReadListener initFileReadListener(MLog mLog, String str, String str2, String str3) {
        AtomicReference atomicReference = new AtomicReference();
        DefaultReadListener defaultReadListener = new DefaultReadListener(readEvent -> {
            atomicReference.set(readEvent.getReadResult());
            if (null == atomicReference.get()) {
                log.info("empty data");
            } else {
                long currentTimeMillis = System.currentTimeMillis();
                ((ReadResult) atomicReference.get()).getLines().stream().forEach(str4 -> {
                    LogTypeEnum name2enum = LogTypeEnum.name2enum(this.channelDefine.getInput().getType());
                    if (LogTypeEnum.APP_LOG_MULTI == name2enum || LogTypeEnum.OPENTELEMETRY == name2enum) {
                        str4 = mLog.append2(str4);
                    }
                    if (null == str4) {
                        log.debug("biz log channelId:{}, not new line:{}", this.channelDefine.getChannelId(), str4);
                        return;
                    }
                    try {
                        this.fileColLock.lock();
                        wrapDataToSend(str4, atomicReference, str3, str, str2, currentTimeMillis);
                        this.fileColLock.unlock();
                    } catch (Throwable th) {
                        this.fileColLock.unlock();
                        throw th;
                    }
                });
            }
        });
        this.resultMap.put(str3, Pair.of(mLog, atomicReference));
        return defaultReadListener;
    }

    private void lastLineRemainSendSchedule(String str) {
        this.lastFileLineScheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
            SafeRun.run(() -> {
                for (Map.Entry<String, Pair<MLog, AtomicReference<ReadResult>>> entry : this.resultMap.entrySet()) {
                    MLog key = entry.getValue().getKey();
                    String key2 = entry.getKey();
                    Long appendTime = key.getAppendTime();
                    if (null != appendTime && Instant.now().toEpochMilli() - appendTime.longValue() > AbstractComponentTracker.LINGERING_TIMEOUT && this.fileColLock.tryLock()) {
                        try {
                            String takeRemainMsg2 = key.takeRemainMsg2();
                            if (null != takeRemainMsg2) {
                                log.info("start send last line,pattern:{},patternCode:{},data:{}", key2, str, takeRemainMsg2);
                                wrapDataToSend(takeRemainMsg2, entry.getValue().getValue(), key2, str, getTailPodIp(key2), appendTime.longValue());
                            }
                        } finally {
                            this.fileColLock.unlock();
                        }
                    }
                }
            });
        }, 30L, 30L, TimeUnit.SECONDS);
    }

    private void wrapDataToSend(String str, AtomicReference<ReadResult> atomicReference, String str2, String str3, String str4, long j) {
        LineMessage createLineMessage = createLineMessage(str, atomicReference, str2, str3, str4, j);
        updateChannelMemory(this.channelMemory, str2, this.logTypeEnum, j, atomicReference);
        this.lineMessageList.add(createLineMessage);
        this.fileReadMap.put(str2, Long.valueOf(j));
        int batchExportSize = this.msgExporter.batchExportSize();
        if (this.lineMessageList.size() > batchExportSize) {
            doExport(this.lineMessageList.subList(0, batchExportSize));
        }
    }

    private void readFile(String str, String str2, String str3, Long l) {
        MLog mLog = new MLog();
        if (StringUtils.isNotBlank(this.linePrefix)) {
            mLog.setCustomLinePattern(this.linePrefix);
        }
        String localIp = StringUtils.isBlank(str2) ? NetUtil.getLocalIp() : str2;
        ReadListener initFileReadListener = initFileReadListener(mLog, str, localIp, str3);
        Map<String, ChannelMemory.FileProgress> fileProgressMap = this.channelMemory.getFileProgressMap();
        if (!this.collectOnce) {
            log.info("fileProgressMap:{}", this.gson.toJson(fileProgressMap));
        }
        ILogFile logFile = getLogFile(str3, initFileReadListener, fileProgressMap);
        if (null == logFile) {
            log.warn("file:{} marked stop to collect", str3);
            return;
        }
        if (!FileUtil.exist(str3)) {
            log.info("file not exist,file:{}", str3);
            return;
        }
        stopOldCurrentFileThread(str3);
        log.info("start to collect file,channelId:{},fileName:{}", l, str3);
        this.logFileMap.put(str3, logFile);
        this.futureMap.put(str3, ExecutorUtil.submit(() -> {
            try {
                log.info("thread {} {}", Boolean.valueOf(Thread.currentThread().isVirtual()), Thread.currentThread());
                logFile.readLine();
            } catch (Exception e) {
                log.error("logFile read line err,channelId:{},localIp:{},file:{},patternCode:{}", l, localIp, fileProgressMap, str, e);
            }
        }));
    }

    private void stopOldCurrentFileThread(String str) {
        ILogFile iLogFile = this.logFileMap.get(str);
        if (null != iLogFile) {
            iLogFile.setStop(true);
        }
        Future future = this.futureMap.get(str);
        if (null != future) {
            future.cancel(false);
        }
    }

    private ILogFile getLogFile(String str, ReadListener readListener, Map<String, ChannelMemory.FileProgress> map) {
        ChannelMemory.UnixFileNode unixFileNode;
        ChannelMemory.FileProgress fileProgress = map.get(str);
        if ((fileProgress == null || (fileProgress.getFinished() != null && fileProgress.getFinished().booleanValue())) && StringUtils.isNotBlank(this.channelDefine.getPodType()) && K8sPodTypeEnum.valueOf(this.channelDefine.getPodType().toUpperCase()) != K8sPodTypeEnum.STATEFUL) {
            return null;
        }
        long longValue = fileProgress != null ? fileProgress.getPointer().longValue() : 0L;
        long longValue2 = fileProgress != null ? fileProgress.getCurrentRowNum().longValue() : 0L;
        if (fileProgress != null && (unixFileNode = fileProgress.getUnixFileNode()) != null && unixFileNode.getSt_ino() != null) {
            log.info("memory file inode info, filePath:{},:{}", str, this.gson.toJson(unixFileNode));
            ChannelMemory.UnixFileNode buildUnixFileNode = ChannelUtil.buildUnixFileNode(str);
            if (buildUnixFileNode != null && buildUnixFileNode.getSt_ino() != null && !Objects.equals(unixFileNode.getSt_ino(), buildUnixFileNode.getSt_ino())) {
                longValue = 0;
                longValue2 = 0;
                log.info("read file start from head, filePath:{}, memory:{}, current:{}", str, this.gson.toJson(unixFileNode), this.gson.toJson(buildUnixFileNode));
            }
        }
        ILogFile logFile = ((ChannelEngine) Ioc.ins().getBean(ChannelEngine.class)).logFile();
        logFile.initLogFile(str, readListener, longValue, longValue2);
        return logFile;
    }

    private void doExport(List<LineMessage> list) {
        try {
            try {
            } catch (Exception e) {
                log.error("doExport Exception:{}", (Throwable) e);
                list.clear();
            }
            if (CollectionUtils.isEmpty(list)) {
                list.clear();
                return;
            }
            this.chain.doFilter();
            long currentTimeMillis = System.currentTimeMillis();
            this.msgExporter.export(list);
            this.logCounts += list.size();
            this.lastSendTime = System.currentTimeMillis();
            this.channelMemory.setCurrentTime(Long.valueOf(this.lastSendTime));
            log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", this.channelDefine.getChannelId(), Integer.valueOf(list.size()), Long.valueOf(this.lastSendTime - currentTimeMillis), Long.valueOf(this.logCounts), instanceId());
            list.clear();
        } catch (Throwable th) {
            list.clear();
            throw th;
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.Closeable
    public void close() {
        log.info("Delete the current collection task,channelId:{}", getChannelId());
        for (Map.Entry<String, ILogFile> entry : this.logFileMap.entrySet()) {
            entry.getValue().setStop(true);
            InodeFileComparator.removeFile(entry.getKey());
        }
        this.msgExporter.close();
        this.memoryService.refreshMemory(this.channelMemory);
        if (null != this.scheduledFuture) {
            this.scheduledFuture.cancel(false);
        }
        if (null != this.lastFileLineScheduledFuture) {
            this.lastFileLineScheduledFuture.cancel(false);
        }
        Iterator<Future> it = this.futureMap.values().iterator();
        while (it.hasNext()) {
            it.next().cancel(false);
        }
        log.info("stop file monitor,fileName:", this.logFileMap.keySet().stream().collect(Collectors.joining(",")));
        this.lineMessageList.clear();
        this.reOpenMap.clear();
        this.fileReadMap.clear();
        this.resultMap.clear();
    }

    public Long getChannelId() {
        return this.channelDefine.getChannelId();
    }

    public MsgExporter getMsgExporter() {
        return this.msgExporter;
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void filterRefresh(List<FilterConf> list) {
        try {
            this.chain.loadFilterList(list);
            this.chain.reset();
        } catch (Exception e) {
            log.error("filter refresh err,new conf:{}", list, e);
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void reOpen(String str) {
        this.fileReopenLock.lock();
        try {
            if (this.reOpenMap.containsKey(str) && Instant.now().toEpochMilli() - this.reOpenMap.get(str).longValue() < AbstractComponentTracker.LINGERING_TIMEOUT) {
                log.info("The file has been opened too frequently.Please try again in 10 seconds.fileName:{},last time opening time.:{}", str, this.reOpenMap.get(str));
                this.fileReopenLock.unlock();
                return;
            }
            this.reOpenMap.put(str, Long.valueOf(Instant.now().toEpochMilli()));
            log.info("reOpen file:{}", str);
            if (this.collectOnce) {
                handleAllFileCollectMonitor(this.channelDefine.getInput().getPatternCode(), str, getChannelId());
                this.fileReopenLock.unlock();
                return;
            }
            ILogFile iLogFile = this.logFileMap.get(str);
            String tailPodIp = getTailPodIp(str);
            String localIp = StringUtils.isBlank(tailPodIp) ? NetUtil.getLocalIp() : tailPodIp;
            if (null == iLogFile) {
                readFile(this.channelDefine.getInput().getPatternCode(), localIp, str, getChannelId());
                log.info("watch new file create for channelId:{},ip:{},path:{}", getChannelId(), str, localIp);
            } else {
                handleExistingLogFileWithRetry(iLogFile, str, localIp);
            }
        } finally {
            this.fileReopenLock.unlock();
        }
    }

    private void handleExistingLogFileWithRetry(ILogFile iLogFile, String str, String str2) {
        LogFile logFile = (LogFile) iLogFile;
        for (int i = 0; i < 60; i++) {
            if (logFile.getPointer() >= logFile.getMaxPointer()) {
                iLogFile.setReOpen(true);
                log.info("file reOpen: channelId:{},ip:{},path:{}", getChannelId(), str2, str, logFile.getFile(), Integer.valueOf(logFile.getBeforePointerHashCode()));
                return;
            } else {
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    log.error("handleExistingLogFileWithRetry Sleep error", (Throwable) e);
                }
            }
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public List<MonitorFile> getMonitorPathList() {
        return this.monitorFileList;
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public ChannelDefine getChannelDefine() {
        return this.channelDefine;
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public ChannelMemory getChannelMemory() {
        return this.channelMemory;
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public Map<String, Long> getExpireFileMap() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Long> entry : this.fileReadMap.entrySet()) {
            if (Instant.now().toEpochMilli() - entry.getValue().longValue() > TimeUnit.MINUTES.toMillis(10L)) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public void cancelFile(String str) {
        log.info("cancelFile,file:{}", str);
        for (Map.Entry<String, ILogFile> entry : this.logFileMap.entrySet()) {
            if (str.equals(entry.getKey())) {
                this.delFileCollList.add(entry.getKey());
            }
        }
    }

    private void delCollFile(String str) {
        boolean z = false;
        if (!this.logFileMap.containsKey(str) || !this.fileReadMap.containsKey(str)) {
            z = true;
        } else if (Instant.now().toEpochMilli() - this.fileReadMap.get(str).longValue() > TimeUnit.MINUTES.toMillis(1L)) {
            Objects.requireNonNull(str);
            cleanFile((v1) -> {
                return r1.equals(v1);
            });
            z = true;
            log.info("stop coll file:{}", str);
        }
        if (z) {
            log.info("channelId:{},delCollFile remove file:{}", this.channelDefine.getChannelId(), str);
            this.delFileCollList.removeIf(str2 -> {
                return StringUtils.equals(str2, str);
            });
        }
    }

    private void cleanFile(Predicate<String> predicate) {
        ArrayList newArrayList = Lists.newArrayList();
        for (Map.Entry<String, ILogFile> entry : this.logFileMap.entrySet()) {
            if (predicate.test(entry.getKey())) {
                InodeFileComparator.removeFile(entry.getKey());
                entry.getValue().setStop(true);
                newArrayList.add(entry.getKey());
                log.info("cleanFile,stop file:{}", entry.getKey());
            }
        }
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            this.logFileMap.remove((String) it.next());
        }
        newArrayList.clear();
        for (Map.Entry<String, Future> entry2 : this.futureMap.entrySet()) {
            if (predicate.test(entry2.getKey())) {
                entry2.getValue().cancel(false);
                newArrayList.add(entry2.getKey());
            }
        }
        Iterator it2 = newArrayList.iterator();
        while (it2.hasNext()) {
            this.futureMap.remove((String) it2.next());
        }
        newArrayList.clear();
        Iterator it3 = ((List) this.reOpenMap.keySet().stream().filter(str -> {
            return predicate.test(str);
        }).collect(Collectors.toList())).iterator();
        while (it3.hasNext()) {
            this.reOpenMap.remove((String) it3.next());
        }
        Iterator it4 = ((List) this.fileReadMap.keySet().stream().filter(str2 -> {
            return predicate.test(str2);
        }).collect(Collectors.toList())).iterator();
        while (it4.hasNext()) {
            this.fileReadMap.remove((String) it4.next());
        }
        Iterator it5 = ((List) this.resultMap.keySet().stream().filter(str3 -> {
            return predicate.test(str3);
        }).collect(Collectors.toList())).iterator();
        while (it5.hasNext()) {
            this.resultMap.remove((String) it5.next());
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public Long getLogCounts() {
        return Long.valueOf(this.logCounts);
    }

    public ConcurrentHashMap<String, ILogFile> getLogFileMap() {
        return this.logFileMap;
    }

    public ConcurrentHashMap<String, Future> getFutureMap() {
        return this.futureMap;
    }
}
