package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.TaskLocalizationContext;
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.api.data.batch.BatchReadable;
import co.cask.cdap.api.data.batch.BatchWritable;
import co.cask.cdap.api.data.batch.Split;
import co.cask.cdap.api.data.batch.SplitReader;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.api.mapreduce.MapReduceTaskContext;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.plugin.Plugin;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.metrics.MapReduceMetrics;
import co.cask.cdap.app.metrics.ProgramUserMetrics;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.internal.app.runtime.AbstractContext;
import co.cask.cdap.internal.app.runtime.DefaultTaskLocalizationContext;
import co.cask.cdap.internal.app.runtime.batch.dataset.CloseableBatchWritable;
import co.cask.cdap.internal.app.runtime.batch.dataset.ForwardingSplitReader;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputs;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramInfo;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/BasicMapReduceTaskContext.class */
/**
 * {@link MapReduceTaskContext} implementation used inside a MapReduce task.
 *
 * <p>Every {@link TransactionAware} dataset obtained through this context is enrolled in the single
 * {@link Transaction} supplied at construction time ({@code startTx} is called once per distinct
 * instance) and has {@code commitTx} called on it by {@link #flushOperations()}.
 *
 * <p>The Hadoop context and the {@link MultipleOutputs} helper are not available until
 * {@link #setHadoopContext(TaskInputOutputContext)} has been called; both {@code write} methods
 * throw an {@link IOException} before that.
 *
 * @param <KEYOUT> output key type of the task
 * @param <VALUEOUT> output value type of the task
 */
public class BasicMapReduceTaskContext<KEYOUT, VALUEOUT> extends AbstractContext implements MapReduceTaskContext<KEYOUT, VALUEOUT> {
    private final MapReduceSpecification spec;
    /** Workflow information; {@code null} is tolerated (see {@link #getWorkflowToken()}). */
    private final WorkflowProgramInfo workflowProgramInfo;
    private final Metrics userMetrics;
    /** Mutable copy of the plugins declared in the application specification. */
    private final Map<String, Plugin> plugins;
    /** The transaction every tracked {@link TransactionAware} is started with. */
    private final Transaction transaction;
    private final TaskLocalizationContext taskLocalizationContext;
    /** Created together with {@link #context} in {@link #setHadoopContext}; {@code null} before. */
    private MultipleOutputs multipleOutputs;
    private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
    private String inputName;
    /** Identity-based set, so each dataset instance is started in the transaction exactly once. */
    private final Set<TransactionAware> txAwares;

    /**
     * Creates the task context and starts the transaction on all static transaction-aware
     * datasets of the dataset cache.
     *
     * @param program the program this task belongs to
     * @param taskType task type used to tag metrics; when {@code null} no task tags are added
     * @param runId run id of the program
     * @param str value emitted under the {@code "ins"} metrics tag when {@code taskType} is set
     * @param arguments runtime arguments
     * @param mapReduceSpecification specification returned by {@link #getSpecification()}
     * @param workflowProgramInfo workflow information, or {@code null}
     * @param discoveryServiceClient passed through to {@link AbstractContext}
     * @param metricsCollectionService used to create the metrics context
     * @param transactionSystemClient passed through to {@link AbstractContext}
     * @param transaction transaction started on every transaction-aware dataset
     * @param datasetFramework passed through to {@link AbstractContext}
     * @param pluginInstantiator passed through to {@link AbstractContext}; may be {@code null}
     * @param map backing map for the {@link DefaultTaskLocalizationContext}
     */
    public BasicMapReduceTaskContext(Program program, @Nullable MapReduceMetrics.TaskType taskType, RunId runId, String str, Arguments arguments, MapReduceSpecification mapReduceSpecification, @Nullable WorkflowProgramInfo workflowProgramInfo, DiscoveryServiceClient discoveryServiceClient, MetricsCollectionService metricsCollectionService, TransactionSystemClient transactionSystemClient, Transaction transaction, DatasetFramework datasetFramework, @Nullable PluginInstantiator pluginInstantiator, Map<String, File> map) {
        super(program, runId, arguments, ImmutableSet.of(), createMetricsContext(program, runId.getId(), metricsCollectionService, str, taskType, workflowProgramInfo), datasetFramework, transactionSystemClient, discoveryServiceClient, false, pluginInstantiator);
        this.txAwares = Sets.newIdentityHashSet();
        this.workflowProgramInfo = workflowProgramInfo;
        this.transaction = transaction;
        this.userMetrics = new ProgramUserMetrics(getProgramMetrics());
        this.spec = mapReduceSpecification;
        this.plugins = Maps.newHashMap(program.getApplicationSpecification().getPlugins());
        this.taskLocalizationContext = new DefaultTaskLocalizationContext(map);
        // Must run last: it needs both txAwares and transaction to be assigned.
        initializeTransactionAwares();
    }

    /**
     * Returns the job name from the specification followed by the parent's string form.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public String toString() {
        // NOTE(review): the second field has an empty label (",=%s"). Left untouched because the
        // string is runtime output; confirm whether a label was intended.
        return String.format("job=%s,=%s", this.spec.getName(), super.toString());
    }

    /**
     * Returns the plugin map copied from the application specification at construction time.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public Map<String, Plugin> getPlugins() {
        return this.plugins;
    }

    /**
     * Writes a key/value pair through {@link MultipleOutputs}.
     *
     * @param str first argument forwarded to {@code MultipleOutputs.write}
     *            (presumably the named output — confirm against MultipleOutputs)
     * @param k key to write
     * @param v value to write
     * @throws IOException if {@link #setHadoopContext} has not been called yet
     */
    public <K, V> void write(String str, K k, V v) throws IOException, InterruptedException {
        if (this.multipleOutputs == null) {
            throw new IOException("MultipleOutputs has not been initialized.");
        }
        this.multipleOutputs.write(str, k, v);
    }

    /**
     * Writes a key/value pair directly to the Hadoop context.
     *
     * @throws IOException if {@link #setHadoopContext} has not been called yet
     */
    public void write(KEYOUT keyout, VALUEOUT valueout) throws IOException, InterruptedException {
        // Guard the field that is actually dereferenced below.
        if (this.context == null) {
            throw new IOException("Hadoop context has not been initialized.");
        }
        this.context.write(keyout, valueout);
    }

    /**
     * Returns the Hadoop context set via {@link #setHadoopContext}, cast to the caller's type.
     * The cast is unchecked; a wrong {@code T} surfaces as a {@link ClassCastException} at the
     * call site. Returns {@code null} if no context has been set.
     */
    @SuppressWarnings("unchecked")
    public <T> T getHadoopContext() {
        return (T) this.context;
    }

    /**
     * Stores the Hadoop context and creates the {@link MultipleOutputs} wrapper around it.
     */
    public void setHadoopContext(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> taskInputOutputContext) {
        this.multipleOutputs = new MultipleOutputs(taskInputOutputContext);
        this.context = taskInputOutputContext;
    }

    /**
     * Sets the value returned by {@link #getInputName()}.
     */
    public void setInputName(String str) {
        this.inputName = str;
    }

    /**
     * Closes the {@link MultipleOutputs} if one has been created; otherwise does nothing.
     */
    public void closeMultiOutputs() {
        if (this.multipleOutputs != null) {
            this.multipleOutputs.close();
        }
    }

    /**
     * Returns the specification passed to the constructor.
     */
    public MapReduceSpecification getSpecification() {
        return this.spec;
    }

    /**
     * Returns the workflow token of the workflow information, or {@code null} when this context
     * was created without workflow information.
     */
    @Nullable
    public WorkflowToken getWorkflowToken() {
        if (this.workflowProgramInfo == null) {
            return null;
        }
        return this.workflowProgramInfo.getWorkflowToken();
    }

    /**
     * Returns the workflow information passed to the constructor, possibly {@code null}.
     */
    @Nullable
    /* renamed from: getWorkflowInfo, reason: merged with bridge method [inline-methods] */
    public WorkflowProgramInfo m82getWorkflowInfo() {
        return this.workflowProgramInfo;
    }

    /**
     * Returns the input name set via {@link #setInputName}, or {@code null} if none was set.
     */
    @Nullable
    public String getInputName() {
        return this.inputName;
    }

    /**
     * Builds the metrics context for this task: the program-level tags plus, when available,
     * task tags ({@code "mrt"}, {@code "ins"}) and workflow tags ({@code "wf"}, {@code "wfr"},
     * {@code "nd"}).
     *
     * @param program program used to derive the base tags
     * @param str run id string used to derive the base tags
     * @param metricsCollectionService service that creates the context from the tag map
     * @param str2 value of the {@code "ins"} tag; only used when {@code taskType} is non-null
     * @param taskType task type, or {@code null} to omit the task tags
     * @param workflowProgramInfo workflow information, or {@code null} to omit the workflow tags
     */
    private static MetricsContext createMetricsContext(Program program, String str, MetricsCollectionService metricsCollectionService, String str2, @Nullable MapReduceMetrics.TaskType taskType, @Nullable WorkflowProgramInfo workflowProgramInfo) {
        Map<String, String> tags = Maps.newHashMap();
        tags.putAll(getMetricsContext(program, str));
        if (taskType != null) {
            tags.put("mrt", taskType.getId());
            tags.put("ins", str2);
        }
        if (workflowProgramInfo != null) {
            tags.put("wf", workflowProgramInfo.getName());
            tags.put("wfr", workflowProgramInfo.getRunId().getId());
            tags.put("nd", workflowProgramInfo.getNodeId());
        }
        return metricsCollectionService.getContext(tags);
    }

    /**
     * Returns the user metrics wrapper created around the program metrics.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public Metrics getMetrics() {
        return this.userMetrics;
    }

    /**
     * Tracks every static transaction-aware of the dataset cache and starts the transaction on it.
     */
    private void initializeTransactionAwares() {
        for (TransactionAware transactionAware : getDatasetCache().getStaticTransactionAwares()) {
            this.txAwares.add(transactionAware);
            transactionAware.startTx(this.transaction);
        }
    }

    /**
     * Same as {@link #getDataset(String, Map, AccessType)} with {@link AccessType#UNKNOWN}.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public <T extends Dataset> T getDataset(String str, Map<String, String> map) throws DatasetInstantiationException {
        return getDataset(str, map, AccessType.UNKNOWN);
    }

    /**
     * Obtains the dataset from the parent context and, if it is a {@link TransactionAware} not
     * seen before, starts this task's transaction on it and tracks it for
     * {@link #flushOperations()}.
     *
     * @param str dataset name
     * @param map dataset arguments
     * @param accessType access type forwarded to the parent context
     * @return the dataset instance returned by the parent context
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public <T extends Dataset> T getDataset(String str, Map<String, String> map, AccessType accessType) throws DatasetInstantiationException {
        T dataset = super.getDataset(str, map, accessType);
        if (dataset instanceof TransactionAware) {
            TransactionAware transactionAware = (TransactionAware) dataset;
            // add() returns false for an already-tracked instance, so startTx runs once per instance.
            if (this.txAwares.add(transactionAware)) {
                transactionAware.startTx(this.transaction);
            }
        }
        return dataset;
    }

    /**
     * Intentionally a no-op in this context.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public void releaseDataset(Dataset dataset) {
    }

    /**
     * Intentionally a no-op in this context.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public void discardDataset(Dataset dataset) {
    }

    /**
     * Calls {@code commitTx()} on every tracked {@link TransactionAware}. The first failure
     * aborts the loop and propagates to the caller.
     *
     * @throws Exception whatever {@code commitTx()} throws
     */
    public void flushOperations() throws Exception {
        for (TransactionAware transactionAware : this.txAwares) {
            transactionAware.commitTx();
        }
    }

    /**
     * Delegates to the task localization context.
     *
     * @throws FileNotFoundException as thrown by the task localization context
     */
    public File getLocalFile(String str) throws FileNotFoundException {
        return this.taskLocalizationContext.getLocalFile(str);
    }

    /**
     * Delegates to the task localization context.
     */
    public Map<String, File> getAllLocalFiles() {
        return this.taskLocalizationContext.getAllLocalFiles();
    }

    /**
     * Returns a {@link BatchReadable} view of the named dataset that calls
     * {@link #flushOperations()} after computing splits and after each split reader is closed,
     * whether or not the delegated call succeeded.
     *
     * @param str dataset name
     * @param map dataset arguments
     * @throws IllegalArgumentException if the dataset is not a {@link BatchReadable}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public <K, V> BatchReadable<K, V> getBatchReadable(String str, Map<String, String> map) {
        // Fetch as the base type so the type check below reports the problem, rather than an
        // implicit cast failing first with a ClassCastException.
        Dataset dataset = getDataset(str, map, AccessType.READ);
        Preconditions.checkArgument(dataset instanceof BatchReadable, "Dataset '%s' is not a BatchReadable.", str);
        @SuppressWarnings("unchecked")
        final BatchReadable<K, V> batchReadable = (BatchReadable<K, V>) dataset;
        return new BatchReadable<K, V>() { // from class: co.cask.cdap.internal.app.runtime.batch.BasicMapReduceTaskContext.1
            @Override
            public List<Split> getSplits() {
                try {
                    try {
                        return batchReadable.getSplits();
                    } finally {
                        // Runs exactly once, on both the success and the failure path.
                        BasicMapReduceTaskContext.this.flushOperations();
                    }
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }

            @Override
            public SplitReader<K, V> createSplitReader(Split split) {
                return new ForwardingSplitReader<K, V>(batchReadable.createSplitReader(split)) { // from class: co.cask.cdap.internal.app.runtime.batch.BasicMapReduceTaskContext.1.1
                    @Override // co.cask.cdap.internal.app.runtime.batch.dataset.ForwardingSplitReader
                    public void close() {
                        try {
                            try {
                                super.close();
                            } finally {
                                // Runs exactly once, even when closing the reader failed.
                                BasicMapReduceTaskContext.this.flushOperations();
                            }
                        } catch (Exception e) {
                            throw Throwables.propagate(e);
                        }
                    }
                };
            }
        };
    }

    /**
     * Returns a {@link CloseableBatchWritable} view of the named dataset whose {@code close()}
     * calls {@link #flushOperations()}.
     *
     * @param str dataset name
     * @param map dataset arguments
     * @throws IllegalArgumentException if the dataset is not a {@link BatchWritable}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public <K, V> CloseableBatchWritable<K, V> getBatchWritable(String str, Map<String, String> map) {
        // Fetch as the base type so the type check below reports the problem, rather than an
        // implicit cast failing first with a ClassCastException.
        Dataset dataset = getDataset(str, map, AccessType.WRITE);
        Preconditions.checkArgument(dataset instanceof BatchWritable, "Dataset '%s' is not a BatchWritable.", str);
        @SuppressWarnings("unchecked")
        final BatchWritable<K, V> batchWritable = (BatchWritable<K, V>) dataset;
        return new CloseableBatchWritable<K, V>() { // from class: co.cask.cdap.internal.app.runtime.batch.BasicMapReduceTaskContext.2
            public void write(K k, V v) {
                batchWritable.write(k, v);
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                try {
                    BasicMapReduceTaskContext.this.flushOperations();
                } catch (Exception e) {
                    // Rethrow IOExceptions unchanged; wrap anything else to fit the signature.
                    Throwables.propagateIfInstanceOf(e, IOException.class);
                    throw new IOException(e);
                }
            }
        };
    }
}
