package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.data2.transaction.Transactions;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.HttpResponder;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkTransactionHandler.class */
public final class SparkTransactionHandler extends AbstractHttpHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SparkTransactionHandler.class);
    private static final TransactionCodec TX_CODEC = new TransactionCodec();
    private static final TransactionInfo IMPLICIT_TX_INFO = new TransactionInfo() { // from class: co.cask.cdap.app.runtime.spark.SparkTransactionHandler.1
        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        @Nullable
        public Transaction getTransaction() {
            return null;
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        public boolean commitOnJobEnded() {
            return true;
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        public void onJobStarted() {
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        public void onTransactionCompleted(boolean z, @Nullable TransactionFailureException transactionFailureException) {
        }
    };
    private final TransactionSystemClient txClient;
    private final ConcurrentMap<Integer, Integer> stageToJob = new ConcurrentHashMap();
    private final ConcurrentMap<Integer, JobTransaction> jobTransactions = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkTransactionHandler$JobTransaction.class */
    public final class JobTransaction {
        private final Integer jobId;
        private final Set<Integer> stageIds;
        private final TransactionInfo transactionInfo;
        private volatile Optional<Transaction> transaction;

        JobTransaction(Integer num, Set<Integer> set, TransactionInfo transactionInfo) {
            this.jobId = num;
            this.stageIds = ImmutableSet.copyOf(set);
            this.transactionInfo = transactionInfo;
            Transaction transaction = transactionInfo.getTransaction();
            this.transaction = transaction == null ? null : Optional.of(transaction);
        }

        Integer getJobId() {
            return this.jobId;
        }

        Set<Integer> getStageIds() {
            return this.stageIds;
        }

        @Nullable
        public Transaction getTransaction() {
            Optional<Transaction> optional = this.transaction;
            if (optional == null) {
                synchronized (this) {
                    optional = this.transaction;
                    if (optional == null) {
                        try {
                            Optional<Transaction> of = Optional.of(SparkTransactionHandler.this.txClient.startLong());
                            this.transaction = of;
                            optional = of;
                        } catch (Throwable th) {
                            SparkTransactionHandler.LOG.error("Failed to start transaction for job {}", this.jobId, th);
                            Optional<Transaction> absent = Optional.absent();
                            this.transaction = absent;
                            optional = absent;
                        }
                    }
                }
            }
            return (Transaction) optional.orNull();
        }

        public void completed(boolean z) throws TransactionFailureException {
            Optional<Transaction> optional;
            if (this.transactionInfo.commitOnJobEnded() && (optional = this.transaction) != null && optional.isPresent()) {
                Transaction transaction = (Transaction) optional.get();
                try {
                    if (z) {
                        SparkTransactionHandler.LOG.debug("Committing transaction for job {}", this.jobId);
                        try {
                            SparkTransactionHandler.this.txClient.commitOrThrow(transaction);
                            this.transactionInfo.onTransactionCompleted(z, null);
                        } catch (Throwable th) {
                            Transactions.invalidateQuietly(SparkTransactionHandler.this.txClient, transaction);
                            throw th;
                        }
                    } else {
                        SparkTransactionHandler.LOG.debug("Invalidating transaction for job {}", this.jobId);
                        if (!SparkTransactionHandler.this.txClient.invalidate(transaction.getWritePointer())) {
                            throw new TransactionFailureException("Failed to invalid transaction on job failure. JobId: " + this.jobId + ", transaction: " + transaction);
                        }
                        this.transactionInfo.onTransactionCompleted(z, null);
                    }
                } catch (Throwable th2) {
                    TransactionFailureException asTransactionFailure = Transactions.asTransactionFailure(th2);
                    this.transactionInfo.onTransactionCompleted(z, asTransactionFailure);
                    throw asTransactionFailure;
                }
            }
        }

        public String toString() {
            return "JobTransaction{jobId=" + this.jobId + ", stageIds=" + this.stageIds + ", transaction=" + (this.transaction == null ? null : (Transaction) this.transaction.orNull()) + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkTransactionHandler(TransactionSystemClient transactionSystemClient) {
        this.txClient = transactionSystemClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void jobStarted(Integer num, Set<Integer> set) {
        jobStarted(num, set, IMPLICIT_TX_INFO);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void jobStarted(Integer num, Set<Integer> set, TransactionInfo transactionInfo) {
        JobTransaction jobTransaction = new JobTransaction(num, set, transactionInfo);
        LOG.debug("Spark job started: {}", jobTransaction);
        JobTransaction putIfAbsent = this.jobTransactions.putIfAbsent(num, jobTransaction);
        if (putIfAbsent != null) {
            LOG.error("Job already running: {}", putIfAbsent);
            return;
        }
        HashMap hashMap = new HashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), num);
        }
        this.stageToJob.putAll(hashMap);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void jobEnded(Integer num, boolean z) throws TransactionFailureException {
        JobTransaction remove = this.jobTransactions.remove(num);
        if (remove == null) {
            LOG.error("Transaction for job {} not found.", num);
            return;
        }
        LOG.debug("Spark job ended: {}", remove);
        this.stageToJob.keySet().removeAll(remove.getStageIds());
        remove.completed(z);
    }

    @GET
    @Path("/spark/stages/{stage}/transaction")
    public void getTransaction(HttpRequest httpRequest, HttpResponder httpResponder, @PathParam("stage") int i) {
        Integer num = this.stageToJob.get(Integer.valueOf(i));
        if (num == null) {
            httpResponder.sendString(HttpResponseStatus.NOT_FOUND, "JobId not found for stage " + i);
            return;
        }
        JobTransaction jobTransaction = this.jobTransactions.get(num);
        if (jobTransaction == null) {
            httpResponder.sendString(HttpResponseStatus.GONE, "No transaction associated with the stage " + i + " of job " + num);
            return;
        }
        Transaction transaction = jobTransaction.getTransaction();
        if (transaction == null) {
            httpResponder.sendString(HttpResponseStatus.GONE, "Failed to start transaction for stage " + i + " of job " + num);
            return;
        }
        try {
            httpResponder.sendByteArray(HttpResponseStatus.OK, TX_CODEC.encode(transaction), (Multimap) null);
        } catch (IOException e) {
            LOG.error("Failed to encode Transaction {}", jobTransaction, e);
            httpResponder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to encode transaction: " + e.getMessage());
        }
    }
}
