package co.cask.cdap.internal.app.runtime.batch.dataset.partitioned;

import co.cask.cdap.api.dataset.lib.PartitionKey;
import co.cask.cdap.api.dataset.lib.PartitionedFileSet;
import co.cask.cdap.api.dataset.lib.Partitioning;
import co.cask.cdap.internal.app.runtime.batch.BasicMapReduceTaskContext;
import co.cask.cdap.internal.app.runtime.batch.MapReduceClassLoader;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/dataset/partitioned/DynamicPartitioningOutputCommitter.class */
public class DynamicPartitioningOutputCommitter extends FileOutputCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitioningOutputCommitter.class);
    private final TaskAttemptContext taskContext;
    private final Path jobSpecificOutputPath;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/dataset/partitioned/DynamicPartitioningOutputCommitter$CommittedTaskFilter.class */
    public static class CommittedTaskFilter implements PathFilter {
        private CommittedTaskFilter() {
        }

        public boolean accept(Path path) {
            return !"_temporary".equals(path.getName());
        }
    }

    public DynamicPartitioningOutputCommitter(Path path, TaskAttemptContext taskAttemptContext) throws IOException {
        super(path, taskAttemptContext);
        this.taskContext = taskAttemptContext;
        this.jobSpecificOutputPath = path;
    }

    public void commitJob(JobContext jobContext) throws IOException {
        Configuration configuration = jobContext.getConfiguration();
        BasicMapReduceTaskContext basicMapReduceTaskContext = MapReduceClassLoader.getFromConfiguration(configuration).getTaskContextProvider().get(this.taskContext);
        PartitionedFileSet dataset = basicMapReduceTaskContext.getDataset(configuration.get("output.dataset.name"));
        Partitioning partitioning = dataset.getPartitioning();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (FileStatus fileStatus : getAllCommittedTaskPaths(jobContext)) {
            FileSystem fileSystem = fileStatus.getPath().getFileSystem(configuration);
            RemoteIterator listFiles = fileSystem.listFiles(fileStatus.getPath(), true);
            while (listFiles.hasNext()) {
                Path path = ((LocatedFileStatus) listFiles.next()).getPath();
                String relative = getRelative(fileStatus.getPath(), path);
                int lastIndexOf = relative.lastIndexOf("/");
                if (lastIndexOf == -1) {
                    LOG.warn("Skipping path '{}'. It's relative path '{}' has fewer than two parts", path, relative);
                } else {
                    String substring = relative.substring(0, lastIndexOf);
                    Path path2 = new Path(new Path(FileOutputFormat.getOutputPath(jobContext), substring), relative.substring(lastIndexOf + 1));
                    if (fileSystem.exists(path2)) {
                        throw new FileAlreadyExistsException("Final output path " + path2 + " already exists");
                    }
                    hashSet.add(getPartitionKey(partitioning, substring));
                    hashSet2.add(substring);
                }
            }
        }
        Path outputPath = FileOutputFormat.getOutputPath(jobContext);
        FileSystem fileSystem2 = outputPath.getFileSystem(configuration);
        for (FileStatus fileStatus2 : getAllCommittedTaskPaths(jobContext)) {
            mergePaths(fileSystem2, fileStatus2, outputPath);
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            dataset.getPartitionOutput((PartitionKey) it.next()).addPartition();
        }
        try {
            basicMapReduceTaskContext.flushOperations();
            cleanupJob(jobContext);
            if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
                Iterator it2 = hashSet2.iterator();
                while (it2.hasNext()) {
                    fileSystem2.createNewFile(new Path(new Path(outputPath, (String) it2.next()), "_SUCCESS"));
                }
            }
        } catch (Exception e) {
            Throwables.propagateIfPossible(e, IOException.class);
            throw new IOException(e);
        }
    }

    private PartitionKey getPartitionKey(Partitioning partitioning, String str) {
        List asList = Arrays.asList(str.split("/"));
        if (asList.size() != partitioning.getFields().size()) {
            throw new IllegalArgumentException(String.format("relativePath '%s' does not have same number of components as partitioning '%s", str, partitioning));
        }
        PartitionKey.Builder builder = PartitionKey.builder();
        int i = 0;
        for (Map.Entry entry : partitioning.getFields().entrySet()) {
            builder.addField((String) entry.getKey(), ((Partitioning.FieldType) entry.getValue()).parse((String) asList.get(i)));
            i++;
        }
        return builder.build();
    }

    public void cleanupJob(JobContext jobContext) throws IOException {
        this.jobSpecificOutputPath.getFileSystem(jobContext.getConfiguration()).delete(this.jobSpecificOutputPath, true);
    }

    private void mergePaths(FileSystem fileSystem, FileStatus fileStatus, Path path) throws IOException {
        if (fileStatus.isFile()) {
            if (fileSystem.exists(path) && !fileSystem.delete(path, true)) {
                throw new IOException("Failed to delete " + path);
            }
            if (!fileSystem.rename(fileStatus.getPath(), path)) {
                throw new IOException("Failed to rename " + fileStatus + " to " + path);
            }
            return;
        }
        if (fileStatus.isDirectory()) {
            if (!fileSystem.exists(path)) {
                if (!fileSystem.rename(fileStatus.getPath(), path)) {
                    throw new IOException("Failed to rename " + fileStatus + " to " + path);
                }
                return;
            }
            if (!fileSystem.getFileStatus(path).isDirectory()) {
                if (!fileSystem.delete(path, true)) {
                    throw new IOException("Failed to delete " + path);
                }
                if (!fileSystem.rename(fileStatus.getPath(), path)) {
                    throw new IOException("Failed to rename " + fileStatus + " to " + path);
                }
                return;
            }
            for (FileStatus fileStatus2 : fileSystem.listStatus(fileStatus.getPath())) {
                mergePaths(fileSystem, fileStatus2, new Path(path, fileStatus2.getPath().getName()));
            }
        }
    }

    private FileStatus[] getAllCommittedTaskPaths(JobContext jobContext) throws IOException {
        Path jobAttemptPath = getJobAttemptPath(jobContext);
        return jobAttemptPath.getFileSystem(jobContext.getConfiguration()).listStatus(jobAttemptPath, new CommittedTaskFilter());
    }

    private String getRelative(Path path, Path path2) {
        return path.toUri().relativize(path2.toUri()).getPath();
    }
}
