package org.apache.hudi.utilities.sources.helpers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.org.apache.hadoop.hbase.ServerName;
import org.apache.hudi.org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/hudi/utilities/sources/helpers/DFSPathSelector.class */
/**
 * Selects the next batch of input files from a DFS root directory, using file
 * modification time as the checkpoint and a byte budget as the batch size limit.
 */
public class DFSPathSelector {
    protected static volatile Logger log = LogManager.getLogger(DFSPathSelector.class);

    /**
     * Files whose name starts with any of these prefixes are never selected.
     * NOTE(review): the first entry is borrowed from a shaded HBase constant; presumably it is the
     * "." hidden-file prefix - confirm and consider replacing it with a plain literal.
     */
    protected static final List<String> IGNORE_FILEPREFIX_LIST = Arrays.asList(DefaultWALProvider.WAL_FILE_NAME_DELIMITER, "_");

    protected final transient FileSystem fs;
    protected final TypedProperties props;

    /** Property keys read by {@link DFSPathSelector}. */
    public static class Config {
        /** Root directory that is listed recursively for input files. */
        public static final String ROOT_INPUT_PATH_PROP = "hoodie.deltastreamer.source.dfs.root";
    }

    /**
     * Creates a selector for the root path configured under {@link Config#ROOT_INPUT_PATH_PROP}.
     *
     * @param typedProperties properties; must contain {@link Config#ROOT_INPUT_PATH_PROP}
     * @param configuration   Hadoop configuration used to obtain the file system for the root path
     */
    public DFSPathSelector(TypedProperties typedProperties, Configuration configuration) {
        DataSourceUtils.checkRequiredProperties(typedProperties, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
        this.props = typedProperties;
        this.fs = FSUtils.getFs(typedProperties.getString(Config.ROOT_INPUT_PATH_PROP), configuration);
    }

    /**
     * Lists all files under the root path, keeps those modified strictly after the given
     * checkpoint, and selects them in modification-time order until the byte budget is reached.
     *
     * <p>Files sharing the modification time of the last selected file are always included in the
     * same batch, even if that exceeds the budget. Otherwise they would be skipped for good, since
     * the next call only accepts files strictly newer than the returned checkpoint.
     *
     * @param option last checkpoint (a modification time in string form), or empty to consider every file
     * @param j      source limit: maximum number of bytes to select in this batch
     * @return pair of (joined paths of the selected files, new checkpoint); when nothing is
     *         selected the left side is empty and the right side is the incoming checkpoint, or
     *         {@code Long.MIN_VALUE} as a string if there was none
     * @throws HoodieIOException     if listing the root path fails
     * @throws NumberFormatException if the checkpoint is present but not a parsable long
     */
    public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Option<String> option, long j) {
        final long sourceLimit = j;
        try {
            log.info("Root path => " + this.props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + j);

            // Recursively collect every regular file that does not carry an ignored prefix.
            List<FileStatus> candidates = new ArrayList<>();
            RemoteIterator<LocatedFileStatus> listing =
                    this.fs.listFiles(new Path(this.props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
            while (listing.hasNext()) {
                LocatedFileStatus status = listing.next();
                if (status.isDirectory()) {
                    continue;
                }
                String fileName = status.getPath().getName();
                if (IGNORE_FILEPREFIX_LIST.stream().noneMatch(fileName::startsWith)) {
                    candidates.add(status);
                }
            }
            if (candidates.isEmpty()) {
                return nothingSelected(option);
            }

            // Oldest first, so the checkpoint advances monotonically.
            candidates.sort(Comparator.comparingLong(FileStatus::getModificationTime));

            // Parse the checkpoint once instead of once per file.
            final boolean hasCheckpoint = option.isPresent();
            final long lastCheckpointTime = hasCheckpoint ? Long.parseLong(option.get()) : Long.MIN_VALUE;

            long selectedBytes = 0;
            long newCheckpointTime = Long.MIN_VALUE;
            List<FileStatus> selected = new ArrayList<>();
            for (FileStatus file : candidates) {
                long modificationTime = file.getModificationTime();
                if (hasCheckpoint && modificationTime <= lastCheckpointTime) {
                    continue; // already consumed by an earlier batch
                }
                // Stop once the budget is reached, but only at a modification-time boundary:
                // a file with the same time as the last selected one must go into this batch.
                if (selectedBytes + file.getLen() >= sourceLimit && modificationTime > newCheckpointTime) {
                    break;
                }
                newCheckpointTime = modificationTime;
                selectedBytes += file.getLen();
                selected.add(file);
            }

            if (selected.isEmpty()) {
                return nothingSelected(option);
            }
            String joinedPaths = selected.stream()
                    .map(file -> file.getPath().toString())
                    .collect(Collectors.joining(ServerName.SERVERNAME_SEPARATOR));
            return new ImmutablePair<>(Option.ofNullable(joinedPaths), String.valueOf(newCheckpointTime));
        } catch (IOException e) {
            throw new HoodieIOException("Unable to read from source from checkpoint: " + option, e);
        }
    }

    /** Builds the "no new files" result: no paths, and the checkpoint left where it was. */
    private static Pair<Option<String>, String> nothingSelected(Option<String> lastCheckpoint) {
        return new ImmutablePair<>(Option.empty(), lastCheckpoint.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
    }
}
