package gobblin.runtime.mapreduce;

import com.google.common.io.Closer;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.WorkUnitState;
import gobblin.runtime.AbstractJobLauncher;
import gobblin.runtime.GobblinMultiTaskAttempt;
import gobblin.source.workunit.MultiWorkUnit;
import gobblin.source.workunit.WorkUnit;
import gobblin.util.JobLauncherUtils;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/mapreduce/GobblinOutputCommitter.class */
public class GobblinOutputCommitter extends OutputCommitter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) GobblinOutputFormat.class);
    private Map<String, GobblinMultiTaskAttempt> attemptIdToMultiTaskAttempt = new ConcurrentHashMap();

    /* loaded from: input_file:WEB-INF/lib/gobblin-runtime-0.11.0.jar:gobblin/runtime/mapreduce/GobblinOutputCommitter$WorkUnitFilter.class */
    private static class WorkUnitFilter implements PathFilter {
        private WorkUnitFilter() {
        }

        @Override // org.apache.hadoop.fs.PathFilter
        public boolean accept(Path path) {
            return path.getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION) || path.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION);
        }
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
        LOG.info("Aborting Job: " + jobContext.getJobID() + " with state: " + state);
        Configuration configuration = jobContext.getConfiguration();
        FileSystem fileSystem = FileSystem.get(URI.create(configuration.get(ConfigurationKeys.FS_URI_KEY, "file:///")), configuration);
        Path path = new Path(configuration.get(ConfigurationKeys.MR_JOB_ROOT_DIR_KEY), configuration.get(ConfigurationKeys.JOB_NAME_KEY));
        Path path2 = new Path(path, "input");
        if (!fileSystem.exists(path2) || !fileSystem.isDirectory(path2)) {
            LOG.warn(String.format("%s either does not exist or is not a directory. No data to cleanup.", path2));
            return;
        }
        try {
            for (FileStatus fileStatus : fileSystem.listStatus(path2, new WorkUnitFilter())) {
                Closer create = Closer.create();
                if (fileStatus.getPath().getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION)) {
                    WorkUnit createEmpty = WorkUnit.createEmpty();
                    try {
                        createEmpty.readFields((DataInput) create.register(new DataInputStream(fileSystem.open(fileStatus.getPath()))));
                        create.close();
                        JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(createEmpty), LOG);
                    } finally {
                    }
                }
                if (fileStatus.getPath().getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
                    MultiWorkUnit createEmpty2 = MultiWorkUnit.createEmpty();
                    try {
                        createEmpty2.readFields((DataInput) create.register(new DataInputStream(fileSystem.open(fileStatus.getPath()))));
                        create.close();
                        Iterator<WorkUnit> it = createEmpty2.getWorkUnits().iterator();
                        while (it.hasNext()) {
                            JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(it.next()), LOG);
                        }
                    } finally {
                    }
                }
            }
            try {
                cleanUpWorkingDirectory(path, fileSystem);
                super.abortJob(jobContext, state);
            } finally {
            }
        } catch (Throwable th) {
            try {
                cleanUpWorkingDirectory(path, fileSystem);
                super.abortJob(jobContext, state);
                throw th;
            } finally {
            }
        }
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        String taskAttemptID = taskAttemptContext.getTaskAttemptID().toString();
        LOG.info("Committing task attempt: " + taskAttemptID);
        this.attemptIdToMultiTaskAttempt.get(taskAttemptID).commit();
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
        return this.attemptIdToMultiTaskAttempt.containsKey(taskAttemptContext.getTaskAttemptID().toString());
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public void setupJob(JobContext jobContext) throws IOException {
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public boolean isRecoverySupported() {
        return true;
    }

    @Override // org.apache.hadoop.mapreduce.OutputCommitter
    public void recoverTask(TaskAttemptContext taskAttemptContext) throws IOException {
    }

    private static void cleanUpWorkingDirectory(Path path, FileSystem fileSystem) throws IOException {
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
            LOG.info("Deleted working directory " + path);
        }
    }

    public Map<String, GobblinMultiTaskAttempt> getAttemptIdToMultiTaskAttempt() {
        return this.attemptIdToMultiTaskAttempt;
    }
}
