package org.apache.falcon.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.job.JobCounters;
import org.apache.falcon.job.JobCountersHandler;
import org.apache.falcon.job.JobType;
import org.apache.falcon.util.ReplicationDistCpOption;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-distcp-replication-0.9.jar:org/apache/falcon/replication/FeedReplicator.class */
/**
 * Hadoop {@link Tool} that copies feed data from one or more source paths to a target
 * path using DistCp.
 *
 * <p>When the configuration property {@code falcon.include.path} is set to something other
 * than {@code IGNORE}, a {@code CustomReplicator} is used instead of plain {@link DistCp}
 * and an availability-flag file is created under the replicated target path(s) once the
 * copy has finished.
 */
public class FeedReplicator extends Configured implements Tool {
    private static final Logger LOG = LoggerFactory.getLogger(FeedReplicator.class);

    /** Value of {@code falcon.include.path} that disables include-path handling. */
    private static final String IGNORE = "IGNORE";

    /**
     * Command-line entry point; delegates to {@link ToolRunner}.
     *
     * @param strArr command-line arguments, see {@link #getCommand(String[])}
     * @throws Exception if argument parsing or the replication fails
     */
    public static void main(String[] strArr) throws Exception {
        ToolRunner.run(new Configuration(), new FeedReplicator(), strArr);
    }

    /**
     * Runs the replication (implements {@code Tool.run}).
     *
     * <p>Steps, in order: parse the arguments, load the file named by the system property
     * {@code oozie.action.conf.xml} into the configuration, publish the availability flag
     * name under {@code falcon.feed.availability.flag}, execute the copy, optionally store
     * job counters, and optionally create the availability flag file(s).
     *
     * @param strArr command-line arguments
     * @return always {@code 0}; failures surface as exceptions
     * @throws Exception if parsing, the copy job, counter handling or post-processing fails
     */
    public int run(String[] strArr) throws Exception {
        CommandLine cmd = getCommand(strArr);
        DistCpOptions options = getDistCpOptions(cmd);

        Configuration conf = getConf();
        Path actionConfPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
        LOG.info("{} found conf ? {}", actionConfPath,
                Boolean.valueOf(actionConfPath.getFileSystem(conf).exists(actionConfPath)));
        conf.addResource(actionConfPath);

        String includePathConf = conf.get("falcon.include.path");
        boolean includePathSet = includePathConf != null && !IGNORE.equalsIgnoreCase(includePathConf);

        // An empty/missing -availabilityFlag is normalised to the NO_USER_BROKER_URL sentinel
        // (NOTE(review): that constant is reused here as a generic "not specified" marker).
        String requestedFlag = cmd.getOptionValue("availabilityFlag");
        if (StringUtils.isEmpty(requestedFlag)) {
            requestedFlag = ClusterHelper.NO_USER_BROKER_URL;
        }

        // Only filesystem storage honours a caller-supplied flag name; otherwise "_SUCCESS".
        String availabilityFlag = "_SUCCESS";
        boolean fileSystemStorage =
                cmd.getOptionValue("falconFeedStorageType").equals(Storage.TYPE.FILESYSTEM.name());
        if (fileSystemStorage && !ClusterHelper.NO_USER_BROKER_URL.equals(requestedFlag)) {
            availabilityFlag = requestedFlag;
        }
        conf.set("falcon.feed.availability.flag", availabilityFlag);

        DistCp distCp = includePathSet ? new CustomReplicator(conf, options) : new DistCp(conf, options);
        LOG.info("Started DistCp");
        Job job = distCp.execute();

        if (cmd.hasOption("counterLogDir") && job.getStatus().getState() == JobStatus.State.SUCCEEDED) {
            LOG.info("Gathering counters for the the Feed Replication job");
            Path counterFile = new Path(cmd.getOptionValue("counterLogDir"), "counter.txt");
            JobCounters counters = JobCountersHandler.getCountersType(JobType.FSREPLICATION.name());
            if (counters != null) {
                counters.obtainJobCounters(conf, job, true);
                counters.storeJobCounters(conf, counterFile);
            }
        }

        if (includePathSet) {
            executePostProcessing(conf, options);
        }
        LOG.info("Completed DistCp");
        return 0;
    }

    /**
     * Parses the command line.
     *
     * <p>Required options: {@code maxMaps}, {@code mapBandwidth}, {@code sourcePaths},
     * {@code targetPath}, {@code falconFeedStorageType}. Optional: {@code availabilityFlag},
     * {@code counterLogDir} and the DistCp tuning options named by
     * {@link ReplicationDistCpOption}. Every option takes an argument.
     *
     * @param strArr raw command-line arguments
     * @return the parsed command line
     * @throws ParseException if the arguments cannot be parsed (e.g. a required option is missing)
     */
    protected CommandLine getCommand(String[] strArr) throws ParseException {
        Options options = new Options();
        addOption(options, "maxMaps", "max number of maps to use for this copy", true);
        addOption(options, "mapBandwidth", "bandwidth per map (in MB) to use for this copy", true);
        addOption(options, "sourcePaths", "comma separtated list of source paths to be copied", true);
        addOption(options, "targetPath", "target path", true);
        addOption(options, "falconFeedStorageType", "feed storage type", true);
        addOption(options, "availabilityFlag", "availability flag", false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName(),
                "option to force overwrite", false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(),
                "abort on error", false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName(),
                "skip checksums", false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(),
                "remove deleted files - should there be files in the target directory thatwere removed from the source directory",
                false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName(),
                "preserve block size", false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName(),
                "preserve replication count", false);
        addOption(options, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName(),
                "preserve permissions", false);
        addOption(options, "counterLogDir", "log directory to store job counter file", false);
        return new GnuParser().parse(options, strArr);
    }

    /** Registers a single argument-taking option with the given name, description and required flag. */
    private static void addOption(Options options, String name, String description, boolean required) {
        Option option = new Option(name, true, description);
        option.setRequired(required);
        options.addOption(option);
    }

    /**
     * Translates the parsed command line into {@link DistCpOptions}.
     *
     * <p>The copy is always blocking. If the overwrite option is {@code true}
     * (case-insensitive) overwrite is enabled; in every other case sync-folder mode is
     * enabled instead. The remaining boolean options are applied only when supplied.
     *
     * @param commandLine parsed command line from {@link #getCommand(String[])}
     * @return the configured DistCp options
     */
    protected DistCpOptions getDistCpOptions(CommandLine commandLine) {
        String[] sources = commandLine.getOptionValue("sourcePaths").trim().split(",");
        Path target = new Path(commandLine.getOptionValue("targetPath").trim());

        DistCpOptions distCpOptions = new DistCpOptions(getPaths(sources), target);
        distCpOptions.setBlocking(true);
        distCpOptions.setMaxMaps(Integer.parseInt(commandLine.getOptionValue("maxMaps")));
        distCpOptions.setMapBandwidth(Integer.parseInt(commandLine.getOptionValue("mapBandwidth")));

        String overwrite = commandLine.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName());
        if (StringUtils.isNotEmpty(overwrite) && overwrite.equalsIgnoreCase(Boolean.TRUE.toString())) {
            distCpOptions.setOverwrite(Boolean.parseBoolean(overwrite));
        } else {
            distCpOptions.setSyncFolder(true);
        }

        String ignoreErrors =
                commandLine.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName());
        if (StringUtils.isNotEmpty(ignoreErrors)) {
            distCpOptions.setIgnoreFailures(Boolean.parseBoolean(ignoreErrors));
        }

        String skipChecksum =
                commandLine.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName());
        if (StringUtils.isNotEmpty(skipChecksum)) {
            distCpOptions.setSkipCRC(Boolean.parseBoolean(skipChecksum));
        }

        String removeDeleted =
                commandLine.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName());
        if (StringUtils.isNotEmpty(removeDeleted)) {
            distCpOptions.setDeleteMissing(Boolean.parseBoolean(removeDeleted));
        }

        // Boolean.parseBoolean(null) is false, so absent preserve-* options are simply skipped.
        if (Boolean.parseBoolean(commandLine.getOptionValue(
                ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName()))) {
            distCpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
        }
        if (Boolean.parseBoolean(commandLine.getOptionValue(
                ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName()))) {
            distCpOptions.preserve(DistCpOptions.FileAttribute.REPLICATION);
        }
        if (Boolean.parseBoolean(commandLine.getOptionValue(
                ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName()))) {
            distCpOptions.preserve(DistCpOptions.FileAttribute.PERMISSION);
        }
        return distCpOptions;
    }

    /**
     * Wraps each string in a {@link Path}, preserving order.
     *
     * @param strArr path strings
     * @return a new mutable list with one {@link Path} per input string
     */
    private List<Path> getPaths(String[] strArr) {
        List<Path> paths = new ArrayList<Path>(strArr.length);
        for (String location : strArr) {
            paths.add(new Path(location));
        }
        return paths;
    }

    /**
     * Creates the availability-flag file under the replicated target location(s).
     *
     * <p>The part of {@code falcon.include.path} that follows the (single) source path is
     * reduced by {@link #getFixedPath(String)} and resolved against the target path. The
     * resulting path is globbed on the target file system and the flag file (named by
     * {@code falcon.feed.availability.flag}) is created inside every match. If the glob
     * returns {@code null}, the flag is created directly under the resolved path.
     *
     * @param configuration configuration holding the availability flag name
     * @param distCpOptions options the copy was run with (source and target paths)
     * @throws IOException if a file system operation fails
     * @throws FalconException if the proxied file system cannot be created
     */
    private void executePostProcessing(Configuration configuration, DistCpOptions distCpOptions)
            throws IOException, FalconException {
        Path targetPath = distCpOptions.getTargetPath();
        FileSystem targetFs = HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), getConf());

        List<Path> sourcePaths = distCpOptions.getSourcePaths();
        assert sourcePaths.size() == 1 : "Source paths more than 1 can't be handled";

        String source = sourcePaths.get(0).toString();
        String include = new Path(getConf().get("falcon.include.path")).toString();
        // startsWith (rather than substring) so an include path shorter than the source path
        // trips the assertion instead of a StringIndexOutOfBoundsException.
        assert include.startsWith(source) : "Source path is not a subset of include path";

        String relativePath = StringUtils.stripStart(getFixedPath(include.substring(source.length())), "/");
        Path finalOutputPath = StringUtils.isNotEmpty(relativePath) ? new Path(targetPath, relativePath) : targetPath;
        String availabilityFlag = configuration.get("falcon.feed.availability.flag");

        FileStatus[] matches = targetFs.globStatus(finalOutputPath);
        if (matches == null) {
            targetFs.create(new Path(finalOutputPath, availabilityFlag)).close();
            LOG.info("No files present in path: {}", finalOutputPath);
            return;
        }
        for (FileStatus match : matches) {
            Path flagPath = new Path(match.getPath(), availabilityFlag);
            targetFs.create(flagPath).close();
            LOG.info("Created {}", flagPath);
        }
    }

    /**
     * Truncates a '/'-separated relative path after its last "fixed" segment.
     *
     * <p>Segments are scanned from the end; the first non-empty segment whose
     * {@code FilteredCopyListing.getRegEx} pattern equals {@code (seg/)|(seg$)} is treated
     * as fixed (presumably a segment without glob characters; verify against
     * {@code FilteredCopyListing}). Everything up to and including that segment is
     * returned, without a trailing slash. If no segment qualifies the whole path is kept.
     *
     * @param str relative path, possibly starting with '/'
     * @return the truncated path, or the empty string when {@code str} has no segments
     *         (e.g. {@code "/"})
     * @throws IOException if the regular expression for a segment cannot be built
     */
    private String getFixedPath(String str) throws IOException {
        String[] segments = str.split("/");
        int lastKept = segments.length - 1;
        for (int i = segments.length - 1; i >= 0; i--) {
            String segment = segments[i];
            if (!segment.isEmpty()
                    && FilteredCopyListing.getRegEx(segment).toString()
                            .equals("(" + segment + "/)|(" + segment + "$)")) {
                lastKept = i;
                break;
            }
        }

        StringBuilder joined = new StringBuilder();
        for (int i = 0; i <= lastKept; i++) {
            joined.append(segments[i]).append('/');
        }
        // "/".split("/") yields no segments at all; return "" instead of failing on substring(0, -1).
        if (joined.length() == 0) {
            return "";
        }
        // Each segment was followed by '/', so the last character is the separator to drop.
        joined.setLength(joined.length() - 1);
        return joined.toString();
    }
}
