package com.xiaomi.mone.log.agent.channel;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.mone.file.MLog;
import com.xiaomi.mone.file.ReadListener;
import com.xiaomi.mone.file.ReadResult;
import com.xiaomi.mone.file.common.FileInfo;
import com.xiaomi.mone.file.common.FileInfoCache;
import com.xiaomi.mone.file.listener.DefaultMonitorListener;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import com.xiaomi.mone.log.agent.channel.file.MonitorFile;
import com.xiaomi.mone.log.agent.channel.memory.AgentMemoryService;
import com.xiaomi.mone.log.agent.channel.memory.ChannelMemory;
import com.xiaomi.mone.log.agent.common.ChannelUtil;
import com.xiaomi.mone.log.agent.common.ExecutorUtil;
import com.xiaomi.mone.log.agent.export.MsgExporter;
import com.xiaomi.mone.log.agent.filter.FilterChain;
import com.xiaomi.mone.log.agent.input.Input;
import com.xiaomi.mone.log.api.enums.LogTypeEnum;
import com.xiaomi.mone.log.api.model.meta.FilterConf;
import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.common.PathUtils;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/agent/channel/WildcardChannelServiceImpl.class */
public class WildcardChannelServiceImpl extends AbstractChannelService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WildcardChannelServiceImpl.class);
    private AgentMemoryService memoryService;
    private MsgExporter msgExporter;
    private ChannelDefine channelDefine;
    private ChannelMemory channelMemory;
    private FilterChain chain;
    private String logPattern;
    private String linePrefix;
    private String memoryBasePath;
    private static final String POINTER_FILENAME_PREFIX = ".ozhera_pointer";
    private ScheduledFuture<?> scheduledFuture;
    private ScheduledFuture<?> lastFileLineScheduledFuture;
    private DefaultMonitorListener defaultMonitorListener;
    private List<LineMessage> lineMessageList = new ArrayList();
    private List<Future<?>> fileCollFutures = Lists.newArrayList();
    private long lastSendTime = System.currentTimeMillis();
    private long logCounts = 0;
    private ReentrantLock reentrantLock = new ReentrantLock();

    public WildcardChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService agentMemoryService, ChannelDefine channelDefine, FilterChain filterChain, String str) {
        this.memoryService = agentMemoryService;
        this.msgExporter = msgExporter;
        this.channelDefine = channelDefine;
        this.chain = filterChain;
        this.memoryBasePath = str;
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void start() {
        Long channelId = this.channelDefine.getChannelId();
        Input input = this.channelDefine.getInput();
        this.logPattern = input.getLogPattern();
        this.linePrefix = input.getLinePrefix();
        List<String> parseLevel5Directory = PathUtils.parseLevel5Directory(this.logPattern);
        log.info("channel start, logPattern:{}，fileList:{}, channelId:{}, instanceId:{}", this.logPattern, Constant.GSON.toJson(parseLevel5Directory), channelId, instanceId());
        this.channelMemory = this.memoryService.getMemory(channelId);
        if (null == this.channelMemory) {
            this.channelMemory = initChannelMemory(channelId, input, parseLevel5Directory, this.channelDefine);
        }
        this.memoryService.cleanChannelMemoryContent(channelId, parseLevel5Directory);
        startCollectFile(channelId, input, getTailPodIp(this.logPattern));
        startExportQueueDataThread();
        this.memoryService.refreshMemory(this.channelMemory);
        log.warn("channelId:{}, channelInstanceId:{} start success! channelDefine:{}", channelId, instanceId(), Constant.GSON.toJson(this.channelDefine));
    }

    private void startCollectFile(Long l, Input input, String str) {
        try {
            FileInfoCache.ins().load(buildRestartFilePath());
            HeraFileMonitor createFileMonitor = createFileMonitor(input.getPatternCode(), str);
            String buildFileExpression = buildFileExpression(input.getLogPattern());
            List<String> buildMonitorPaths = buildMonitorPaths(input.getLogPattern());
            wildcardGraceShutdown(buildMonitorPaths, buildFileExpression);
            saveCollProgress();
            log.info("fileExpression:{}", buildFileExpression);
            Pattern compile = Pattern.compile(buildFileExpression);
            for (String str2 : buildMonitorPaths) {
                this.fileCollFutures.add(ExecutorUtil.submit(() -> {
                    monitorFileChanges(createFileMonitor, str2, compile);
                }));
            }
        } catch (Exception e) {
            log.error("startCollectFile error, channelId: {}, input: {}, ip: {}", l, Constant.GSON.toJson(input), str, e);
        }
    }

    private void saveCollProgress() {
        ExecutorUtil.scheduleAtFixedRate(() -> {
            SafeRun.run(() -> {
                try {
                    Iterator<ReadListener> it = this.defaultMonitorListener.getReadListenerList().iterator();
                    while (it.hasNext()) {
                        it.next().saveProgress();
                    }
                    cleanUpInvalidFileInfos();
                    FileInfoCache.ins().shutdown();
                } catch (Exception e) {
                    log.error("saveCollProgress error", (Throwable) e);
                }
            });
        }, 60L, 30L, TimeUnit.SECONDS);
    }

    private void cleanUpInvalidFileInfos() {
        for (Map.Entry<String, FileInfo> entry : FileInfoCache.ins().caches().entrySet()) {
            FileInfo value = entry.getValue();
            File file = new File(value.getFileName());
            if (!StringUtils.isEmpty(value.getFileName()) && !file.exists()) {
                FileInfoCache.ins().remove(entry.getKey());
            }
        }
    }

    private String buildRestartFilePath() {
        return String.format("%s%s%s", this.memoryBasePath, AgentMemoryService.MEMORY_DIR, POINTER_FILENAME_PREFIX);
    }

    private String buildFileExpression(String str) {
        String[] split = str.split(",");
        if (split.length == 1) {
            return ChannelUtil.buildSingleTimeExpress(str);
        }
        List list = Arrays.stream(split).map(ChannelUtil::buildSingleTimeExpress).map(str2 -> {
            return StringUtils.substringAfterLast(str2, "/").contains("*") ? str2 : str2 + ".*";
        }).distinct().toList();
        return list.size() == 1 ? (String) list.get(0) : (String) list.stream().collect(Collectors.joining("|", PathUtils.MULTI_FILE_PREFIX, PathUtils.MULTI_FILE_SUFFIX));
    }

    private void monitorFileChanges(HeraFileMonitor heraFileMonitor, String str, Pattern pattern) {
        try {
            log.info("monitorFileChanges,directory:{}", str);
            heraFileMonitor.reg(str, str2 -> {
                boolean matches = pattern.matcher(str2).matches();
                log.debug("file: {}, matches: {}", str2, Boolean.valueOf(matches));
                return matches;
            });
        } catch (IOException | InterruptedException e) {
            log.error("Error while monitoring files, monitorPath: {}", str, e);
        }
    }

    private List<String> buildMonitorPaths(String str) {
        return (List) Arrays.stream(str.split(",")).map(str2 -> {
            String substringBeforeLast = StringUtils.substringBeforeLast(str2, "/");
            return substringBeforeLast.endsWith("/") ? substringBeforeLast : substringBeforeLast + "/";
        }).flatMap(str3 -> {
            return PathUtils.buildMultipleDirectories(str3).stream();
        }).distinct().collect(Collectors.toList());
    }

    private HeraFileMonitor createFileMonitor(String str, String str2) {
        MLog mLog = new MLog();
        if (StringUtils.isNotBlank(this.linePrefix)) {
            mLog.setCustomLinePattern(this.linePrefix);
        }
        HeraFileMonitor heraFileMonitor = new HeraFileMonitor();
        AtomicReference<ReadResult> atomicReference = new AtomicReference<>();
        this.defaultMonitorListener = new DefaultMonitorListener(heraFileMonitor, readEvent -> {
            atomicReference.set(readEvent.getReadResult());
            if (atomicReference.get() == null) {
                log.info("Empty data");
            } else {
                processLogLines(atomicReference, str, str2, mLog);
            }
        });
        heraFileMonitor.setListener(this.defaultMonitorListener);
        scheduleLastLineSender(mLog, atomicReference, str, str2);
        return heraFileMonitor;
    }

    private void processLogLines(AtomicReference<ReadResult> atomicReference, String str, String str2, MLog mLog) {
        long currentTimeMillis = System.currentTimeMillis();
        ReadResult readResult = atomicReference.get();
        LogTypeEnum logTypeEnum = getLogTypeEnum();
        readResult.getLines().forEach(str3 -> {
            if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum || LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
                str3 = mLog.append2(str3);
            }
            if (str3 == null) {
                log.debug("Biz log channelId:{}, not a new line", this.channelDefine.getChannelId());
                return;
            }
            try {
                this.reentrantLock.lock();
                wrapDataToSend(str3, atomicReference, str, str2, currentTimeMillis);
                this.reentrantLock.unlock();
            } catch (Throwable th) {
                this.reentrantLock.unlock();
                throw th;
            }
        });
    }

    private void scheduleLastLineSender(MLog mLog, AtomicReference<ReadResult> atomicReference, String str, String str2) {
        this.lastFileLineScheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
            Long appendTime = mLog.getAppendTime();
            if (appendTime == null || Instant.now().toEpochMilli() - appendTime.longValue() <= AbstractComponentTracker.LINGERING_TIMEOUT || !this.reentrantLock.tryLock()) {
                return;
            }
            try {
                String takeRemainMsg2 = mLog.takeRemainMsg2();
                if (null != takeRemainMsg2) {
                    log.info("start send last line, fileName:{}, patternCode:{}, data:{}", ((ReadResult) atomicReference.get()).getFilePathName(), str, takeRemainMsg2);
                    wrapDataToSend(takeRemainMsg2, atomicReference, str, str2, Instant.now().toEpochMilli());
                }
            } finally {
                this.reentrantLock.unlock();
            }
        }, 30L, 30L, TimeUnit.SECONDS);
    }

    private void wrapDataToSend(String str, AtomicReference<ReadResult> atomicReference, String str2, String str3, long j) {
        String filePathName = atomicReference.get().getFilePathName();
        LineMessage createLineMessage = createLineMessage(str, atomicReference, filePathName, str2, str3, j);
        updateChannelMemory(this.channelMemory, filePathName, getLogTypeEnum(), j, atomicReference);
        this.lineMessageList.add(createLineMessage);
        int batchExportSize = this.msgExporter.batchExportSize();
        if (this.lineMessageList.size() > batchExportSize) {
            doExport(this.lineMessageList.subList(0, batchExportSize));
        }
    }

    private void doExport(List<LineMessage> list) {
        try {
            try {
            } catch (Exception e) {
                log.error("doExport Exception", (Throwable) e);
                list.clear();
            }
            if (CollectionUtils.isEmpty(list)) {
                list.clear();
                return;
            }
            this.chain.doFilter();
            long currentTimeMillis = System.currentTimeMillis();
            this.msgExporter.export(list);
            this.logCounts += list.size();
            this.lastSendTime = System.currentTimeMillis();
            this.channelMemory.setCurrentTime(Long.valueOf(this.lastSendTime));
            log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", this.channelDefine.getChannelId(), Integer.valueOf(list.size()), Long.valueOf(this.lastSendTime - currentTimeMillis), Long.valueOf(this.logCounts), instanceId());
            list.clear();
        } catch (Throwable th) {
            list.clear();
            throw th;
        }
    }

    private void startExportQueueDataThread() {
        this.scheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
            if (System.currentTimeMillis() - this.lastSendTime < AbstractComponentTracker.LINGERING_TIMEOUT || CollectionUtils.isEmpty(this.lineMessageList) || !CollectionUtils.isNotEmpty(this.lineMessageList) || !this.reentrantLock.tryLock()) {
                return;
            }
            try {
                doExport(this.lineMessageList);
            } finally {
                this.reentrantLock.unlock();
            }
        }, 10L, 7L, TimeUnit.SECONDS);
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public ChannelDefine getChannelDefine() {
        return this.channelDefine;
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public ChannelMemory getChannelMemory() {
        return this.channelMemory;
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public Map<String, Long> getExpireFileMap() {
        return Maps.newHashMap();
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public void cancelFile(String str) {
    }

    @Override // com.xiaomi.mone.log.agent.channel.AbstractChannelService
    public Long getLogCounts() {
        return Long.valueOf(this.logCounts);
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void refresh(ChannelDefine channelDefine, MsgExporter msgExporter) {
        this.channelDefine = channelDefine;
        if (null != msgExporter) {
            this.msgExporter.close();
            this.msgExporter = msgExporter;
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void stopFile(List<String> list) {
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void filterRefresh(List<FilterConf> list) {
        try {
            this.chain.loadFilterList(list);
            this.chain.reset();
        } catch (Exception e) {
            log.error("filter refresh err,new conf:{}", list, e);
        }
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void reOpen(String str) {
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public List<MonitorFile> getMonitorPathList() {
        return Lists.newArrayList();
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void cleanCollectFiles() {
    }

    @Override // com.xiaomi.mone.log.agent.channel.ChannelService
    public void deleteCollFile(String str) {
    }

    @Override // com.xiaomi.mone.log.agent.channel.Closeable
    public void close() {
        log.info("Delete the current collection task,channelId:{}", this.channelDefine.getChannelId());
        this.msgExporter.close();
        this.memoryService.refreshMemory(this.channelMemory);
        if (null != this.scheduledFuture) {
            this.scheduledFuture.cancel(false);
        }
        if (null != this.lastFileLineScheduledFuture) {
            this.lastFileLineScheduledFuture.cancel(false);
        }
        Iterator<Future<?>> it = this.fileCollFutures.iterator();
        while (it.hasNext()) {
            it.next().cancel(false);
        }
        this.lineMessageList.clear();
    }
}
