package co.cask.cdap.internal.app.runtime.service;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.service.ServiceWorkerContext;
import co.cask.cdap.api.service.ServiceWorkerSpecification;
import co.cask.cdap.app.metrics.ProgramUserMetrics;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.common.metrics.MetricsCollector;
import co.cask.cdap.data2.dataset2.DatasetCacheKey;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetContext;
import co.cask.cdap.internal.app.runtime.AbstractContext;
import co.cask.cdap.logging.context.UserServiceLoggingContext;
import co.cask.cdap.proto.Id;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/service/BasicServiceWorkerContext.class */
public class BasicServiceWorkerContext extends AbstractContext implements ServiceWorkerContext {
    private static final Logger LOG = LoggerFactory.getLogger(BasicServiceWorkerContext.class);
    private final ServiceWorkerSpecification specification;
    private final Set<String> datasets;
    private final TransactionSystemClient transactionSystemClient;
    private final DatasetFramework datasetFramework;
    private final Metrics userMetrics;
    private final int instanceId;
    private volatile int instanceCount;
    private final LoadingCache<Long, Map<DatasetCacheKey, Dataset>> datasetsCache;
    private final Program program;
    private final Map<String, String> runtimeArgs;

    public BasicServiceWorkerContext(ServiceWorkerSpecification serviceWorkerSpecification, Program program, RunId runId, int i, int i2, Arguments arguments, CConfiguration cConfiguration, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, DiscoveryServiceClient discoveryServiceClient) {
        super(program, runId, arguments, serviceWorkerSpecification.getDatasets(), getMetricCollector(metricsCollectionService, program, serviceWorkerSpecification.getName(), runId.getId(), i), datasetFramework, discoveryServiceClient);
        this.program = program;
        this.specification = serviceWorkerSpecification;
        this.datasets = ImmutableSet.copyOf(serviceWorkerSpecification.getDatasets());
        this.instanceId = i;
        this.instanceCount = i2;
        this.transactionSystemClient = transactionSystemClient;
        this.datasetFramework = datasetFramework;
        this.userMetrics = new ProgramUserMetrics(getMetricCollector(metricsCollectionService, program, serviceWorkerSpecification.getName(), runId.getId(), i));
        this.runtimeArgs = arguments.asMap();
        this.datasetsCache = CacheBuilder.newBuilder().expireAfterAccess(Math.max(2L, 2 * TimeUnit.SECONDS.toHours(cConfiguration.getInt("data.tx.timeout", 30))), TimeUnit.HOURS).removalListener(new RemovalListener<Long, Map<DatasetCacheKey, Dataset>>() { // from class: co.cask.cdap.internal.app.runtime.service.BasicServiceWorkerContext.2
            @ParametersAreNonnullByDefault
            public void onRemoval(RemovalNotification<Long, Map<DatasetCacheKey, Dataset>> removalNotification) {
                if (removalNotification.getValue() != null) {
                    for (Map.Entry entry : ((Map) removalNotification.getValue()).entrySet()) {
                        try {
                            ((Dataset) entry.getValue()).close();
                        } catch (IOException e) {
                            BasicServiceWorkerContext.LOG.error("Error closing dataset: {}", entry.getKey(), e);
                        }
                    }
                }
            }
        }).build(new CacheLoader<Long, Map<DatasetCacheKey, Dataset>>() { // from class: co.cask.cdap.internal.app.runtime.service.BasicServiceWorkerContext.1
            @ParametersAreNonnullByDefault
            public Map<DatasetCacheKey, Dataset> load(Long l) throws Exception {
                return Maps.newHashMap();
            }
        });
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public Metrics getMetrics() {
        return this.userMetrics;
    }

    public LoggingContext getLoggingContext() {
        return new UserServiceLoggingContext(this.program.getNamespaceId(), this.program.getApplicationId(), this.program.getId().getId(), this.specification.getName());
    }

    private static MetricsCollector getMetricCollector(MetricsCollectionService metricsCollectionService, Program program, String str, String str2, int i) {
        if (metricsCollectionService == null) {
            return null;
        }
        HashMap newHashMap = Maps.newHashMap(getMetricsContext(program, str2));
        newHashMap.put("srn", str);
        newHashMap.put("ins", String.valueOf(i));
        return metricsCollectionService.getCollector(newHashMap);
    }

    public ServiceWorkerSpecification getSpecification() {
        return this.specification;
    }

    public void execute(TxRunnable txRunnable) {
        TransactionContext transactionContext = new TransactionContext(this.transactionSystemClient, new TransactionAware[0]);
        try {
            transactionContext.start();
            txRunnable.run(new DynamicDatasetContext(Id.Namespace.from(this.program.getNamespaceId()), transactionContext, this.datasetFramework, getProgram().getClassLoader(), this.datasets, this.runtimeArgs) { // from class: co.cask.cdap.internal.app.runtime.service.BasicServiceWorkerContext.3
                protected LoadingCache<Long, Map<DatasetCacheKey, Dataset>> getDatasetsCache() {
                    return BasicServiceWorkerContext.this.datasetsCache;
                }
            });
            transactionContext.finish();
        } catch (TransactionFailureException e) {
            abortTransaction(e, "Failed to commit. Aborting transaction.", transactionContext);
        } catch (Exception e2) {
            abortTransaction(e2, "Exception occurred running user code. Aborting transaction.", transactionContext);
        }
    }

    public void execute(final co.cask.cdap.api.service.TxRunnable txRunnable) {
        execute(new TxRunnable() { // from class: co.cask.cdap.internal.app.runtime.service.BasicServiceWorkerContext.4
            public void run(DatasetContext datasetContext) throws Exception {
                txRunnable.run(datasetContext);
            }
        });
    }

    public int getInstanceCount() {
        return this.instanceCount;
    }

    public int getInstanceId() {
        return this.instanceId;
    }

    public void setInstanceCount(int i) {
        this.instanceCount = i;
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public void close() {
        super.close();
        this.datasetsCache.invalidateAll();
        this.datasetsCache.cleanUp();
    }

    private void abortTransaction(Exception exc, String str, TransactionContext transactionContext) {
        try {
            LOG.error(str, exc);
            transactionContext.abort();
            throw Throwables.propagate(exc);
        } catch (TransactionFailureException e) {
            LOG.error("Failed to abort transaction.", e);
            throw Throwables.propagate(e);
        }
    }
}
