/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.JobException;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source function that scans a path for files and emits the {@link FileInputSplit}s of the
 * files it has not forwarded yet.
 *
 * <p>On every scan the function lists the entries directly under {@code path}, drops those
 * rejected by the {@link FilePathFilter} or whose modification time is not newer than the
 * largest modification time forwarded so far, asks the {@link FileInputFormat} for its input
 * splits, and collects the splits belonging to the remaining files in ascending order of
 * modification time.
 *
 * <p>The largest forwarded modification time is the only checkpointed state. It is read and
 * written while holding the checkpoint lock obtained from the source context.
 *
 * @param <OUT> element type produced by the wrapped {@link FileInputFormat}; this function
 *     itself only emits splits
 */
@Internal
public class ContinuousFileMonitoringFunction<OUT>
        extends RichSourceFunction<FileInputSplit>
        implements Checkpointed<Long> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);

    /** Smallest monitoring interval, in milliseconds, accepted for continuous processing. */
    public static final long MIN_MONITORING_INTERVAL = 100L;

    /** Path that is listed on every scan. */
    private final String path;

    /** Value handed to {@link FileInputFormat#createInputSplits(int)}; always at least 1. */
    private final int readerParallelism;

    /** Format used to create the input splits. */
    private final FileInputFormat<OUT> format;

    /** Pause between two scans, in milliseconds (only used when processing continuously). */
    private final long interval;

    /** Whether the path is scanned once or periodically. */
    private final FileProcessingMode watchType;

    /** Largest modification time forwarded so far; files at or below it are ignored. */
    private Long globalModificationTime;

    /** Filter deciding which paths are skipped. */
    private final FilePathFilter pathFilter;

    /** Checkpoint lock of the source context; {@code null} until {@link #run} has started. */
    private transient Object checkpointLock;

    private volatile boolean isRunning = true;

    /**
     * Creates a monitoring function.
     *
     * @param format format used to create the input splits; must not be {@code null}
     * @param path path to monitor; must not be {@code null}
     * @param filter filter selecting the paths to skip; must not be {@code null}
     * @param watchType processing mode
     * @param readerParallelism value passed to the format when creating splits; values below 1
     *     are raised to 1
     * @param interval pause between scans in milliseconds
     * @throws IllegalArgumentException if {@code watchType} is not
     *     {@link FileProcessingMode#PROCESS_ONCE} and {@code interval} is below
     *     {@link #MIN_MONITORING_INTERVAL}
     * @throws NullPointerException if {@code format}, {@code path} or {@code filter} is
     *     {@code null}
     */
    public ContinuousFileMonitoringFunction(FileInputFormat<OUT> format, String path, FilePathFilter filter, FileProcessingMode watchType, int readerParallelism, long interval) {
        if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
            throw new IllegalArgumentException("The specified monitoring interval (" + interval + " ms) is smaller than the minimum allowed one (" + MIN_MONITORING_INTERVAL + " ms).");
        }
        this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
        this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
        this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
        this.interval = interval;
        this.watchType = watchType;
        this.readerParallelism = Math.max(readerParallelism, 1);
        this.globalModificationTime = Long.MIN_VALUE;
    }

    /**
     * Opens the function and configures the wrapped format with the given parameters.
     *
     * @param parameters configuration forwarded to the superclass and to the format
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("Opening File Monitoring Source.");
        super.open(parameters);
        this.format.configure(parameters);
    }

    /**
     * Scans the monitored path and forwards new splits, either once or repeatedly until the
     * function is stopped, depending on the processing mode.
     *
     * @param context source context used to emit splits and to obtain the checkpoint lock
     * @throws RuntimeException if the processing mode is not one of the handled values
     */
    @Override
    public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(this.path));
        this.checkpointLock = context.getCheckpointLock();
        switch (this.watchType) {
            case PROCESS_CONTINUOUSLY:
                while (this.isRunning) {
                    synchronized (this.checkpointLock) {
                        this.monitorDirAndForwardSplits(fileSystem, context);
                    }
                    // Sleep outside the lock so that it is not held between scans.
                    Thread.sleep(this.interval);
                }
                break;
            case PROCESS_ONCE:
                synchronized (this.checkpointLock) {
                    this.monitorDirAndForwardSplits(fileSystem, context);
                    // After the single scan every file counts as already processed.
                    this.globalModificationTime = Long.MAX_VALUE;
                    this.isRunning = false;
                }
                break;
            default:
                this.isRunning = false;
                throw new RuntimeException("Unknown WatchType" + this.watchType);
        }
    }

    /**
     * Performs one scan: collects the eligible splits and forwards them group by group in
     * ascending order of modification time. Must be called while holding the checkpoint lock.
     */
    private void monitorDirAndForwardSplits(FileSystem fs, SourceFunction.SourceContext<FileInputSplit> context) throws IOException, JobException {
        assert (Thread.holdsLock(this.checkpointLock));
        for (Tuple2<Long, List<FileInputSplit>> splitsForModTime : this.getInputSplitSortedOnModTime(fs)) {
            this.forwardSplits(splitsForModTime, context);
        }
    }

    /**
     * Emits all splits of one modification-time group and then advances the global
     * modification time to that group's time if it is not smaller than the current value.
     */
    private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceFunction.SourceContext<FileInputSplit> context) {
        assert (Thread.holdsLock(this.checkpointLock));
        Long modTime = splitsToFwd.f0;
        for (FileInputSplit split : splitsToFwd.f1) {
            this.processSplit(split, context);
        }
        if (modTime >= this.globalModificationTime) {
            this.globalModificationTime = modTime;
        }
    }

    /** Logs and emits a single split. */
    private void processSplit(FileInputSplit split, SourceFunction.SourceContext<FileInputSplit> context) {
        LOG.info("Forwarding split: {}", split);
        context.collect(split);
    }

    /**
     * Returns the splits of all eligible files, grouped by file modification time and sorted
     * by that time in ascending order. The result is empty when no file is eligible.
     */
    private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
        List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward = new ArrayList<Tuple2<Long, List<FileInputSplit>>>();
        List<FileStatus> eligibleFiles = this.listEligibleFiles(fileSystem);
        if (eligibleFiles.isEmpty()) {
            return sortedSplitsToForward;
        }
        for (Map.Entry<Long, List<FileInputSplit>> entry : this.getInputSplits(eligibleFiles).entrySet()) {
            sortedSplitsToForward.add(new Tuple2<Long, List<FileInputSplit>>(entry.getKey(), entry.getValue()));
        }
        Collections.sort(sortedSplitsToForward, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {

            @Override
            public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<FileInputSplit>> o2) {
                // Compare the longs directly: narrowing their difference to int can flip the
                // sign when the two timestamps are more than Integer.MAX_VALUE ms apart.
                return o1.f0.compareTo(o2.f0);
            }
        });
        return sortedSplitsToForward;
    }

    /**
     * Creates the format's input splits and keeps those whose path equals the path of an
     * eligible file, grouped by that file's modification time.
     *
     * @param eligibleFiles files whose splits should be forwarded
     * @return map from modification time to the splits of the files carrying that time
     */
    private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
        Map<Long, List<FileInputSplit>> splitsPerModTime = new HashMap<Long, List<FileInputSplit>>();
        if (eligibleFiles.isEmpty()) {
            return splitsPerModTime;
        }
        for (FileInputSplit split : this.format.createInputSplits(this.readerParallelism)) {
            for (FileStatus file : eligibleFiles) {
                if (!file.getPath().equals(split.getPath())) {
                    continue;
                }
                Long modTime = file.getModificationTime();
                List<FileInputSplit> splitsToForward = splitsPerModTime.get(modTime);
                if (splitsToForward == null) {
                    splitsToForward = new LinkedList<FileInputSplit>();
                    splitsPerModTime.put(modTime, splitsToForward);
                }
                splitsToForward.add(split);
                // A split is attributed to the first matching file only.
                break;
            }
        }
        return splitsPerModTime;
    }

    /**
     * Lists the entries under the monitored path and returns those that are not ignored.
     * A failed listing or a {@code null} listing result yields an empty list.
     */
    private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
        FileStatus[] statuses;
        try {
            statuses = fileSystem.listStatus(new Path(this.path));
        }
        catch (IOException e) {
            // Best effort: a failed listing is treated as "nothing to forward" for this scan,
            // but the cause is recorded so that it is not lost entirely.
            LOG.debug("Could not list files under {}; treating it as empty for this scan.", this.path, e);
            return Collections.emptyList();
        }
        if (statuses == null) {
            LOG.warn("Path does not exist: {}", this.path);
            return Collections.emptyList();
        }
        List<FileStatus> files = new ArrayList<FileStatus>();
        for (FileStatus status : statuses) {
            if (!this.shouldIgnore(status.getPath(), status.getModificationTime())) {
                files.add(status);
            }
        }
        return files;
    }

    /**
     * Decides whether a file is skipped: it is skipped when the path filter rejects it or
     * when its modification time is not greater than the global modification time.
     * Must be called while holding the checkpoint lock.
     *
     * @param filePath path of the candidate file
     * @param modificationTime modification time reported for the candidate file
     * @return {@code true} if the file must not be forwarded
     */
    private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert (Thread.holdsLock(this.checkpointLock));
        boolean rejectedByFilter = this.pathFilter != null && this.pathFilter.filterPath(filePath);
        boolean shouldIgnore = rejectedByFilter || modificationTime <= this.globalModificationTime;
        if (shouldIgnore) {
            LOG.debug("Ignoring {}, with mod time= {} and global mod time= {}", filePath, modificationTime, this.globalModificationTime);
        }
        return shouldIgnore;
    }

    /**
     * Closes the function and stops monitoring. Safe to call even if {@link #run} was never
     * invoked, in which case no checkpoint lock exists yet.
     */
    @Override
    public void close() throws Exception {
        super.close();
        this.stopMonitoring();
        LOG.info("Closed File Monitoring Source.");
    }

    /** Stops monitoring; the scan loop in {@link #run} exits at its next condition check. */
    @Override
    public void cancel() {
        this.stopMonitoring();
    }

    /**
     * Marks every file as already processed and clears the running flag. The update is done
     * under the checkpoint lock when one has been obtained, and without locking otherwise.
     */
    private void stopMonitoring() {
        final Object lock = this.checkpointLock;
        if (lock == null) {
            this.globalModificationTime = Long.MAX_VALUE;
            this.isRunning = false;
            return;
        }
        synchronized (lock) {
            this.globalModificationTime = Long.MAX_VALUE;
            this.isRunning = false;
        }
    }

    /**
     * Returns the state to checkpoint.
     *
     * @return the current global modification time
     */
    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        return this.globalModificationTime;
    }

    /**
     * Restores the global modification time from a checkpoint.
     *
     * @param state the value previously returned by {@link #snapshotState}
     */
    @Override
    public void restoreState(Long state) throws Exception {
        this.globalModificationTime = state;
    }
}

