package org.apache.gobblin.data.management.copy;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.gobblin.data.management.policy.SelectAfterTimeBasedPolicy;
import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
import org.apache.gobblin.data.management.version.finder.DateTimeDatasetVersionFinder;
import org.apache.gobblin.data.management.version.finder.VersionFinder;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/copy/TimestampBasedCopyableDataset.class */
/**
 * A {@link CopyableDataset} rooted at a single path of the source {@link FileSystem}.
 *
 * <p>The versions of the dataset are discovered with a configurable {@link VersionFinder}, narrowed down by a
 * configurable {@link VersionSelectionPolicy}, and the files of every selected version are then listed
 * concurrently (one {@link CopyableFileGenerator} task per version) to build the {@link CopyableFile}s.
 */
public class TimestampBasedCopyableDataset implements CopyableDataset, FileSystemDataset {
    /** Property holding the class name of the {@link VersionFinder} to instantiate. */
    public static final String DATASET_VERSION_FINDER = "timestamp.based.copyable.dataset.version.finder";
    /** Property holding the class name of the {@link VersionSelectionPolicy} to instantiate. */
    public static final String COPY_POLICY = "timestamp.based.copyable.dataset.copy.policy";
    /** Property holding the maximum number of threads used to list copyable files. */
    public static final String THREADPOOL_SIZE_TO_GET_COPYABLE_FILES = "threadpool.size.to.get.copyable.files";
    /** Value used when {@link #THREADPOOL_SIZE_TO_GET_COPYABLE_FILES} is not set. */
    public static final String DEFAULT_THREADPOOL_SIZE_TO_GET_COPYABLE_FILES = "20";
    private static final Logger log = LoggerFactory.getLogger(TimestampBasedCopyableDataset.class);
    /** Version finder used when {@link #DATASET_VERSION_FINDER} is not set. */
    public static final String DEFAULT_DATASET_VERSION_FINDER = DateTimeDatasetVersionFinder.class.getName();
    /** Selection policy used when {@link #COPY_POLICY} is not set. */
    public static final String DEFAULT_COPY_POLICY = SelectAfterTimeBasedPolicy.class.getName();

    private final Path datasetRoot;
    private final VersionFinder<TimestampedDatasetVersion> datasetVersionFinder;
    private final VersionSelectionPolicy<TimestampedDatasetVersion> versionSelectionPolicy;
    private final ExecutorService executor;
    private final FileSystem srcFs;

    /**
     * Task that lists the files of one dataset version and appends a {@link CopyableFile} to a shared queue
     * for every file that has to be copied.
     */
    public static class CopyableFileGenerator implements Runnable {
        private final FileSystem srcFs;
        private final FileSystem targetFs;
        private final CopyConfiguration configuration;
        private final Path datasetRoot;
        private final Path targetRoot;
        private final DateTime versionDatetime;
        private final Collection<Path> locationsToCopy;
        private final ConcurrentLinkedQueue<CopyableFile> copyableFileList;
        private final PathFilter filter;

        /**
         * Lists every location of the version on the source file system (applying the filter) and queues a
         * {@link CopyableFile} for each entry accepted by {@link #isCopyableFile(FileStatus, Path)}.
         *
         * @throws RuntimeException wrapping the {@link IOException} if listing or inspecting a file fails
         */
        @Override // java.lang.Runnable
        public void run() {
            // All files of one version share the version timestamp, so compute it once.
            final long versionMillis = this.versionDatetime.getMillis();
            for (Path location : this.locationsToCopy) {
                try {
                    for (FileStatus sourceStatus : this.srcFs.listStatus(location, this.filter)) {
                        Path sourcePath = sourceStatus.getPath();
                        log.debug("Checking if it is a copyable file: {}", sourcePath);
                        // Mirror the file's position below the dataset root underneath the target root.
                        Path targetPath =
                                new Path(this.targetRoot, PathUtils.relativizePath(sourcePath, this.datasetRoot));
                        if (isCopyableFile(sourceStatus, targetPath)) {
                            log.debug("Will create workunit for: {}", sourcePath);
                            this.copyableFileList.add(
                                    generateCopyableFile(sourceStatus, targetPath, versionMillis, location));
                        }
                    }
                } catch (IOException e) {
                    // Report through the logger rather than stderr; the failure still propagates to the caller.
                    log.error("Failed to get copyable files for " + location, e);
                    throw new RuntimeException("Failed to get copyable files for " + location, e);
                }
            }
        }

        /**
         * Builds the {@link CopyableFile} for one source file.
         *
         * @param fileStatus status of the source file
         * @param destination path the file should be copied to
         * @param timestamp value used for both the origin and the upstream timestamp
         * @param location version location the file was listed from; its path, as returned by
         *     {@link PathUtils#getPathWithoutSchemeAndAuthority(Path)}, is used as the file set
         * @return the copyable file
         * @throws IOException if the copyable file cannot be built
         */
        @VisibleForTesting
        protected CopyableFile generateCopyableFile(FileStatus fileStatus, Path destination, long timestamp,
                Path location) throws IOException {
            String fileSet = PathUtils.getPathWithoutSchemeAndAuthority(location).toString();
            return CopyableFile.fromOriginAndDestination(this.srcFs, fileStatus, destination, this.configuration)
                    .originTimestamp(timestamp)
                    .upstreamTimestamp(timestamp)
                    .fileSet(fileSet)
                    .build();
        }

        /**
         * Decides whether a source file has to be copied.
         *
         * @param fileStatus status of the source file
         * @param destination path of the file on the target file system
         * @return {@code true} if the destination does not exist, or if the source was modified more recently
         *     than the destination
         * @throws IOException if the target file system cannot be queried
         */
        private boolean isCopyableFile(FileStatus fileStatus, Path destination) throws IOException {
            if (!this.targetFs.exists(destination)) {
                return true;
            }
            long targetModificationTime = this.targetFs.getFileStatus(destination).getModificationTime();
            return fileStatus.getModificationTime() > targetModificationTime;
        }

        /**
         * @param srcFs file system the files are listed on
         * @param targetFs file system the files will be copied to
         * @param configuration copy configuration handed to every generated {@link CopyableFile}
         * @param datasetRoot root of the dataset on the source file system
         * @param targetRoot root of the dataset on the target file system
         * @param versionDatetime timestamp of the version being processed
         * @param locationsToCopy locations of the version to list
         * @param copyableFileList queue receiving the generated files; may be shared between generators
         * @param filter filter applied when listing each location
         */
        @ConstructorProperties({"srcFs", "targetFs", "configuration", "datasetRoot", "targetRoot", "versionDatetime", "locationsToCopy", "copyableFileList", "filter"})
        public CopyableFileGenerator(FileSystem srcFs, FileSystem targetFs, CopyConfiguration configuration,
                Path datasetRoot, Path targetRoot, DateTime versionDatetime, Collection<Path> locationsToCopy,
                ConcurrentLinkedQueue<CopyableFile> copyableFileList, PathFilter filter) {
            this.srcFs = srcFs;
            this.targetFs = targetFs;
            this.configuration = configuration;
            this.datasetRoot = datasetRoot;
            this.targetRoot = targetRoot;
            this.versionDatetime = versionDatetime;
            this.locationsToCopy = locationsToCopy;
            this.copyableFileList = copyableFileList;
            this.filter = filter;
        }
    }

    /**
     * Creates the dataset and reflectively instantiates its collaborators.
     *
     * <p>The selection policy class ({@link #COPY_POLICY}) must expose a constructor taking
     * {@link Properties}; the version finder class ({@link #DATASET_VERSION_FINDER}) must expose a constructor
     * taking {@link FileSystem} and {@link Properties}.
     *
     * @param fileSystem source file system
     * @param properties configuration; see the property-name constants of this class
     * @param path root of the dataset on the source file system
     * @throws RuntimeException if either configured class cannot be loaded or instantiated
     * @throws NumberFormatException if {@link #THREADPOOL_SIZE_TO_GET_COPYABLE_FILES} is not an integer
     */
    // The configured classes are only known by name, so the casts to the parameterised types cannot be checked.
    @SuppressWarnings("unchecked")
    public TimestampBasedCopyableDataset(FileSystem fileSystem, Properties properties, Path path) {
        this.srcFs = fileSystem;
        this.datasetRoot = path;
        try {
            this.versionSelectionPolicy = (VersionSelectionPolicy<TimestampedDatasetVersion>)
                    Class.forName(properties.getProperty(COPY_POLICY, DEFAULT_COPY_POLICY))
                            .getConstructor(Properties.class)
                            .newInstance(properties);
            this.datasetVersionFinder = (VersionFinder<TimestampedDatasetVersion>)
                    Class.forName(properties.getProperty(DATASET_VERSION_FINDER, DEFAULT_DATASET_VERSION_FINDER))
                            .getConstructor(FileSystem.class, Properties.class)
                            .newInstance(this.srcFs, properties);
            int maxThreads = Integer.parseInt(properties.getProperty(
                    THREADPOOL_SIZE_TO_GET_COPYABLE_FILES, DEFAULT_THREADPOOL_SIZE_TO_GET_COPYABLE_FILES));
            // NOTE(review): arguments look like (min threads, max threads, keep-alive) - confirm the time unit.
            this.executor = ScalingThreadPoolExecutor.newScalingThreadPool(0, maxThreads, 100L,
                    ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of(getClass().getSimpleName())));
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Finds the versions of this dataset, applies the selection policy and lists the files of every selected
     * version in parallel.
     *
     * <p>When at least one version is found, the executor of this instance is shut down before the method
     * returns or throws, so the instance cannot submit listing work again afterwards.
     *
     * @param fileSystem target file system
     * @param copyConfiguration copy configuration; its publish dir determines the target root
     * @return the files to copy; empty if no version of the dataset is found
     * @throws IOException if the versions cannot be found, if a listing task fails, or if the calling thread
     *     is interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override // org.apache.gobblin.data.management.copy.CopyableDataset
    public Collection<CopyableFile> getCopyableFiles(FileSystem fileSystem, CopyConfiguration copyConfiguration) throws IOException {
        log.info("Getting copyable files at root path: {}", this.datasetRoot);
        ArrayList<TimestampedDatasetVersion> allVersions =
                Lists.newArrayList(this.datasetVersionFinder.findDatasetVersions(this));
        if (allVersions.isEmpty()) {
            log.warn("No dataset version can be found. Ignoring.");
            return Lists.newArrayList();
        }
        Collection<TimestampedDatasetVersion> selectedVersions =
                this.versionSelectionPolicy.listSelectedVersions(allVersions);
        ConcurrentLinkedQueue<CopyableFile> copyableFiles = new ConcurrentLinkedQueue<>();
        Collection<Future<?>> pendingTasks = new ArrayList<>();
        try {
            // Submit everything first so the versions are processed concurrently...
            for (TimestampedDatasetVersion version : selectedVersions) {
                pendingTasks.add(this.executor.submit(
                        getCopyableFileGenetator(fileSystem, copyConfiguration, version, copyableFiles)));
            }
            // ...then wait for each task, surfacing the first failure.
            for (Future<?> task : pendingTasks) {
                task.get();
            }
            return copyableFiles;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Failed to generate copyable files.", e);
        } catch (ExecutionException e) {
            throw new IOException("Failed to generate copyable files.", e);
        } finally {
            // Shut down exactly once, after all tasks were submitted and awaited. Doing this per submission
            // would stop the pool before the remaining versions could be handed to it.
            ExecutorsUtils.shutdownExecutorService(this.executor, Optional.of(log));
        }
    }

    /**
     * Creates the task that lists the files of one version. (The misspelled name is kept for compatibility
     * with existing callers and overriders.)
     *
     * @param fileSystem target file system
     * @param copyConfiguration copy configuration
     * @param timestampedDatasetVersion version whose paths are to be listed
     * @param concurrentLinkedQueue queue the task adds its results to
     * @return the task, not yet started
     */
    @VisibleForTesting
    protected CopyableFileGenerator getCopyableFileGenetator(FileSystem fileSystem, CopyConfiguration copyConfiguration, TimestampedDatasetVersion timestampedDatasetVersion, ConcurrentLinkedQueue<CopyableFile> concurrentLinkedQueue) {
        Path targetRoot = getTargetRoot(copyConfiguration.getPublishDir());
        return new CopyableFileGenerator(this.srcFs, fileSystem, copyConfiguration, this.datasetRoot, targetRoot,
                timestampedDatasetVersion.getDateTime(), timestampedDatasetVersion.getPaths(),
                concurrentLinkedQueue, copyableFileFilter());
    }

    /**
     * @return the filter applied when listing version locations; a {@link HiddenFilter} by default
     *     (presumably excluding hidden paths - see that class)
     */
    protected PathFilter copyableFileFilter() {
        return new HiddenFilter();
    }

    /**
     * @param path publish directory on the target file system
     * @return the child of {@code path} carrying the name of the dataset root
     */
    protected Path getTargetRoot(Path path) {
        return new Path(path, this.datasetRoot.getName());
    }

    /** @return the string form of the dataset root */
    public String datasetURN() {
        return datasetRoot().toString();
    }

    /** @return the root of the dataset on the source file system */
    public Path datasetRoot() {
        return this.datasetRoot;
    }

    /** @return the root of the dataset on the source file system */
    public Path getDatasetRoot() {
        return this.datasetRoot;
    }

    /** @return the finder used to discover the versions of this dataset */
    public VersionFinder<TimestampedDatasetVersion> getDatasetVersionFinder() {
        return this.datasetVersionFinder;
    }

    /** @return the policy selecting which versions are copied */
    public VersionSelectionPolicy<TimestampedDatasetVersion> getVersionSelectionPolicy() {
        return this.versionSelectionPolicy;
    }

    /** @return the executor running the file-listing tasks */
    public ExecutorService getExecutor() {
        return this.executor;
    }

    /** @return the source file system */
    public FileSystem getSrcFs() {
        return this.srcFs;
    }
}
