package org.apache.crunch.io;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;

/* loaded from: input_file:lib/crunch-core-0.12.0-hadoop2.jar:org/apache/crunch/io/CrunchOutputs.class */
public class CrunchOutputs<K, V> {
    public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
    public static final String CRUNCH_DISABLE_OUTPUT_COUNTERS = "crunch.disable.output.counters";
    private static final char RECORD_SEP = ',';
    private static final char FIELD_SEP = ';';
    private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
    private TaskInputOutputContext<?, ?, K, V> baseContext;
    private Configuration baseConf;
    private final Map<String, OutputConfig> namedOutputs;
    private final Map<String, OutputState<K, V>> outputStates;
    private final boolean disableOutputCounters;
    private static final Joiner JOINER = Joiner.on(';');
    private static final Splitter SPLITTER = Splitter.on(';');
    private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();

    /* loaded from: input_file:lib/crunch-core-0.12.0-hadoop2.jar:org/apache/crunch/io/CrunchOutputs$CompositeOutputCommitter.class */
    private static class CompositeOutputCommitter extends OutputCommitter {
        private final Map<String, OutputConfig> outputs;
        private final Map<String, OutputCommitter> committers;

        public CompositeOutputCommitter(Map<String, OutputConfig> map, Map<String, OutputCommitter> map2) {
            this.outputs = map;
            this.committers = map2;
        }

        private TaskAttemptContext getContext(String str, TaskAttemptContext taskAttemptContext) throws IOException {
            Job job = CrunchOutputs.getJob(taskAttemptContext.getJobID(), str, taskAttemptContext.getConfiguration());
            CrunchOutputs.configureJob(str, job, this.outputs.get(str));
            return CrunchOutputs.getTaskContext(taskAttemptContext, job);
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public void setupJob(JobContext jobContext) throws IOException {
            Configuration configuration = jobContext.getConfiguration();
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                Job job = CrunchOutputs.getJob(jobContext.getJobID(), entry.getKey(), configuration);
                CrunchOutputs.configureJob(entry.getKey(), job, this.outputs.get(entry.getKey()));
                entry.getValue().setupJob(job);
            }
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                entry.getValue().setupTask(getContext(entry.getKey(), taskAttemptContext));
            }
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                if (entry.getValue().needsTaskCommit(getContext(entry.getKey(), taskAttemptContext))) {
                    return true;
                }
            }
            return false;
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                entry.getValue().commitTask(getContext(entry.getKey(), taskAttemptContext));
            }
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                entry.getValue().abortTask(getContext(entry.getKey(), taskAttemptContext));
            }
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public void commitJob(JobContext jobContext) throws IOException {
            Configuration configuration = jobContext.getConfiguration();
            HashSet newHashSet = Sets.newHashSet();
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                OutputCommitter value = entry.getValue();
                Job job = CrunchOutputs.getJob(jobContext.getJobID(), entry.getKey(), configuration);
                CrunchOutputs.configureJob(entry.getKey(), job, this.outputs.get(entry.getKey()));
                if (value instanceof FileOutputCommitter) {
                    Path parent = ((FileOutputCommitter) value).getWorkPath().getParent();
                    if (!newHashSet.contains(parent)) {
                        newHashSet.add(parent);
                    }
                }
                value.commitJob(job);
            }
        }

        @Override // org.apache.hadoop.mapreduce.OutputCommitter
        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
            Configuration configuration = jobContext.getConfiguration();
            for (Map.Entry<String, OutputCommitter> entry : this.committers.entrySet()) {
                Job job = CrunchOutputs.getJob(jobContext.getJobID(), entry.getKey(), configuration);
                CrunchOutputs.configureJob(entry.getKey(), job, this.outputs.get(entry.getKey()));
                entry.getValue().abortJob(job, state);
            }
        }
    }

    /* loaded from: input_file:lib/crunch-core-0.12.0-hadoop2.jar:org/apache/crunch/io/CrunchOutputs$OutputConfig.class */
    public static class OutputConfig<K, V> {
        public FormatBundle<OutputFormat<K, V>> bundle;
        public Class<K> keyClass;
        public Class<V> valueClass;

        public OutputConfig(FormatBundle<OutputFormat<K, V>> formatBundle, Class<K> cls, Class<V> cls2) {
            this.bundle = formatBundle;
            this.keyClass = cls;
            this.valueClass = cls2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/crunch-core-0.12.0-hadoop2.jar:org/apache/crunch/io/CrunchOutputs$OutputState.class */
    public static class OutputState<K, V> {
        private final TaskAttemptContext context;
        private final RecordWriter<K, V> recordWriter;

        public OutputState(TaskAttemptContext taskAttemptContext, RecordWriter<K, V> recordWriter) {
            this.context = taskAttemptContext;
            this.recordWriter = recordWriter;
        }

        public void write(K k, V v) throws IOException, InterruptedException {
            this.recordWriter.write(k, v);
        }

        public void close() throws IOException, InterruptedException {
            this.recordWriter.close(this.context);
        }
    }

    public static void addNamedOutput(Job job, String str, Class<? extends OutputFormat> cls, Class cls2, Class cls3) {
        addNamedOutput(job, str, (FormatBundle<? extends OutputFormat>) FormatBundle.forOutput(cls), cls2, cls3);
    }

    public static void addNamedOutput(Job job, String str, FormatBundle<? extends OutputFormat> formatBundle, Class cls, Class cls2) {
        Configuration configuration = job.getConfiguration();
        String join = JOINER.join(str, formatBundle.serialize(), cls.getName(), cls2.getName());
        String str2 = configuration.get(CRUNCH_OUTPUTS);
        configuration.set(CRUNCH_OUTPUTS, str2 == null ? join : str2 + ',' + join);
    }

    public static void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
        for (Map.Entry<String, OutputConfig> entry : getNamedOutputs(jobContext.getConfiguration()).entrySet()) {
            String key = entry.getKey();
            Job job = getJob(jobContext.getJobID(), entry.getKey(), jobContext.getConfiguration());
            getOutputFormat(key, job, entry.getValue()).checkOutputSpecs(job);
        }
    }

    public static OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Map<String, OutputConfig> namedOutputs = getNamedOutputs(taskAttemptContext.getConfiguration());
        HashMap newHashMap = Maps.newHashMap();
        for (Map.Entry<String, OutputConfig> entry : namedOutputs.entrySet()) {
            String key = entry.getKey();
            Job job = getJob(taskAttemptContext.getJobID(), entry.getKey(), taskAttemptContext.getConfiguration());
            newHashMap.put(key, getOutputFormat(key, job, entry.getValue()).getOutputCommitter(getTaskContext(taskAttemptContext, job)));
        }
        return new CompositeOutputCommitter(namedOutputs, newHashMap);
    }

    private static Map<String, OutputConfig> getNamedOutputs(TaskInputOutputContext<?, ?, ?, ?> taskInputOutputContext) {
        return getNamedOutputs(taskInputOutputContext.getConfiguration());
    }

    public static Map<String, OutputConfig> getNamedOutputs(Configuration configuration) {
        HashMap newHashMap = Maps.newHashMap();
        String str = configuration.get(CRUNCH_OUTPUTS);
        if (str == null || str.isEmpty()) {
            return newHashMap;
        }
        Iterator<String> it = Splitter.on(',').split(configuration.get(CRUNCH_OUTPUTS)).iterator();
        while (it.hasNext()) {
            ArrayList newArrayList = Lists.newArrayList(SPLITTER.split(it.next()));
            try {
                newHashMap.put((String) newArrayList.get(0), new OutputConfig(FormatBundle.fromSerialized((String) newArrayList.get(1), configuration), Class.forName((String) newArrayList.get(2)), Class.forName((String) newArrayList.get(3))));
            } catch (ClassNotFoundException e) {
                throw new CrunchRuntimeException(e);
            }
        }
        return newHashMap;
    }

    public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> taskInputOutputContext) {
        this(taskInputOutputContext.getConfiguration());
        this.baseContext = taskInputOutputContext;
    }

    public CrunchOutputs(Configuration configuration) {
        this.baseConf = configuration;
        this.namedOutputs = getNamedOutputs(configuration);
        this.outputStates = Maps.newHashMap();
        this.disableOutputCounters = configuration.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
    }

    public void write(String str, K k, V v) throws IOException, InterruptedException {
        if (!this.namedOutputs.containsKey(str)) {
            throw new IllegalArgumentException("Undefined named output '" + str + "'");
        }
        if (!this.disableOutputCounters) {
            this.baseContext.getCounter(COUNTERS_GROUP, str).increment(1L);
        }
        getOutputState(str).write(k, v);
    }

    public void close() throws IOException, InterruptedException {
        Iterator<OutputState<K, V>> it = this.outputStates.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private OutputState<K, V> getOutputState(String str) throws IOException, InterruptedException {
        OutputState<K, V> outputState = this.outputStates.get(str);
        if (outputState != null) {
            return outputState;
        }
        Job job = getJob(getJob(this.baseContext.getJobID(), str, this.baseConf).getJobID(), str, this.baseConf);
        OutputFormat outputFormat = getOutputFormat(str, job, this.namedOutputs.get(str));
        TaskAttemptContext taskAttemptContext = null;
        RecordWriter<K, V> recordWriter = null;
        if (this.baseContext != null) {
            taskAttemptContext = getTaskContext(this.baseContext, job);
            recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
        }
        OutputState<K, V> outputState2 = new OutputState<>(taskAttemptContext, recordWriter);
        this.outputStates.put(str, outputState2);
        return outputState2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Job getJob(JobID jobID, String str, Configuration configuration) throws IOException {
        Job job = new Job(new Configuration(configuration));
        job.getConfiguration().set("crunch.namedoutput", str);
        setJobID(job, jobID, str);
        return job;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static TaskAttemptContext getTaskContext(TaskAttemptContext taskAttemptContext, Job job) {
        TaskAttemptID taskAttemptID = taskAttemptContext.getTaskAttemptID();
        return TaskAttemptContextFactory.create(job.getConfiguration(), new TaskAttemptID(job.getJobID().getJtIdentifier(), job.getJobID().getId(), taskAttemptID.isMap(), taskAttemptID.getTaskID().getId(), taskAttemptID.getId()));
    }

    private static void setJobID(Job job, JobID jobID, String str) {
        Method declaredMethod;
        JobID jobID2 = jobID;
        try {
            declaredMethod = Job.class.getMethod("setJobID", JobID.class);
            jobID2 = (jobID == null || jobID.getJtIdentifier().contains(str)) ? jobID : new JobID(jobID.getJtIdentifier() + "_" + str, jobID.getId());
        } catch (NoSuchMethodException e) {
            try {
                declaredMethod = JobContext.class.getDeclaredMethod("setJobID", JobID.class);
                declaredMethod.setAccessible(true);
            } catch (NoSuchMethodException e2) {
                throw new CrunchRuntimeException(e);
            }
        }
        try {
            declaredMethod.invoke(job, jobID2);
        } catch (Exception e3) {
            throw new CrunchRuntimeException("Could not set job ID to " + jobID, e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void configureJob(String str, Job job, OutputConfig outputConfig) throws IOException {
        job.getConfiguration().set(BASE_OUTPUT_NAME, str);
        job.setOutputFormatClass(outputConfig.bundle.getFormatClass());
        job.setOutputKeyClass(outputConfig.keyClass);
        job.setOutputValueClass(outputConfig.valueClass);
        outputConfig.bundle.configure(job.getConfiguration());
    }

    private static OutputFormat getOutputFormat(String str, Job job, OutputConfig outputConfig) throws IOException {
        configureJob(str, job, outputConfig);
        try {
            return (OutputFormat) ReflectionUtils.newInstance(job.getOutputFormatClass(), job.getConfiguration());
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }
}
