package org.apache.gobblin.compaction.mapreduce;

import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.class */
/**
 * Base class that builds the Hadoop MapReduce {@link Job} used to compact a {@link FileSystemDataset}.
 *
 * <p>{@link #createJob(FileSystemDataset)} wires up compression defaults, extra jars, input/output
 * paths and then delegates the format-specific parts (mapper, reducer, schema, file extension) to
 * the abstract methods implemented by subclasses. Instances are normally obtained through
 * {@link #instantiateConfigurator(State)}, which loads a {@link ConfiguratorFactory} by class name.
 *
 * <p>The configurator keeps mutable bookkeeping about the job it created (input paths, output path,
 * record count, ...), which is exposed through the getters at the bottom of the class.
 */
public abstract class CompactionJobConfigurator {
    private static final Logger log = LoggerFactory.getLogger(CompactionJobConfigurator.class);

    /** State key holding the fully-qualified class name of the {@link ConfiguratorFactory} to use. */
    public static final String COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY = "compaction.jobConfiguratorFactory.class";

    /** Factory class used when {@link #COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY} is not set. */
    public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS = "org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";

    /** Job/compaction properties this configurator reads from (and, in {@link #createJob}, writes to). */
    protected final State state;

    /** File system resolved once in the constructor via {@link #getFileSystem(State)}. */
    protected final FileSystem fs;

    /** The job produced by the most recent {@link #createJob} call; {@code null} before that. */
    protected Job configuredJob;

    /** Value of {@code MRCompactor.COMPACTION_SHOULD_DEDUPLICATE}; defaults to {@code true}. */
    protected final boolean shouldDeduplicate;

    /** Temporary MR output directory, assigned by {@link #configureInputAndOutputPaths}. */
    protected Path mrOutputPath = null;

    /** Set to {@code true} once {@link #createJob} has completed. */
    protected boolean isJobCreated = false;

    /** Directories registered as MR input, assigned by {@link #configureInputAndOutputPaths}. */
    protected Collection<Path> mapReduceInputPaths = null;

    /** Fully-qualified string form of every MR input path, assigned by {@link #configureInputAndOutputPaths}. */
    protected Collection<String> oldFiles = null;

    /** Only assigned through {@link #setDstNewFiles(Collection)}; this class never populates it itself. */
    protected Collection<Path> dstNewFiles = null;

    /** File-name based input record count; only computed when source-dir renaming is enabled. */
    protected long fileNameRecordCount = 0;

    /** Creates a {@link CompactionJobConfigurator} for a given {@link State}. */
    public interface ConfiguratorFactory {
        /**
         * @param state properties used to configure the new configurator
         * @return a configurator bound to {@code state}
         * @throws IOException if the configurator cannot be set up
         */
        CompactionJobConfigurator createConfigurator(State state) throws IOException;
    }

    /** File extensions of the output formats known to this class. */
    protected enum EXTENSION {
        AVRO(CompactorOutputCommitter.DEFAULT_COMPACTION_OUTPUT_EXTENSION),
        ORC("orc");

        private final String extensionString;

        /** @return the extension text associated with this constant */
        public String getExtensionString() {
            return this.extensionString;
        }

        EXTENSION(String str) {
            this.extensionString = str;
        }
    }

    /**
     * Stores the state, resolves the file system and reads the de-duplication flag.
     *
     * <p>NOTE(review): this constructor invokes the overridable {@link #getFileSystem(State)}, so an
     * override runs before the subclass's own fields are initialized.
     *
     * @param state compaction properties
     * @throws IOException if the file system cannot be obtained
     */
    public CompactionJobConfigurator(State state) throws IOException {
        this.state = state;
        this.fs = getFileSystem(state);
        this.shouldDeduplicate = state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
    }

    /**
     * Loads the {@link ConfiguratorFactory} named by {@link #COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY}
     * (falling back to {@link #DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS}), instantiates it through
     * its no-arg constructor and asks it for a configurator.
     *
     * @param state compaction properties, also passed on to the factory
     * @return the configurator created by the factory
     * @throws RuntimeException wrapping any {@link IOException} or reflection failure
     */
    public static CompactionJobConfigurator instantiateConfigurator(State state) {
        String factoryClassName = state.getProp(COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY, DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS);
        try {
            ConfiguratorFactory factory = Class.forName(factoryClassName).asSubclass(ConfiguratorFactory.class).newInstance();
            return factory.createConfigurator(state);
        } catch (IOException | ReflectiveOperationException e) {
            throw new RuntimeException("Failed to instantiate a instance of job configurator:", e);
        }
    }

    /** @return the file extension handled by this configurator */
    public abstract String getFileExtension();

    /**
     * Builds and remembers the MapReduce job for {@code fileSystemDataset}.
     *
     * <p>Output compression is switched on and delegation-token cancellation on job completion is
     * switched off, but only when the corresponding keys are not already present in the configuration.
     * When no granular input directory is found, {@code "mapreduce.job.input.path.empty"} is set to
     * {@code true} on the state. The job is made map-only (zero reducers) when the input is empty or
     * de-duplication is disabled.
     *
     * @param fileSystemDataset dataset to compact
     * @return the configured job (also available through {@link #getConfiguredJob()})
     * @throws IOException if file system access or job creation fails
     */
    public Job createJob(FileSystemDataset fileSystemDataset) throws IOException {
        Configuration conf = HadoopUtils.getConfFromState(this.state);
        // Respect an explicit user choice under either the new or the legacy compression key.
        if (conf.get("mapreduce.output.fileoutputformat.compress") == null && conf.get("mapred.output.compress") == null) {
            conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        }
        if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
            conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
        }
        addJars(conf, this.state, this.fs);

        Job job = Job.getInstance(conf);
        job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);

        boolean inputIsEmpty = configureInputAndOutputPaths(job, fileSystemDataset);
        if (inputIsEmpty) {
            this.state.setProp("mapreduce.job.input.path.empty", true);
        }

        configureMapper(job);
        configureReducer(job);
        // Applied after configureReducer so it overrides whatever reducer count was set there.
        if (inputIsEmpty || !this.shouldDeduplicate) {
            job.setNumReduceTasks(0);
        }
        configureSchema(job);

        this.isJobCreated = true;
        this.configuredJob = job;
        return job;
    }

    /**
     * Applies format-specific schema settings to {@code job}; called last by {@link #createJob}.
     *
     * @throws IOException if the schema cannot be determined
     */
    protected abstract void configureSchema(Job job) throws IOException;

    /** Applies format-specific mapper settings to {@code job}. */
    protected abstract void configureMapper(Job job);

    /**
     * Applies format-specific reducer settings to {@code job}.
     *
     * @throws IOException if reducer configuration fails
     */
    protected abstract void configureReducer(Job job) throws IOException;

    /**
     * Obtains the file system addressed by {@code "source.filebased.fs.uri"} (default {@code "file:///"}),
     * using the Hadoop configuration derived from {@code state}.
     *
     * @param state compaction properties
     * @return the resolved file system
     * @throws IOException if the file system cannot be obtained
     */
    protected FileSystem getFileSystem(State state) throws IOException {
        URI fsUri = URI.create(state.getProp("source.filebased.fs.uri", "file:///"));
        return FileSystem.get(fsUri, HadoopUtils.getConfFromState(state));
    }

    /**
     * Sets the reducer count of {@code job} from the total size of {@link #mapReduceInputPaths}.
     *
     * <p>The count is {@code totalBytes / targetOutputFileSize + 1}, capped at the configured maximum.
     * If prime reducers are enabled (the default) and the count is not 1, it is then moved to the next
     * prime. Because that adjustment happens after the cap, the final value can exceed the configured
     * maximum.
     *
     * <p>Requires {@link #configureInputAndOutputPaths} to have run, since it reads
     * {@link #mapReduceInputPaths}.
     *
     * @param job job whose reducer count is set
     * @throws IOException if a content summary cannot be read
     */
    public void setNumberOfReducers(Job job) throws IOException {
        long totalInputBytes = 0;
        for (Path inputPath : this.mapReduceInputPaths) {
            totalInputBytes += this.fs.getContentSummary(inputPath).getLength();
        }

        long targetFileSize = this.state.getPropAsLong(MRCompactorJobRunner.COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE, MRCompactorJobRunner.DEFAULT_COMPACTION_JOB_TARGET_OUTPUT_FILE_SIZE);
        int maxReducers = this.state.getPropAsInt(MRCompactorJobRunner.COMPACTION_JOB_MAX_NUM_REDUCERS, MRCompactorJobRunner.DEFAULT_COMPACTION_JOB_MAX_NUM_REDUCERS);
        int numReducers = Math.min(Ints.checkedCast(totalInputBytes / targetFileSize) + 1, maxReducers);

        if (this.state.getPropAsBoolean(MRCompactorJobRunner.COMPACTION_JOB_USE_PRIME_REDUCERS, true) && numReducers != 1) {
            numReducers = Primes.nextPrime(numReducers);
        }
        job.setNumReduceTasks(numReducers);
    }

    /**
     * Adds every entry listed directly under the {@code MRCompactor.COMPACTION_JARS} directory to the
     * job classpath. Does nothing when that property is absent.
     *
     * @param configuration configuration the classpath entries are recorded in
     * @param state         compaction properties
     * @param fileSystem    file system holding the jar directory
     * @throws IOException if the directory cannot be listed
     */
    protected void addJars(Configuration configuration, State state, FileSystem fileSystem) throws IOException {
        if (!state.contains(MRCompactor.COMPACTION_JARS)) {
            return;
        }
        Path jarDir = new Path(state.getProp(MRCompactor.COMPACTION_JARS));
        for (FileStatus jar : fileSystem.listStatus(jarDir)) {
            DistributedCache.addFileToClassPath(jar.getPath(), configuration, fileSystem);
        }
    }

    /**
     * Computes and registers the job's input and output locations.
     *
     * <p>The output path is {@code <job dir>/<dataset name>/<dst sub dir>/<time string>} and is deleted
     * recursively before use. Input paths come from {@link #getGranularInputPaths(Path)}; when that
     * yields nothing, the dataset root itself is used as the sole input and {@code true} is returned.
     * As side effects this assigns {@link #mrOutputPath}, {@link #mapReduceInputPaths} and
     * {@link #oldFiles}.
     *
     * @param job               job to register paths on
     * @param fileSystemDataset dataset being compacted
     * @return {@code true} if no granular input directory was found
     * @throws IOException if file system access fails
     */
    protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset fileSystemDataset) throws IOException {
        boolean emptyDirectoryFlag = false;
        String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
        CompactionPathParser.CompactionParserResult parsed = new CompactionPathParser(this.state).parse(fileSystemDataset);

        this.mrOutputPath = concatPaths(mrOutputBase, parsed.getDatasetName(), parsed.getDstSubDir(), parsed.getTimeString());
        log.info("Cleaning temporary MR output directory: {}", this.mrOutputPath);
        this.fs.delete(this.mrOutputPath, true);

        this.mapReduceInputPaths = getGranularInputPaths(fileSystemDataset.datasetRoot());
        if (this.mapReduceInputPaths.isEmpty()) {
            this.mapReduceInputPaths.add(fileSystemDataset.datasetRoot());
            emptyDirectoryFlag = true;
        }

        this.oldFiles = new HashSet<>();
        for (Path inputPath : this.mapReduceInputPaths) {
            this.oldFiles.add(this.fs.makeQualified(inputPath).toString());
            FileInputFormat.addInputPath(job, inputPath);
        }
        FileOutputFormat.setOutputPath(job, this.mrOutputPath);
        return emptyDirectoryFlag;
    }

    /**
     * Joins the given segments into one path, left to right.
     *
     * @return the combined path, or {@code null} when no segments are supplied
     */
    private Path concatPaths(String... strArr) {
        if (strArr == null || strArr.length == 0) {
            return null;
        }
        Path combined = new Path(strArr[0]);
        for (int i = 1; i < strArr.length; i++) {
            combined = new Path(combined, new Path(strArr[i]));
        }
        return combined;
    }

    /**
     * Collects the parent directories of all files found recursively under {@code path}.
     *
     * <p>When {@code MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED} is set, directories whose name
     * ends with {@code MRCompactor.COMPACTION_RENAME_SOURCE_DIR_SUFFIX} are left out of the result,
     * and {@link #fileNameRecordCount} is recomputed from all directories, including the excluded ones.
     *
     * @param path root to scan
     * @return a mutable collection of input directories; possibly empty
     * @throws IOException if listing or record counting fails
     */
    protected Collection<Path> getGranularInputPaths(Path path) throws IOException {
        boolean renameSourceDirEnabled = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED, false);
        Collection<Path> inputDirs = new HashSet<>();
        Collection<Path> allDirs = new HashSet<>();

        for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.fs, path)) {
            Path parentDir = fileStatus.getPath().getParent();
            if (renameSourceDirEnabled) {
                if (!parentDir.toString().endsWith(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_SUFFIX)) {
                    inputDirs.add(parentDir);
                }
                allDirs.add(parentDir);
            } else {
                inputDirs.add(parentDir);
            }
        }

        if (renameSourceDirEnabled) {
            this.fileNameRecordCount = new InputRecordCountHelper(this.state).calculateRecordCount(allDirs);
            log.info("{} has total input record count (based on file name) {}", path, this.fileNameRecordCount);
        }
        return inputDirs;
    }

    /**
     * Pages through the job's task completion events until an empty batch is returned.
     *
     * <p>If fetching a batch fails, polling stops and the events gathered so far are returned. Retrying
     * is pointless here: the start index would be unchanged, so a persistent failure would loop forever.
     */
    private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job job) {
        List<TaskCompletionEvent> completionEvents = new ArrayList<>();
        while (true) {
            try {
                org.apache.hadoop.mapred.TaskCompletionEvent[] batch = job.getTaskCompletionEvents(completionEvents.size());
                if (batch == null || batch.length == 0) {
                    break;
                }
                completionEvents.addAll(Arrays.asList(batch));
            } catch (IOException e) {
                log.warn("Failed to fetch task completion events starting from index {}; using the events collected so far", completionEvents.size(), e);
                break;
            }
        }
        return completionEvents;
    }

    /** Returns the completion events whose status is anything other than {@code SUCCEEDED}. */
    private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job job) {
        return getAllTaskCompletionEvent(job).stream()
                .filter(event -> event.getStatus() != TaskCompletionEvent.Status.SUCCEEDED)
                .collect(Collectors.toList());
    }

    /**
     * Tells whether {@code path} lies under a {@code _temporary} location or contains, as a complete
     * path segment, the attempt id of one of the given task events.
     */
    private static boolean isFailedPath(Path path, List<TaskCompletionEvent> list) {
        String pathString = path.toString();
        if (pathString.contains("_temporary")) {
            return true;
        }
        return list.stream().anyMatch(event ->
                pathString.contains(TestCompactionTaskUtils.PATH_SEPARATOR + event.getTaskAttemptId().toString() + TestCompactionTaskUtils.PATH_SEPARATOR));
    }

    /**
     * Lists the applicable files under {@code path} and separates out those produced by unsuccessful
     * task attempts. Files identified as bad are deleted from {@code fileSystem} and logged.
     *
     * @param job        job whose task completion events identify unsuccessful attempts
     * @param path       directory to inspect
     * @param fileSystem file system holding {@code path}
     * @param list       passed through to {@code DatasetHelper.getApplicableFilePaths} to select files
     * @return the files that were not identified as bad
     * @throws IOException if listing or deleting fails
     */
    public static List<Path> getGoodFiles(Job job, Path path, FileSystem fileSystem, List<String> list) throws IOException {
        List<TaskCompletionEvent> failedEvents = getUnsuccessfulTaskCompletionEvent(job);
        List<Path> candidates = DatasetHelper.getApplicableFilePaths(fileSystem, path, list);
        List<Path> goodPaths = new ArrayList<>();
        for (Path candidate : candidates) {
            if (isFailedPath(candidate, failedEvents)) {
                fileSystem.delete(candidate, false);
                log.error("{} is a bad path so it was deleted", candidate);
            } else {
                goodPaths.add(candidate);
            }
        }
        return goodPaths;
    }

    /** @return the file system resolved at construction time */
    public FileSystem getFs() {
        return this.fs;
    }

    /** @return the job built by {@link #createJob}, or {@code null} if none has been created */
    public Job getConfiguredJob() {
        return this.configuredJob;
    }

    /** @return whether de-duplication is enabled for this compaction */
    public boolean isShouldDeduplicate() {
        return this.shouldDeduplicate;
    }

    /** @return the temporary MR output path, or {@code null} before paths are configured */
    public Path getMrOutputPath() {
        return this.mrOutputPath;
    }

    /** @return {@code true} once {@link #createJob} has completed */
    public boolean isJobCreated() {
        return this.isJobCreated;
    }

    /** @return the MR input directories (the live internal collection), or {@code null} before paths are configured */
    public Collection<Path> getMapReduceInputPaths() {
        return this.mapReduceInputPaths;
    }

    /** @return the qualified input path strings (the live internal collection), or {@code null} before paths are configured */
    public Collection<String> getOldFiles() {
        return this.oldFiles;
    }

    /** @return the collection last passed to {@link #setDstNewFiles(Collection)}, or {@code null} */
    public Collection<Path> getDstNewFiles() {
        return this.dstNewFiles;
    }

    /** Stores {@code collection} as-is (no defensive copy). */
    public void setDstNewFiles(Collection<Path> collection) {
        this.dstNewFiles = collection;
    }

    /** @return the file-name based input record count; 0 unless source-dir renaming is enabled */
    public long getFileNameRecordCount() {
        return this.fileNameRecordCount;
    }
}
