package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.data.batch.DatasetOutputCommitter;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.data2.transaction.RetryingLongTransactionSystemClient;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputsCommitter;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.Outputs;
import co.cask.cdap.internal.app.runtime.batch.dataset.output.ProvidedOutput;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramId;
import com.google.common.base.Throwables;
import com.google.inject.Injector;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MainOutputCommitter.class */
public class MainOutputCommitter extends MultipleOutputsCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(MainOutputCommitter.class);
    private final TaskAttemptContext taskAttemptContext;
    private CConfiguration cConf;
    private TransactionSystemClient txClient;
    private Transaction transaction;
    private BasicMapReduceTaskContext taskContext;
    private List<ProvidedOutput> outputs;
    private boolean completedCallingOnFailure;

    public MainOutputCommitter(OutputCommitter outputCommitter, Map<String, OutputCommitter> map, TaskAttemptContext taskAttemptContext) {
        super(outputCommitter, map);
        this.taskAttemptContext = taskAttemptContext;
    }

    @Override // co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputsCommitter
    public void setupJob(JobContext jobContext) throws IOException {
        Configuration configuration = jobContext.getConfiguration();
        MapReduceTaskContextProvider taskContextProvider = MapReduceClassLoader.getFromConfiguration(configuration).getTaskContextProvider();
        Injector injector = taskContextProvider.getInjector();
        this.cConf = (CConfiguration) injector.getInstance(CConfiguration.class);
        MapReduceContextConfig mapReduceContextConfig = new MapReduceContextConfig(jobContext.getConfiguration());
        ProgramId programId = mapReduceContextConfig.getProgramId();
        LOG.info("Setting up for MapReduce job: namespaceId={}, applicationId={}, program={}, runid={}", new Object[]{programId.getNamespace(), programId.getApplication(), programId.getProgram(), ProgramRunners.getRunId(mapReduceContextConfig.getProgramOptions())});
        this.txClient = new RetryingLongTransactionSystemClient((TransactionSystemClient) injector.getInstance(TransactionSystemClient.class), SystemArguments.getRetryStrategy(mapReduceContextConfig.getProgramOptions().getUserArguments().asMap(), mapReduceContextConfig.getProgramId().getType(), this.cConf));
        this.transaction = this.txClient.startLong();
        Path txFile = getTxFile(configuration, jobContext.getJobID());
        FSDataOutputStream create = txFile.getFileSystem(configuration).create(txFile, false);
        Throwable th = null;
        try {
            try {
                create.write(new TransactionCodec().encode(this.transaction));
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                this.taskContext = taskContextProvider.get(this.taskAttemptContext);
                this.outputs = Outputs.transform(mapReduceContextConfig.getOutputs(), this.taskContext);
                super.setupJob(jobContext);
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Path getTxFile(Configuration configuration, @Nullable JobID jobID) throws IOException {
        Path stagingAreaDir = MRApps.getStagingAreaDir(configuration, UserGroupInformation.getCurrentUser().getShortUserName());
        int i = configuration.getInt("mapreduce.job.application.attempt.id", 0);
        String jobID2 = jobID != null ? jobID.toString() : configuration.get("mapreduce.job.id");
        return new Path(stagingAreaDir, jobID2 + "/" + jobID2 + "_" + i + "_tx-file");
    }

    @Override // co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputsCommitter
    public void commitJob(JobContext jobContext) throws IOException {
        super.commitJob(jobContext);
        onFinish(jobContext, true);
        commitTx();
    }

    private void commitTx() throws IOException {
        try {
            LOG.debug("Committing MapReduce Job transaction: {}", Long.valueOf(this.transaction.getWritePointer()));
            this.taskContext.getMessagingService().publish(StoreRequestBuilder.of(NamespaceId.SYSTEM.topic(this.cConf.get("data.event.topic"))).setTransaction(Long.valueOf(this.transaction.getWritePointer())).build());
            this.taskContext.flushOperations();
            try {
                this.txClient.commitOrThrow(this.transaction);
                this.taskContext.postTxCommit();
            } catch (TransactionFailureException e) {
                LOG.warn("MapReduce Job transaction {} failed to commit", Long.valueOf(this.transaction.getTransactionId()));
                throw e;
            }
        } catch (Exception e2) {
            Throwables.propagateIfInstanceOf(e2, IOException.class);
            throw Throwables.propagate(e2);
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // co.cask.cdap.internal.app.runtime.batch.dataset.output.MultipleOutputsCommitter
    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
        try {
            super.abortJob(jobContext, state);
            try {
                onFinish(jobContext, false);
                if (this.transaction == null) {
                    LOG.warn("Did not invalidate transaction; job setup did not complete or invalidate already happened.");
                    return;
                }
                LOG.info("Invalidating transaction {}", Long.valueOf(this.transaction.getWritePointer()));
                this.txClient.invalidate(this.transaction.getWritePointer());
                this.transaction = null;
            } catch (Throwable th) {
                throw th;
            }
        } catch (Throwable th2) {
            try {
                onFinish(jobContext, false);
                if (this.transaction != null) {
                    LOG.info("Invalidating transaction {}", Long.valueOf(this.transaction.getWritePointer()));
                    this.txClient.invalidate(this.transaction.getWritePointer());
                    this.transaction = null;
                } else {
                    LOG.warn("Did not invalidate transaction; job setup did not complete or invalidate already happened.");
                }
                throw th2;
            } finally {
                if (this.transaction != null) {
                    LOG.info("Invalidating transaction {}", Long.valueOf(this.transaction.getWritePointer()));
                    this.txClient.invalidate(this.transaction.getWritePointer());
                    this.transaction = null;
                } else {
                    LOG.warn("Did not invalidate transaction; job setup did not complete or invalidate already happened.");
                }
            }
        }
    }

    private void onFinish(JobContext jobContext, boolean z) throws IOException {
        try {
            if (this.taskContext == null || this.outputs == null) {
                LOG.warn("Did not commit datasets, since job setup did not complete.");
            } else {
                finishDatasets(jobContext, z);
            }
        } catch (Exception e) {
            Throwables.propagateIfInstanceOf(e, IOException.class);
            Throwables.propagate(e);
        }
    }

    private Map<String, DatasetOutputCommitter> getDatasetOutputCommitters(List<ProvidedOutput> list) {
        HashMap hashMap = new HashMap();
        for (ProvidedOutput providedOutput : list) {
            if (providedOutput.getOutputFormatProvider() instanceof DatasetOutputCommitter) {
                hashMap.put(providedOutput.getOutput().getAlias(), providedOutput.getOutputFormatProvider());
            }
        }
        return hashMap;
    }

    private void finishDatasets(JobContext jobContext, boolean z) throws Exception {
        ClassLoader contextClassLoader = ClassLoaders.setContextClassLoader(jobContext.getConfiguration().getClassLoader());
        Map<String, DatasetOutputCommitter> datasetOutputCommitters = getDatasetOutputCommitters(this.outputs);
        try {
            if (z) {
                commitOutputs(datasetOutputCommitters);
            } else {
                failOutputs(datasetOutputCommitters);
            }
            ClassLoaders.setContextClassLoader(contextClassLoader);
        } catch (Throwable th) {
            ClassLoaders.setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    private void commitOutputs(Map<String, DatasetOutputCommitter> map) {
        for (Map.Entry<String, DatasetOutputCommitter> entry : map.entrySet()) {
            try {
                entry.getValue().onSuccess();
            } catch (Exception e) {
                LOG.error(String.format("Error from onSuccess method of output dataset %s.", entry.getKey()), e);
                throw e;
            }
        }
    }

    private void failOutputs(Map<String, DatasetOutputCommitter> map) throws Exception {
        if (this.completedCallingOnFailure) {
            LOG.info("Not calling onFailure on outputs, as it has they have already been executed.");
            return;
        }
        Exception exc = null;
        for (Map.Entry<String, DatasetOutputCommitter> entry : map.entrySet()) {
            try {
                entry.getValue().onFailure();
            } catch (Exception e) {
                LOG.error(String.format("Error from onFailure method of output dataset %s.", entry.getKey()), e);
                if (exc != null) {
                    exc.addSuppressed(e);
                } else {
                    exc = e;
                }
            }
        }
        this.completedCallingOnFailure = true;
        if (exc != null) {
            throw exc;
        }
    }
}
