package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.transaction.RetryingLongTransactionSystemClient;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkTransactional.class */
public final class SparkTransactional implements Transactional {
    static final String ACTIVE_TRANSACTION_KEY = "cdap.spark.active.transaction";
    private static final Logger LOG = LoggerFactory.getLogger(SparkTransactional.class);
    private final TransactionSystemClient txClient;
    private final DynamicDatasetCache datasetCache;
    private final ThreadLocal<TransactionalDatasetContext> activeDatasetContext = new ThreadLocal<TransactionalDatasetContext>() { // from class: co.cask.cdap.app.runtime.spark.SparkTransactional.1
        @Override // java.lang.ThreadLocal
        public void set(TransactionalDatasetContext transactionalDatasetContext) {
            String l = Long.toString(transactionalDatasetContext.getTransaction().getWritePointer());
            if (SparkRuntimeEnv.setLocalProperty(SparkTransactional.ACTIVE_TRANSACTION_KEY, l)) {
                SparkTransactional.this.transactionInfos.put(l, transactionalDatasetContext);
            }
            super.set((AnonymousClass1) transactionalDatasetContext);
        }

        @Override // java.lang.ThreadLocal
        public void remove() {
            String localProperty = SparkRuntimeEnv.getLocalProperty(SparkTransactional.ACTIVE_TRANSACTION_KEY);
            if (localProperty != null && !localProperty.isEmpty()) {
                SparkRuntimeEnv.setLocalProperty(SparkTransactional.ACTIVE_TRANSACTION_KEY, "");
                SparkTransactional.this.transactionInfos.remove(localProperty);
            }
            super.remove();
        }
    };
    private final Map<String, TransactionalDatasetContext> transactionInfos = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkTransactional$TransactionType.class */
    public enum TransactionType {
        EXPLICIT,
        IMPLICIT,
        IMPLICIT_COMMIT_ON_JOB_END
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkTransactional$TransactionalDatasetContext.class */
    public final class TransactionalDatasetContext implements SparkDatasetContext, TransactionInfo {
        private final Transaction transaction;
        private final DynamicDatasetCache datasetCache;
        private final Set<Dataset> datasets;
        private final Set<Dataset> discardDatasets;
        private final Iterable<TransactionAware> extraTxAwares;
        private TransactionType transactionType;
        private CountDownLatch completion;
        private volatile boolean jobStarted;

        private TransactionalDatasetContext(DynamicDatasetCache dynamicDatasetCache, TransactionType transactionType) throws TransactionFailureException {
            this.datasetCache = dynamicDatasetCache;
            this.datasets = Collections.synchronizedSet(new HashSet());
            this.discardDatasets = Collections.synchronizedSet(new HashSet());
            this.transactionType = transactionType;
            this.completion = new CountDownLatch(1);
            this.extraTxAwares = dynamicDatasetCache.getExtraTransactionAwares();
            this.transaction = startTx(this.extraTxAwares);
        }

        private Transaction startTx(Iterable<TransactionAware> iterable) throws TransactionFailureException {
            Transaction startLong = SparkTransactional.this.txClient.startLong();
            for (TransactionAware transactionAware : iterable) {
                try {
                    transactionAware.startTx(startLong);
                } catch (Throwable th) {
                    SparkTransactional.this.txClient.abort(startLong);
                    throw new TransactionFailureException(String.format("Unable to start transaction-aware '%s' for transaction %d. ", transactionAware.getTransactionAwareName(), Long.valueOf(startLong.getTransactionId())), th);
                }
            }
            return startLong;
        }

        boolean isJobStarted() {
            return this.jobStarted;
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        @Nonnull
        public Transaction getTransaction() {
            return this.transaction;
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        public boolean commitOnJobEnded() {
            return this.transactionType == TransactionType.IMPLICIT_COMMIT_ON_JOB_END;
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        public void onJobStarted() {
            this.jobStarted = true;
        }

        @Override // co.cask.cdap.app.runtime.spark.TransactionInfo
        public void onTransactionCompleted(boolean z, @Nullable TransactionFailureException transactionFailureException) {
            Preconditions.checkState(commitOnJobEnded(), "Not expecting transaction to be completed");
            SparkTransactional.this.transactionInfos.remove(Long.toString(this.transaction.getWritePointer()));
            if (z && transactionFailureException == null) {
                postCommit();
            } else {
                rollbackWithoutFailure();
            }
            this.completion.countDown();
        }

        public <T extends Dataset> T getDataset(String str) throws DatasetInstantiationException {
            return (T) getDataset(str, Collections.emptyMap());
        }

        public <T extends Dataset> T getDataset(String str, String str2) throws DatasetInstantiationException {
            return (T) getDataset(str, str2, Collections.emptyMap());
        }

        public <T extends Dataset> T getDataset(String str, Map<String, String> map) throws DatasetInstantiationException {
            return (T) getDataset(str, map, AccessType.UNKNOWN);
        }

        public <T extends Dataset> T getDataset(String str, String str2, Map<String, String> map) throws DatasetInstantiationException {
            return (T) getDataset(str, str2, map, AccessType.UNKNOWN);
        }

        @Override // co.cask.cdap.app.runtime.spark.SparkDatasetContext
        public <T extends Dataset> T getDataset(String str, Map<String, String> map, AccessType accessType) throws DatasetInstantiationException {
            TransactionAware dataset = this.datasetCache.getDataset(str, map, accessType);
            if (this.datasets.add(dataset) && (dataset instanceof TransactionAware)) {
                dataset.startTx(this.transaction);
            }
            return dataset;
        }

        @Override // co.cask.cdap.app.runtime.spark.SparkDatasetContext
        public <T extends Dataset> T getDataset(String str, String str2, Map<String, String> map, AccessType accessType) throws DatasetInstantiationException {
            if (NamespaceId.SYSTEM.getNamespace().equalsIgnoreCase(str)) {
                throw new DatasetInstantiationException(String.format("Dataset %s cannot be instantiated from %s namespace. Cannot access %s namespace.", str2, NamespaceId.SYSTEM, NamespaceId.SYSTEM));
            }
            TransactionAware dataset = this.datasetCache.getDataset(str, str2, map, accessType);
            if (this.datasets.add(dataset) && (dataset instanceof TransactionAware)) {
                dataset.startTx(this.transaction);
            }
            return dataset;
        }

        public void releaseDataset(Dataset dataset) {
            discardDataset(dataset);
        }

        public void discardDataset(Dataset dataset) {
            this.discardDatasets.add(dataset);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flush() throws TransactionFailureException {
            for (TransactionAware transactionAware : getTransactionAwares()) {
                try {
                    if (!transactionAware.commitTx()) {
                        throw new TransactionFailureException("Failed to persist changes for " + transactionAware);
                    }
                } catch (Throwable th) {
                    throw Transactions.asTransactionFailure(th);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void postCommit() {
            for (TransactionAware transactionAware : getTransactionAwares()) {
                try {
                    transactionAware.postTxCommit();
                } catch (Exception e) {
                    SparkTransactional.LOG.warn("Exception raised in postTxCommit call on TransactionAware {}", transactionAware, e);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void rollbackWithoutFailure() {
            for (TransactionAware transactionAware : getTransactionAwares()) {
                try {
                    transactionAware.rollbackTx();
                } catch (Exception e) {
                    SparkTransactional.LOG.warn("Exception raised in rollback call on TransactionAware {}", transactionAware, e);
                }
            }
        }

        private Iterable<TransactionAware> getTransactionAwares() {
            return Iterables.concat(Iterables.filter(this.datasets, TransactionAware.class), this.extraTxAwares);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void discardDatasets() {
            Iterator<Dataset> it = this.discardDatasets.iterator();
            while (it.hasNext()) {
                this.datasetCache.discardDataset(it.next());
            }
            this.discardDatasets.clear();
            this.datasets.clear();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void awaitCompletion() throws InterruptedException {
            SparkTransactional.LOG.debug("Awaiting completion for {}", Long.valueOf(this.transaction.getWritePointer()));
            if (this.completion != null) {
                this.completion.await();
            }
            discardDatasets();
        }

        TransactionType getTransactionType() {
            return this.transactionType;
        }

        void setTransactionType(TransactionType transactionType) {
            this.transactionType = transactionType;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkTransactional(TransactionSystemClient transactionSystemClient, DynamicDatasetCache dynamicDatasetCache, RetryStrategy retryStrategy) {
        this.txClient = new RetryingLongTransactionSystemClient(transactionSystemClient, retryStrategy);
        this.datasetCache = dynamicDatasetCache;
    }

    public void execute(TxRunnable txRunnable) throws TransactionFailureException {
        execute(wrap(txRunnable), TransactionType.EXPLICIT);
    }

    public void execute(int i, TxRunnable txRunnable) throws TransactionFailureException {
        throw new TransactionFailureException("Transaction with explicit timeout is not supported in Spark");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public TransactionInfo getTransactionInfo(String str) {
        return this.transactionInfos.get(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void execute(SparkTxRunnable sparkTxRunnable, TransactionType transactionType) throws TransactionFailureException {
        TransactionalDatasetContext transactionalDatasetContext = this.activeDatasetContext.get();
        boolean z = false;
        if (transactionalDatasetContext != null) {
            TransactionType transactionType2 = transactionalDatasetContext.getTransactionType();
            if (transactionType2 == TransactionType.EXPLICIT && transactionType == TransactionType.EXPLICIT) {
                throw new TransactionFailureException("Nested transaction not supported. Active transaction is " + transactionalDatasetContext.getTransaction());
            }
            if (transactionType2 == TransactionType.IMPLICIT_COMMIT_ON_JOB_END) {
                if (transactionalDatasetContext.isJobStarted()) {
                    try {
                        transactionalDatasetContext.awaitCompletion();
                        transactionalDatasetContext = null;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else if (transactionType != TransactionType.IMPLICIT_COMMIT_ON_JOB_END) {
                    transactionalDatasetContext.setTransactionType(transactionType);
                    z = true;
                }
            }
        }
        if (transactionalDatasetContext == null) {
            transactionalDatasetContext = new TransactionalDatasetContext(this.datasetCache, transactionType);
            this.activeDatasetContext.set(transactionalDatasetContext);
            z = transactionType != TransactionType.IMPLICIT_COMMIT_ON_JOB_END;
        }
        Transaction transaction = transactionalDatasetContext.getTransaction();
        try {
            sparkTxRunnable.run(transactionalDatasetContext);
            transactionalDatasetContext.flush();
            if (z) {
                if (!this.txClient.commit(transaction)) {
                    throw new TransactionFailureException("Failed to commit explicit transaction " + transaction);
                }
                this.activeDatasetContext.remove();
                transactionalDatasetContext.postCommit();
                transactionalDatasetContext.discardDatasets();
            }
        } catch (Throwable th) {
            this.activeDatasetContext.remove();
            transactionalDatasetContext.rollbackWithoutFailure();
            Transactions.invalidateQuietly(this.txClient, transaction);
            throw Transactions.asTransactionFailure(th);
        }
    }

    private SparkTxRunnable wrap(final TxRunnable txRunnable) {
        return new SparkTxRunnable() { // from class: co.cask.cdap.app.runtime.spark.SparkTransactional.2
            @Override // co.cask.cdap.app.runtime.spark.SparkTxRunnable
            public void run(SparkDatasetContext sparkDatasetContext) throws Exception {
                txRunnable.run(sparkDatasetContext);
            }
        };
    }
}
