package org.apache.flink.examples.java.distcp;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.util.DataSetDeprecationInfo;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/examples/java/distcp/DistCp.class */
public class DistCp {
    private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
    public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
    public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";

    public static void main(String[] strArr) throws Exception {
        LOGGER.warn(DataSetDeprecationInfo.DATASET_DEPRECATION_INFO);
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        ParameterTool fromArgs = ParameterTool.fromArgs(strArr);
        if (!fromArgs.has("input") || !fromArgs.has("output")) {
            System.err.println("Usage: --input <path> --output <path> [--parallelism <n>]");
            return;
        }
        Path path = new Path(fromArgs.get("input"));
        final Path path2 = new Path(fromArgs.get("output"));
        if (!isLocal(executionEnvironment) && (!isOnDistributedFS(path) || !isOnDistributedFS(path2))) {
            System.out.println("In a distributed mode only HDFS input/output paths are supported");
            return;
        }
        int i = fromArgs.getInt("parallelism", 10);
        if (i <= 0) {
            System.err.println("Parallelism should be greater than 0");
            return;
        }
        executionEnvironment.getConfig().setGlobalJobParameters(fromArgs);
        executionEnvironment.setParallelism(i);
        long currentTimeMillis = System.currentTimeMillis();
        LOGGER.info("Initializing copy tasks");
        List<FileCopyTask> copyTasks = getCopyTasks(path);
        LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
        new DataSource(executionEnvironment, new FileCopyTaskInputFormat(copyTasks), new GenericTypeInfo(FileCopyTask.class), "fileCopyTasks").flatMap(new RichFlatMapFunction<FileCopyTask, Object>() { // from class: org.apache.flink.examples.java.distcp.DistCp.1
            private static final long serialVersionUID = 1109254230243989929L;
            private LongCounter fileCounter;
            private LongCounter bytesCounter;

            public void open(OpenContext openContext) throws Exception {
                this.bytesCounter = getRuntimeContext().getLongCounter(DistCp.BYTES_COPIED_CNT_NAME);
                this.fileCounter = getRuntimeContext().getLongCounter(DistCp.FILES_COPIED_CNT_NAME);
            }

            public void flatMap(FileCopyTask fileCopyTask, Collector<Object> collector) throws Exception {
                DistCp.LOGGER.info("Processing task: " + fileCopyTask);
                Path path3 = new Path(path2, fileCopyTask.getRelativePath());
                FileSystem fileSystem = path2.getFileSystem();
                if (!fileSystem.isDistributedFS()) {
                    File parentFile = (path3.toUri().isAbsolute() ? new File(path3.toUri()) : new File(path3.toString())).getParentFile();
                    if (!parentFile.mkdirs() && !parentFile.exists()) {
                        throw new RuntimeException("Cannot create local file system directories: " + parentFile);
                    }
                }
                OutputStream outputStream = null;
                InputStream inputStream = null;
                try {
                    outputStream = fileSystem.create(path3, FileSystem.WriteMode.OVERWRITE);
                    inputStream = fileCopyTask.getPath().getFileSystem().open(fileCopyTask.getPath());
                    this.bytesCounter.add(IOUtils.copy(inputStream, outputStream));
                    IOUtils.closeQuietly(inputStream);
                    IOUtils.closeQuietly(outputStream);
                    this.fileCounter.add(1L);
                } catch (Throwable th) {
                    IOUtils.closeQuietly(inputStream);
                    IOUtils.closeQuietly(outputStream);
                    throw th;
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((FileCopyTask) obj, (Collector<Object>) collector);
            }
        }).print();
        Map allAccumulatorResults = executionEnvironment.getLastJobExecutionResult().getAllAccumulatorResults();
        LOGGER.info("== COUNTERS ==");
        for (Map.Entry entry : allAccumulatorResults.entrySet()) {
            LOGGER.info(((String) entry.getKey()) + ": " + entry.getValue());
        }
    }

    private static boolean isLocal(ExecutionEnvironment executionEnvironment) {
        return executionEnvironment instanceof LocalEnvironment;
    }

    private static boolean isOnDistributedFS(Path path) throws IOException {
        return path.getFileSystem().isDistributedFS();
    }

    private static List<FileCopyTask> getCopyTasks(Path path) throws IOException {
        ArrayList arrayList = new ArrayList();
        getCopyTasks(path, "", arrayList);
        return arrayList;
    }

    private static void getCopyTasks(Path path, String str, List<FileCopyTask> list) throws IOException {
        FileStatus[] listStatus = path.getFileSystem().listStatus(path);
        if (listStatus == null) {
            return;
        }
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.isDir()) {
                getCopyTasks(fileStatus.getPath(), str + fileStatus.getPath().getName() + "/", list);
            } else {
                Path path2 = fileStatus.getPath();
                list.add(new FileCopyTask(path2, str + path2.getName()));
            }
        }
    }
}
