package co.cask.cdap.internal.app.runtime.batch.dataset;

import co.cask.cdap.app.metrics.MapReduceMetrics;
import co.cask.cdap.common.lang.ClassLoaders;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/dataset/MultipleOutputs.class */
public class MultipleOutputs implements Closeable {
    private static final String MULTIPLE_OUTPUTS = "hconf.mapreduce.multipleoutputs";
    private static final String MO_PREFIX = "hconf.mapreduce.multipleoutputs.namedOutput.";
    private static final String FORMAT = ".format";
    private static final String KEY = ".key";
    private static final String VALUE = ".value";
    private static final String CONF = ".conf";
    private final TaskInputOutputContext context;
    private final Set<String> namedOutputs;
    private final Map<String, TaskAttemptContext> taskContexts = new HashMap();
    private final Map<String, RecordWriter<?, ?>> recordWriters = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/dataset/MultipleOutputs$MeteredRecordWriter.class */
    public static class MeteredRecordWriter<K, V> extends RecordWriter<K, V> {
        private final RecordWriter<K, V> writer;
        private final String groupName = TaskCounter.class.getName();
        private final String counterName;
        private final TaskInputOutputContext context;

        public MeteredRecordWriter(RecordWriter<K, V> recordWriter, TaskInputOutputContext taskInputOutputContext) {
            this.writer = recordWriter;
            this.context = taskInputOutputContext;
            this.counterName = getCounterName(taskInputOutputContext);
        }

        public void write(K k, V v) throws IOException, InterruptedException {
            this.context.getCounter(this.groupName, this.counterName).increment(1L);
            this.writer.write(k, v);
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.writer.close(taskAttemptContext);
        }

        private String getCounterName(TaskInputOutputContext taskInputOutputContext) {
            MapReduceMetrics.TaskType from = MapReduceMetrics.TaskType.from(taskInputOutputContext.getTaskAttemptID().getTaskType());
            switch (from) {
                case Mapper:
                    return TaskCounter.MAP_OUTPUT_RECORDS.name();
                case Reducer:
                    return TaskCounter.REDUCE_OUTPUT_RECORDS.name();
                default:
                    throw new IllegalArgumentException("Illegal task type: " + from);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/dataset/MultipleOutputs$WrappedStatusReporter.class */
    public static class WrappedStatusReporter extends StatusReporter {
        TaskAttemptContext context;

        public WrappedStatusReporter(TaskAttemptContext taskAttemptContext) {
            this.context = taskAttemptContext;
        }

        public Counter getCounter(Enum<?> r4) {
            return this.context.getCounter(r4);
        }

        public Counter getCounter(String str, String str2) {
            return this.context.getCounter(str, str2);
        }

        public void progress() {
            this.context.progress();
        }

        public float getProgress() {
            return this.context.getProgress();
        }

        public void setStatus(String str) {
            this.context.setStatus(str);
        }
    }

    private static void checkNamedOutputName(String str, Collection<String> collection, boolean z) {
        if (!z && collection.contains(str)) {
            throw new IllegalArgumentException("Named output '" + str + "' already defined");
        }
        if (z && !collection.contains(str)) {
            throw new IllegalArgumentException("Named output '" + str + "' not defined");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static List<String> getNamedOutputsList(JobContext jobContext) {
        return Lists.newArrayList(Splitter.on(" ").omitEmptyStrings().split(jobContext.getConfiguration().get(MULTIPLE_OUTPUTS, "")));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Class<? extends OutputFormat> getNamedOutputFormatClass(JobContext jobContext, String str) {
        return jobContext.getConfiguration().getClass(MO_PREFIX + str + FORMAT, (Class) null, OutputFormat.class);
    }

    private static Class<?> getNamedOutputKeyClass(JobContext jobContext, String str) {
        return jobContext.getConfiguration().getClass(MO_PREFIX + str + KEY, (Class) null, Object.class);
    }

    private static Class<?> getNamedOutputValueClass(JobContext jobContext, String str) {
        return jobContext.getConfiguration().getClass(MO_PREFIX + str + VALUE, (Class) null, Object.class);
    }

    static Map<String, String> getNamedConfigurations(JobContext jobContext, String str) {
        HashMap hashMap = new HashMap();
        String str2 = MO_PREFIX + str + CONF;
        for (Map.Entry entry : jobContext.getConfiguration().getValByRegex(str2 + ".*").entrySet()) {
            hashMap.put(((String) entry.getKey()).substring(str2.length()), entry.getValue());
        }
        return hashMap;
    }

    @VisibleForTesting
    static void setNamedConfigurations(Job job, String str, Map<String, String> map) {
        String str2 = MO_PREFIX + str + CONF;
        for (Map.Entry<String, String> entry : map.entrySet()) {
            job.getConfiguration().set(str2 + entry.getKey(), entry.getValue());
        }
    }

    public static void addNamedOutput(Job job, String str, String str2, Class<?> cls, Class<?> cls2, Map<String, String> map) {
        checkNamedOutputName(str, getNamedOutputsList(job), false);
        Configuration configuration = job.getConfiguration();
        configuration.set(MULTIPLE_OUTPUTS, configuration.get(MULTIPLE_OUTPUTS, "") + " " + str);
        configuration.set(MO_PREFIX + str + FORMAT, str2);
        configuration.setClass(MO_PREFIX + str + KEY, cls, Object.class);
        configuration.setClass(MO_PREFIX + str + VALUE, cls2, Object.class);
        setNamedConfigurations(job, str, map);
    }

    public MultipleOutputs(TaskInputOutputContext taskInputOutputContext) {
        this.context = taskInputOutputContext;
        this.namedOutputs = Collections.unmodifiableSet(new HashSet(getNamedOutputsList(taskInputOutputContext)));
    }

    public <K, V> void write(String str, K k, V v) throws IOException, InterruptedException {
        checkNamedOutputName(str, this.namedOutputs, true);
        getRecordWriter(str).write(k, v);
    }

    private synchronized RecordWriter getRecordWriter(String str) throws IOException, InterruptedException {
        RecordWriter<?, ?> recordWriter = this.recordWriters.get(str);
        if (recordWriter == null) {
            TaskAttemptContext context = getContext(str);
            try {
                Class outputFormatClass = context.getOutputFormatClass();
                ClassLoader contextClassLoader = ClassLoaders.setContextClassLoader(outputFormatClass.getClassLoader());
                try {
                    recordWriter = new MeteredRecordWriter(((OutputFormat) ReflectionUtils.newInstance(outputFormatClass, context.getConfiguration())).getRecordWriter(context), this.context);
                    ClassLoaders.setContextClassLoader(contextClassLoader);
                    this.recordWriters.put(str, recordWriter);
                } catch (Throwable th) {
                    ClassLoaders.setContextClassLoader(contextClassLoader);
                    throw th;
                }
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
        return recordWriter;
    }

    private synchronized TaskAttemptContext getContext(String str) throws IOException {
        TaskAttemptContext taskAttemptContext = this.taskContexts.get(str);
        if (taskAttemptContext != null) {
            return taskAttemptContext;
        }
        TaskAttemptContext namedTaskContext = getNamedTaskContext(this.context, str);
        this.taskContexts.put(str, namedTaskContext);
        return namedTaskContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TaskAttemptContext getNamedTaskContext(TaskAttemptContext taskAttemptContext, String str) throws IOException {
        return new TaskAttemptContextImpl(getNamedJob(taskAttemptContext, str).getConfiguration(), taskAttemptContext.getTaskAttemptID(), new WrappedStatusReporter(taskAttemptContext));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static JobContext getNamedJobContext(JobContext jobContext, String str) throws IOException {
        Job namedJob = getNamedJob(jobContext, str);
        return new JobContextImpl(namedJob.getConfiguration(), namedJob.getJobID());
    }

    private static Job getNamedJob(JobContext jobContext, String str) throws IOException {
        Job job = Job.getInstance(jobContext.getConfiguration());
        job.setOutputFormatClass(getNamedOutputFormatClass(jobContext, str));
        job.setOutputKeyClass(getNamedOutputKeyClass(jobContext, str));
        job.setOutputValueClass(getNamedOutputValueClass(jobContext, str));
        Configuration configuration = job.getConfiguration();
        for (Map.Entry<String, String> entry : getNamedConfigurations(jobContext, str).entrySet()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
        return job;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        closeRecordWriters(this.recordWriters.values(), this.context);
    }

    public static void closeRecordWriters(Iterable<RecordWriter<?, ?>> iterable, TaskAttemptContext taskAttemptContext) {
        RuntimeException runtimeException = null;
        Iterator<RecordWriter<?, ?>> it = iterable.iterator();
        while (it.hasNext()) {
            try {
                it.next().close(taskAttemptContext);
            } catch (IOException | InterruptedException e) {
                if (runtimeException == null) {
                    runtimeException = new RuntimeException(e);
                } else {
                    runtimeException.addSuppressed(e);
                }
            }
        }
        if (runtimeException != null) {
            throw runtimeException;
        }
    }
}
