package org.apache.gobblin.data.management.copy.replication;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.class */
/**
 * A {@link CopyableDataset} that replicates the files of a single {@link CopyRoute}, driven by a
 * {@link ReplicationConfiguration}.
 *
 * <p>Only routes whose copy-from and copy-to end points are both {@link HadoopFsEndPoint}s produce copy entities.
 * For any other route, {@link #getCopyableFiles} logs a warning and returns an empty collection.
 */
public class ConfigBasedDataset implements CopyableDataset {
    private static final Logger log = LoggerFactory.getLogger(ConfigBasedDataset.class);

    private final Properties props;
    private final CopyRoute copyRoute;
    private final ReplicationConfiguration rc;
    private final String datasetURN;
    /** When true, the whole copy is skipped if the target watermark is not behind the source watermark. */
    private final boolean watermarkEnabled;
    private final PathFilter pathFilter;
    private final boolean applyFilterToDirectories;

    /**
     * Creates a dataset whose URN is derived from the copy-to end point of {@code copyRoute}.
     *
     * <p>For a Hadoop FS end point, the URN is the dataset path qualified against that end point's file system.
     * Otherwise it is the end point's {@code toString()}.
     */
    public ConfigBasedDataset(ReplicationConfiguration replicationConfiguration, Properties properties,
        CopyRoute copyRoute) {
        this(replicationConfiguration, properties, copyRoute, calculateDatasetURN(copyRoute));
    }

    /**
     * Creates a dataset with an explicitly supplied URN.
     *
     * @param replicationConfiguration replication settings; supplies metadata for logging and the
     *     delete-target-if-not-on-source switch
     * @param properties job properties; read for the watermark switch (default {@code "true"}), the path filter and
     *     the apply-filter-to-directories flag (default {@code "false"})
     * @param copyRoute the copy-from / copy-to end-point pair to replicate
     * @param datasetURN the value returned by {@link #datasetURN()}
     */
    public ConfigBasedDataset(ReplicationConfiguration replicationConfiguration, Properties properties,
        CopyRoute copyRoute, String datasetURN) {
        this.props = properties;
        this.copyRoute = copyRoute;
        this.rc = replicationConfiguration;
        this.datasetURN = datasetURN;
        // Read here so both constructors agree. Previously this constructor left the flag at its implicit false.
        this.watermarkEnabled =
            Boolean.parseBoolean(this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true"));
        this.pathFilter = DatasetUtils.instantiatePathFilter(this.props);
        this.applyFilterToDirectories =
            Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false"));
    }

    /**
     * Derives a dataset URN from the copy-to end point of {@code copyRoute}.
     *
     * <p>If the dataset path cannot be qualified because the file system lookup fails, a warning is logged and the
     * unqualified dataset path is used instead of leaving the URN null.
     */
    private static String calculateDatasetURN(CopyRoute copyRoute) {
        EndPoint copyTo = copyRoute.getCopyTo();
        if (!(copyTo instanceof HadoopFsEndPoint)) {
            return copyTo.toString();
        }
        HadoopFsEndPoint hadoopFsCopyTo = (HadoopFsEndPoint) copyTo;
        try {
            FileSystem copyToFs = FileSystem.get(hadoopFsCopyTo.getFsURI(), HadoopUtils.newConfiguration());
            return copyToFs.makeQualified(hadoopFsCopyTo.getDatasetPath()).toString();
        } catch (IOException e) {
            log.warn("Failed to qualify dataset path {} against file system {}; using the unqualified path as URN",
                hadoopFsCopyTo.getDatasetPath(), hadoopFsCopyTo.getFsURI(), e);
            return hadoopFsCopyTo.getDatasetPath().toString();
        }
    }

    @Override
    public String datasetURN() {
        return this.datasetURN;
    }

    /**
     * Builds the copy entities needed to bring the copy-to end point in line with the copy-from end point.
     *
     * <p>Behaviour:
     * <ul>
     *   <li>The configured path filter and apply-to-directories flag are set on both end points (mutating them).</li>
     *   <li>If watermarks are enabled and the target watermark is not behind the source, nothing is copied.</li>
     *   <li>A source file is skipped when the target has a file at the same relative path with the same length and a
     *       newer modification time. Otherwise a {@link CopyableFile} is emitted, and any existing target file at
     *       that path is scheduled for deletion.</li>
     *   <li>If {@link ReplicationConfiguration#isDeleteTargetIfNotExistOnSource()} is set, target files with no
     *       source counterpart are also scheduled for deletion.</li>
     *   <li>Scheduled deletions run in a {@link PrePublishStep} with priority 0.</li>
     *   <li>If the source has a watermark but its listing did not include the watermark file, a
     *       {@link PostPublishStep} with priority 1 writes the watermark to the target.</li>
     * </ul>
     *
     * @param fileSystem not used; file systems are resolved from the end points' URIs
     * @param copyConfiguration configuration passed to each {@link CopyableFile}
     * @return the copy entities; empty when the route is not FS-to-FS or the target is already up to date
     * @throws IOException if a file system cannot be obtained, or if listing or building entities fails
     */
    @Override // org.apache.gobblin.data.management.copy.CopyableDataset
    public Collection<? extends CopyEntity> getCopyableFiles(FileSystem fileSystem, CopyConfiguration copyConfiguration)
        throws IOException {
        List<CopyEntity> copyEntities = Lists.newArrayList();
        EndPoint copyFrom = this.copyRoute.getCopyFrom();
        EndPoint copyTo = this.copyRoute.getCopyTo();
        if (!(copyFrom instanceof HadoopFsEndPoint) || !(copyTo instanceof HadoopFsEndPoint)) {
            log.warn("Currently only handle the Hadoop Fs EndPoint replication");
            return copyEntities;
        }
        HadoopFsEndPoint source = (HadoopFsEndPoint) copyFrom;
        HadoopFsEndPoint target = (HadoopFsEndPoint) copyTo;
        // Configured before any listing below; presumably consulted by getFiles() (TODO confirm in HadoopFsEndPoint).
        source.setPathFilter(this.pathFilter);
        source.setApplyFilterToDirectories(this.applyFilterToDirectories);
        target.setPathFilter(this.pathFilter);
        target.setApplyFilterToDirectories(this.applyFilterToDirectories);

        if (this.watermarkEnabled && isTargetWatermarkUpToDate(copyFrom, copyTo)) {
            log.info("No need to copy as destination watermark >= source watermark with source watermark {}, for dataset with metadata {}", copyFrom.getWatermark().isPresent() ? ((ComparableWatermark) copyFrom.getWatermark().get()).toJson() : "N/A", this.rc.getMetaData());
            return copyEntities;
        }

        Configuration conf = HadoopUtils.newConfiguration();
        FileSystem sourceFs = FileSystem.get(source.getFsURI(), conf);
        FileSystem targetFs = FileSystem.get(target.getFsURI(), conf);
        Collection<FileStatus> sourceFiles = source.getFiles();
        Collection<FileStatus> targetFiles = target.getFiles();

        // Target files not matched by any source file remain in this map after the loop.
        Map<Path, FileStatus> unmatchedTargetFiles = Maps.newHashMap();
        for (FileStatus targetFile : targetFiles) {
            unmatchedTargetFiles.put(PathUtils.getPathWithoutSchemeAndAuthority(targetFile.getPath()), targetFile);
        }

        List<Path> pathsToDelete = Lists.newArrayList();
        boolean sourceHasWatermarkFile = false;
        boolean isDeleteTargetIfNotExistOnSource = this.rc.isDeleteTargetIfNotExistOnSource();
        Path sourceDatasetPath = PathUtils.getPathWithoutSchemeAndAuthority(source.getDatasetPath());
        String targetFileSet = PathUtils.getPathWithoutSchemeAndAuthority(target.getDatasetPath()).toString();

        for (FileStatus sourceFile : Sets.newHashSet(sourceFiles)) {
            Path relativePath =
                PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(sourceFile.getPath()), sourceDatasetPath);
            Path targetPath = new Path(target.getDatasetPath(), relativePath);
            if (relativePath.toString().equals(ReplicaHadoopFsEndPoint.WATERMARK_FILE)) {
                sourceHasWatermarkFile = true;
            }
            // The map is keyed without scheme/authority, so the lookup must be normalized the same way. Otherwise a
            // qualified target dataset path never matches and every target file looks orphaned.
            FileStatus existingTarget = unmatchedTargetFiles.remove(PathUtils.getPathWithoutSchemeAndAuthority(targetPath));
            if (existingTarget != null && existingTarget.getLen() == sourceFile.getLen()
                && existingTarget.getModificationTime() > sourceFile.getModificationTime()) {
                log.debug("Copy from timestamp older than copy to timestamp, skipped copy {} for dataset with metadata {}", sourceFile.getPath(), this.rc.getMetaData());
                continue;
            }
            if (existingTarget != null) {
                // The stale target file is removed before the new copy is published.
                pathsToDelete.add(targetPath);
            }
            CopyableFile copyableFile = CopyableFile
                .fromOriginAndDestination(sourceFs, sourceFile, targetFs.makeQualified(targetPath), copyConfiguration)
                .fileSet(targetFileSet)
                .build();
            copyableFile.setFsDatasets(sourceFs, targetFs);
            copyEntities.add(copyableFile);
        }

        if (isDeleteTargetIfNotExistOnSource) {
            pathsToDelete.addAll(unmatchedTargetFiles.keySet());
        }
        if (!pathsToDelete.isEmpty()) {
            copyEntities.add(new PrePublishStep(target.getDatasetPath().toString(), Maps.newHashMap(),
                DeleteFileCommitStep.fromPaths(targetFs, pathsToDelete, this.props), 0));
        }
        // Only generate a watermark file when one was not already copied over from the source.
        if (!sourceHasWatermarkFile && source.getWatermark().isPresent()) {
            copyEntities.add(new PostPublishStep(target.getDatasetPath().toString(), Maps.newHashMap(),
                new WatermarkMetadataGenerationCommitStep(target.getFsURI().toString(), target.getDatasetPath(),
                    (Watermark) source.getWatermark().get()), 1));
        }
        return copyEntities;
    }

    /**
     * Returns true when the target already has a watermark and either the source has none, or the source
     * watermark compares less than or equal to the target's.
     */
    private static boolean isTargetWatermarkUpToDate(EndPoint copyFrom, EndPoint copyTo) {
        if (!copyTo.getWatermark().isPresent()) {
            return false;
        }
        if (!copyFrom.getWatermark().isPresent()) {
            return true;
        }
        return ((ComparableWatermark) copyFrom.getWatermark().get()).compareTo(copyTo.getWatermark().get()) <= 0;
    }
}
