package org.apache.inlong.agent.plugin.trigger;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.plugin.Trigger;
import org.apache.inlong.agent.plugin.utils.PluginUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.class */
public class DirectoryTrigger extends AbstractDaemon implements Trigger {
    private static final Logger LOGGER = LoggerFactory.getLogger(DirectoryTrigger.class);
    private static volatile WatchService watchService;
    private final ConcurrentHashMap<PathPattern, List<WatchKey>> allWatchers = new ConcurrentHashMap<>();
    private final LinkedBlockingQueue<JobProfile> queue = new LinkedBlockingQueue<>();
    private TriggerProfile profile;
    private int interval;

    private static void initWatchService() {
        try {
            if (watchService == null) {
                synchronized (DirectoryTrigger.class) {
                    if (watchService == null) {
                        watchService = FileSystems.getDefault().newWatchService();
                        LOGGER.info("init watch service {}", watchService);
                    }
                }
            }
        } catch (Exception e) {
            LOGGER.warn("error while init watch service", e);
        }
    }

    public TriggerProfile getProfile() {
        return this.profile;
    }

    public void destroy() {
        try {
            stop();
        } catch (Exception e) {
            LOGGER.error("exception while stopping threads", e);
        }
    }

    public JobProfile fetchJobProfile() {
        return this.queue.poll();
    }

    public TriggerProfile getTriggerProfile() {
        return this.profile;
    }

    public void stop() {
        waitForTerminate();
        releaseResource();
    }

    private void registerAllSubDir(PathPattern pathPattern, Path path, List<WatchKey> list) throws Exception {
        LOGGER.info("check whether path {} is suitable", path);
        if (pathPattern.suitForWatch(path.toString())) {
            if (!path.toFile().isDirectory()) {
                JobProfile copyJobProfile = PluginUtils.copyJobProfile(this.profile, pathPattern.getSuitTime(), path.toFile());
                LOGGER.info("trigger {} generate job profile to read file {}", getTriggerProfile().getTriggerId(), path.toString());
                this.queue.offer(copyJobProfile);
                return;
            }
            list.add(path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE));
            Stream<Path> list2 = Files.list(path);
            Throwable th = null;
            try {
                try {
                    Iterator<Path> it = list2.iterator();
                    while (it.hasNext()) {
                        registerAllSubDir(pathPattern, it.next().toAbsolutePath(), list);
                    }
                    if (list2 != null) {
                        if (0 == 0) {
                            list2.close();
                            return;
                        }
                        try {
                            list2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (list2 != null) {
                    if (th != null) {
                        try {
                            list2.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        list2.close();
                    }
                }
                throw th4;
            }
        }
    }

    private void registerNewDir(PathPattern pathPattern, WatchKey watchKey, List<WatchKey> list, List<WatchKey> list2) throws Exception {
        Path path = (Path) watchKey.watchable();
        for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
            if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
                pathPattern.updateDateFormatRegex();
                Path path2 = (Path) watchEvent.context();
                if (path2 != null) {
                    registerAllSubDir(pathPattern, path.resolve(path2), list);
                }
            } else if (watchEvent.kind() == StandardWatchEventKinds.OVERFLOW) {
                LOGGER.info("overflow got {}", path);
                if (Files.isDirectory(path, new LinkOption[0])) {
                    Iterator<Path> it = Files.list(path).iterator();
                    while (it.hasNext()) {
                        registerAllSubDir(pathPattern, path.resolve(it.next()), list);
                    }
                }
            }
        }
        if (Files.exists(path, new LinkOption[0])) {
            return;
        }
        LOGGER.warn("{} not exist, add watcher to pending delete list", path);
        list2.add(watchKey);
    }

    private Runnable watchEventHandler() {
        return () -> {
            while (isRunnable()) {
                try {
                    TimeUnit.SECONDS.sleep(this.interval);
                    this.allWatchers.forEach((pathPattern, list) -> {
                        ArrayList arrayList = new ArrayList();
                        ArrayList arrayList2 = new ArrayList();
                        pathPattern.cleanup();
                        try {
                            Iterator it = list.iterator();
                            while (it.hasNext()) {
                                registerNewDir(pathPattern, (WatchKey) it.next(), arrayList, arrayList2);
                            }
                        } catch (Exception e) {
                            LOGGER.error("error caught", e);
                        }
                        list.addAll(arrayList);
                        list.removeAll(arrayList2);
                    });
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                }
            }
        };
    }

    private void releaseResource() {
        this.allWatchers.forEach((pathPattern, list) -> {
            list.forEach((v0) -> {
                v0.cancel();
            });
        });
        this.allWatchers.clear();
    }

    public void start() throws Exception {
        submitWorker(watchEventHandler());
    }

    public void register(String str) throws IOException {
        innerRegister(str, new PathPattern(str));
    }

    public void register(String str, String str2) throws IOException {
        innerRegister(str, new PathPattern(str, str2));
    }

    private void innerRegister(String str, PathPattern pathPattern) throws IOException {
        ArrayList arrayList = new ArrayList();
        if (this.allWatchers.putIfAbsent(pathPattern, arrayList) != null) {
            LOGGER.error("{} exists in watcher list, please check it", str);
            return;
        }
        Path path = Paths.get(pathPattern.getRootDir(), new String[0]);
        LOGGER.info("watch root path is {}", path);
        arrayList.add(path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE));
    }

    public void unregister(String str) {
        List<WatchKey> remove = this.allWatchers.remove(new PathPattern(str));
        if (remove != null) {
            LOGGER.info("unregister pattern {}, total size of path {}", str, Integer.valueOf(remove.size()));
            Iterator<WatchKey> it = remove.iterator();
            while (it.hasNext()) {
                it.next().cancel();
            }
        }
    }

    ConcurrentHashMap<PathPattern, List<WatchKey>> getAllWatchers() {
        return this.allWatchers;
    }

    public void init(TriggerProfile triggerProfile) throws IOException {
        initWatchService();
        this.interval = triggerProfile.getInt("trigger.check.interval", 2);
        this.profile = triggerProfile;
        if (this.profile.hasKey("job.dir.pattern")) {
            String str = this.profile.get("job.dir.pattern");
            String str2 = this.profile.get("job.timeOffset", "");
            if (str2.isEmpty()) {
                register(str);
            }
            register(str, str2);
        }
    }

    public void run() {
        try {
            start();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
