package co.cask.cdap.internal.app.runtime.service;

import co.cask.cdap.api.data.DataSetContext;
import co.cask.cdap.api.data.DataSetInstantiationException;
import co.cask.cdap.api.dataset.DatasetDefinition;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.service.ServiceWorkerContext;
import co.cask.cdap.api.service.TxRunnable;
import co.cask.cdap.app.metrics.ServiceRunnableMetrics;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.data.Namespace;
import co.cask.cdap.data2.datafabric.DefaultDatasetNamespace;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DatasetManagementException;
import co.cask.cdap.data2.dataset2.NamespacedDatasetFramework;
import co.cask.cdap.internal.app.program.TypeId;
import co.cask.cdap.internal.app.runtime.AbstractContext;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/service/BasicServiceWorkerContext.class */
/**
 * {@link ServiceWorkerContext} implementation that exposes the runtime arguments and metrics of a
 * service worker and runs {@link TxRunnable}s inside a transaction, handing them a
 * {@link DataSetContext} restricted to the datasets declared for the worker.
 */
public class BasicServiceWorkerContext extends AbstractContext implements ServiceWorkerContext {
    private static final Logger LOG = LoggerFactory.getLogger(BasicServiceWorkerContext.class);

    /** Immutable copy of the runtime arguments passed to the constructor. */
    private final Map<String, String> runtimeArgs;
    /** Immutable copy of the dataset names that may be requested through {@link #execute}. */
    private final Set<String> datasets;
    private final TransactionSystemClient transactionSystemClient;
    /** The framework passed to the constructor, wrapped so dataset names resolve in the USER namespace. */
    private final DatasetFramework datasetFramework;
    /** Class loader handed to the dataset framework when instantiating datasets. */
    private final ClassLoader programClassLoader;
    private final ServiceRunnableMetrics serviceRunnableMetrics;

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/service/BasicServiceWorkerContext$ServiceWorkerDatasetContext.class */
    /**
     * {@link DataSetContext} bound to a single {@link TransactionContext}: every dataset obtained
     * through it is registered with that transaction context before being returned.
     */
    private class ServiceWorkerDatasetContext implements DataSetContext {
        private final TransactionContext context;

        private ServiceWorkerDatasetContext(TransactionContext transactionContext) {
            this.context = transactionContext;
        }

        /**
         * Returns the named dataset, instantiated with {@link DatasetDefinition#NO_ARGUMENTS}.
         *
         * @param str name of the dataset
         * @see #getDataSet(String, Map)
         */
        public <T extends Closeable> T getDataSet(String str) throws DataSetInstantiationException {
            return this.<T>getDataSet(str, DatasetDefinition.NO_ARGUMENTS);
        }

        /**
         * Instantiates the named dataset and registers it with the current transaction context.
         *
         * @param str name of the dataset; must be one of the names given to the enclosing context
         * @param map arguments passed to the dataset framework when instantiating the dataset
         * @return the dataset instance
         * @throws IllegalArgumentException if {@code str} is not among the declared datasets
         * @throws DataSetInstantiationException if the dataset framework returns {@code null}
         * @throws RuntimeException wrapping the {@link IOException} or
         *         {@link DatasetManagementException} raised by the dataset framework
         */
        public <T extends Closeable> T getDataSet(String str, Map<String, String> map) throws DataSetInstantiationException {
            // Guava's template overload only formats the message when the check actually fails.
            Preconditions.checkArgument(BasicServiceWorkerContext.this.datasets.contains(str),
                "Trying to access dataset %s that is not declared as used by the Worker. "
                    + "Specify datasets used using useDataset() method in the Worker's configure.", str);
            try {
                TransactionAware dataset = BasicServiceWorkerContext.this.datasetFramework.getDataset(str, map, BasicServiceWorkerContext.this.programClassLoader);
                if (dataset == null) {
                    throw new DataSetInstantiationException(String.format("Dataset %s does not exist.", str));
                }
                this.context.addTransactionAware(dataset);
                // The caller chooses T; the cast cannot be verified here and a mismatch surfaces
                // as a ClassCastException at the call site.
                @SuppressWarnings("unchecked")
                T typedDataset = (T) dataset;
                return typedDataset;
            } catch (IOException e) {
                // Pass the exception to the logger so the stack trace is not lost.
                BasicServiceWorkerContext.LOG.error("Could not instantiate dataset {}.", str, e);
                throw Throwables.propagate(e);
            } catch (DatasetManagementException e2) {
                BasicServiceWorkerContext.LOG.error("Could not get dataset metainfo for dataset {}.", str, e2);
                throw Throwables.propagate(e2);
            }
        }
    }

    /**
     * Creates the context for one service worker.
     *
     * @param program the program the worker belongs to
     * @param runId run id forwarded to {@link AbstractContext}
     * @param i last component of the metric context (presumably the worker instance id; confirm with callers)
     * @param str fourth component of the metric context (presumably the runnable name; confirm with callers)
     * @param classLoader class loader used when instantiating datasets
     * @param cConfiguration configuration forwarded to {@link AbstractContext} and used to build the dataset namespace
     * @param map runtime arguments; copied into an immutable map
     * @param set names of the datasets the worker may access; copied into an immutable set
     * @param metricsCollectionService service that receives the worker's metrics
     * @param datasetFramework dataset framework; wrapped in a {@link NamespacedDatasetFramework} for {@link Namespace#USER}
     * @param transactionSystemClient client used to start and finish transactions in {@link #execute}
     * @param discoveryServiceClient discovery client forwarded to {@link AbstractContext}
     */
    public BasicServiceWorkerContext(Program program, RunId runId, int i, String str, ClassLoader classLoader, CConfiguration cConfiguration, Map<String, String> map, Set<String> set, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, DiscoveryServiceClient discoveryServiceClient) {
        super(program, runId, set, getMetricContext(program, str, i), metricsCollectionService, datasetFramework, cConfiguration, discoveryServiceClient);
        this.programClassLoader = classLoader;
        this.runtimeArgs = ImmutableMap.copyOf(map);
        this.datasets = ImmutableSet.copyOf(set);
        this.transactionSystemClient = transactionSystemClient;
        this.datasetFramework = new NamespacedDatasetFramework(datasetFramework, new DefaultDatasetNamespace(cConfiguration, Namespace.USER));
        this.serviceRunnableMetrics = new ServiceRunnableMetrics(metricsCollectionService, getMetricContext(program, str, i));
    }

    /** Returns the immutable runtime arguments this context was created with. */
    public Map<String, String> getRuntimeArguments() {
        return this.runtimeArgs;
    }

    /** Returns the metrics object created for this worker. */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public Metrics getMetrics() {
        return this.serviceRunnableMetrics;
    }

    /**
     * Builds the dot-separated metric context:
     * {@code <applicationId>.<service type id>.<program name>.<str>.<i>}.
     */
    private static String getMetricContext(Program program, String str, int i) {
        return String.format("%s.%s.%s.%s.%s", program.getApplicationId(), TypeId.getMetricContextId(ProgramType.SERVICE), program.getName(), str, Integer.valueOf(i));
    }

    /**
     * Runs {@code txRunnable} inside a newly started transaction and finishes the transaction
     * afterwards. Datasets the runnable obtains from the supplied {@link DataSetContext} are
     * registered with that transaction.
     *
     * <p>If starting, running or finishing fails, the failure is logged, the transaction is
     * aborted and the failure is rethrown via {@link Throwables#propagate(Throwable)}.
     *
     * @param txRunnable the work to perform inside the transaction
     */
    public void execute(TxRunnable txRunnable) {
        TransactionContext transactionContext = new TransactionContext(this.transactionSystemClient, new TransactionAware[0]);
        try {
            transactionContext.start();
            txRunnable.run(new ServiceWorkerDatasetContext(transactionContext));
            transactionContext.finish();
        } catch (TransactionFailureException e) {
            abortTransaction(e, "Failed to commit. Aborting transaction.", transactionContext);
        } catch (Exception e2) {
            abortTransaction(e2, "Exception occurred running user code. Aborting transaction.", transactionContext);
        }
    }

    /**
     * Logs the failure, aborts the transaction and rethrows. This method never returns normally.
     *
     * @param exc the failure that triggered the abort; rethrown (wrapped if checked) when the abort succeeds
     * @param str message logged together with {@code exc}
     * @param transactionContext the transaction to abort
     * @throws RuntimeException always: {@code exc} propagated, or the abort failure if aborting itself fails
     */
    private void abortTransaction(Exception exc, String str, TransactionContext transactionContext) {
        // Log the cause up front: if the abort below fails, this is the only record of it.
        LOG.error(str, exc);
        try {
            transactionContext.abort();
        } catch (TransactionFailureException e) {
            LOG.error("Failed to abort transaction.", e);
            throw Throwables.propagate(e);
        }
        throw Throwables.propagate(exc);
    }
}
