package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.class */
/**
 * Source function that periodically lists the entries directly under a path and emits one
 * {@code Tuple3<path, startOffset, endOffset>} record for every entry that is new or whose
 * modification time has increased since the previous poll.
 *
 * <p>Entries whose name starts with {@code "."} or contains {@code "_COPYING_"} are never
 * emitted. What is emitted for the remaining entries depends on the {@link WatchType}.
 *
 * <p>The bookkeeping maps are plain {@link HashMap}s that are only touched from {@link #run};
 * {@link #cancel()} communicates with the polling loop solely through a volatile flag.
 */
public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);

    /** URI string of the location that is listed on every poll. */
    private final String path;

    /** Pause between two polls, passed to {@link Thread#sleep(long)} (milliseconds). */
    private final long interval;

    /** Selects which entries are re-emitted and which offsets are reported. */
    private final WatchType watchType;

    /** Cleared by {@link #cancel()}; checked at the top of every polling iteration. */
    private volatile boolean isRunning = true;

    /** Last seen modification time, keyed by the entry's file name (not its full path). */
    private final Map<String, Long> modificationTimes = new HashMap<>();

    /** Last emitted end offset, keyed by the entry's full path string. */
    private final Map<String, Long> offsetOfFiles = new HashMap<>();

    /** Strategy that decides when an entry is emitted and with which offsets. */
    public enum WatchType {
        /** An entry is emitted once, as {@code (path, 0, -1)}; later modifications are ignored. */
        ONLY_NEW_FILES,
        /** An entry is emitted as {@code (path, 0, -1)} every time its modification time grows. */
        REPROCESS_WITH_APPENDED,
        /**
         * An entry is emitted as {@code (path, previousLength, currentLength)} every time its
         * modification time grows; {@code previousLength} is 0 the first time.
         */
        PROCESS_ONLY_APPENDED
    }

    /**
     * Creates a monitoring source.
     *
     * @param str URI string of the location to monitor
     * @param j pause between two polls, in milliseconds
     * @param watchType emission strategy
     */
    public FileMonitoringFunction(String str, long j, WatchType watchType) {
        this.path = str;
        this.interval = j;
        this.watchType = watchType;
    }

    /**
     * Polls the monitored location until {@link #cancel()} is called, emitting one record per
     * changed entry and sleeping for the configured interval between polls.
     *
     * @param sourceContext context used to emit the records
     * @throws Exception if the path is not a valid URI, a file system call fails, or the
     *     thread is interrupted while sleeping
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void run(SourceFunction.SourceContext<Tuple3<String, Long, Long>> sourceContext) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(this.path));
        while (this.isRunning) {
            for (String filePath : listNewFiles(fileSystem)) {
                if (this.watchType == WatchType.ONLY_NEW_FILES || this.watchType == WatchType.REPROCESS_WITH_APPENDED) {
                    // NOTE(review): the -1 end offset is emitted as-is; presumably consumers
                    // treat it as "read to the end of the file" - confirm against the reader.
                    sourceContext.collect(new Tuple3<>(filePath, 0L, -1L));
                    this.offsetOfFiles.put(filePath, -1L);
                } else if (this.watchType == WatchType.PROCESS_ONLY_APPENDED) {
                    long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
                    // Values stored in offsetOfFiles are never null, so a single lookup suffices.
                    Long previousEnd = this.offsetOfFiles.get(filePath);
                    long offset = previousEnd != null ? previousEnd : 0L;
                    sourceContext.collect(new Tuple3<>(filePath, offset, fileSize));
                    this.offsetOfFiles.put(filePath, fileSize);
                    LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
                }
            }
            Thread.sleep(this.interval);
        }
    }

    /**
     * Lists the monitored location and returns the full path strings of all entries that pass
     * {@link #isFiltered}, recording their modification time as seen.
     *
     * @param fileSystem file system used for the listing
     * @return paths of new or modified entries; empty if there are none or the listing is
     *     unavailable
     * @throws IOException if the listing fails
     */
    private List<String> listNewFiles(FileSystem fileSystem) throws IOException {
        List<String> files = new ArrayList<>();
        FileStatus[] statuses = fileSystem.listStatus(new Path(this.path));
        if (statuses == null) {
            // A null listing (e.g. the path does not exist yet) must not kill the source with a
            // NullPointerException; report it and try again on the next poll.
            LOG.warn("Path does not exist: {}", this.path);
            return files;
        }
        for (FileStatus status : statuses) {
            Path filePath = status.getPath();
            String fileName = filePath.getName();
            long modificationTime = status.getModificationTime();
            if (!isFiltered(fileName, modificationTime)) {
                files.add(filePath.toString());
                this.modificationTimes.put(fileName, modificationTime);
            }
        }
        return files;
    }

    /**
     * Decides whether an entry must be skipped in the current poll.
     *
     * @param fileName name of the entry (last path component)
     * @param modificationTime modification time reported for the entry in this poll
     * @return {@code true} if the entry is hidden ({@code "."} prefix), contains
     *     {@code "_COPYING_"}, was already seen under {@link WatchType#ONLY_NEW_FILES}, or has
     *     not been modified since it was last recorded
     */
    private boolean isFiltered(String fileName, long modificationTime) {
        boolean alreadySeenOnce =
                this.watchType == WatchType.ONLY_NEW_FILES && this.modificationTimes.containsKey(fileName);
        if (alreadySeenOnce || fileName.startsWith(".") || fileName.contains("_COPYING_")) {
            return true;
        }
        Long lastModification = this.modificationTimes.get(fileName);
        return lastModification != null && lastModification >= modificationTime;
    }

    /**
     * Requests termination of the polling loop. The flag is checked once per iteration, so
     * {@link #run} returns after the current pass and its sleep have completed.
     */
    @Override // org.apache.flink.streaming.api.functions.source.SourceFunction
    public void cancel() {
        this.isRunning = false;
    }
}
