package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.metrics.MapReduceMetrics;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.data2.dataset2.DatasetCacheKey;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetContext;
import co.cask.cdap.internal.app.runtime.adapter.PluginInstantiator;
import co.cask.cdap.templates.AdapterDefinition;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/DynamicMapReduceContext.class */
/**
 * A {@link BasicMapReduceContext} that resolves datasets on demand through a
 * {@link DynamicDatasetContext} instead of receiving a fixed set of datasets up front.
 *
 * <p>Instantiated datasets are held in a {@link LoadingCache}; whenever an entry is removed from
 * that cache (in particular on {@link #close()}), every dataset stored in the removed entry is
 * closed.
 */
public class DynamicMapReduceContext extends BasicMapReduceContext implements DatasetContext {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicMapReduceContext.class);

    /**
     * Cache of instantiated datasets. Each value is a mutable map created lazily by the loader.
     * NOTE(review): the meaning of the {@code Long} key is decided by DynamicDatasetContext
     * (presumably a thread id) -- confirm there.
     */
    private final LoadingCache<Long, Map<DatasetCacheKey, Dataset>> datasetsCache;

    /** Transaction context handed to the dynamic dataset context; starts with no participants. */
    private final TransactionContext txContext;

    /** Delegate that performs the actual dataset lookup and instantiation. */
    private final DynamicDatasetContext dynamicDatasetContext;

    /**
     * Creates the context, its dataset cache, its transaction context and the dataset delegate.
     *
     * <p>All arguments except {@code transactionSystemClient} are forwarded unchanged to the
     * {@link BasicMapReduceContext} constructor, together with an empty set in the sixth position.
     * NOTE(review): that empty set is presumably the "pre-declared datasets" argument, and
     * {@code str}, {@code j} and {@code str2} carry whatever the parent constructor expects in
     * those positions -- the names were lost in decompilation, confirm against the parent class.
     *
     * @param program the program being run; also supplies the namespace and the class loader used
     *     for dataset instantiation
     * @param taskType the MapReduce task type, forwarded to the parent
     * @param runId the run id, forwarded to the parent
     * @param str forwarded to the parent (fourth argument)
     * @param arguments runtime arguments; their map form is also given to the dataset delegate
     * @param mapReduceSpecification the MapReduce specification, forwarded to the parent
     * @param j forwarded to the parent (eighth argument)
     * @param str2 forwarded to the parent (ninth argument); may be {@code null}
     * @param workflowToken forwarded to the parent; may be {@code null}
     * @param discoveryServiceClient forwarded to the parent
     * @param metricsCollectionService forwarded to the parent
     * @param transactionSystemClient client used to build this context's {@link TransactionContext}
     * @param datasetFramework framework used by the parent and by the dataset delegate
     * @param adapterDefinition forwarded to the parent; may be {@code null}
     * @param pluginInstantiator forwarded to the parent; may be {@code null}
     */
    public DynamicMapReduceContext(
            Program program,
            MapReduceMetrics.TaskType taskType,
            RunId runId,
            String str,
            Arguments arguments,
            MapReduceSpecification mapReduceSpecification,
            long j,
            @Nullable String str2,
            @Nullable WorkflowToken workflowToken,
            DiscoveryServiceClient discoveryServiceClient,
            MetricsCollectionService metricsCollectionService,
            TransactionSystemClient transactionSystemClient,
            DatasetFramework datasetFramework,
            @Nullable AdapterDefinition adapterDefinition,
            @Nullable PluginInstantiator pluginInstantiator) {
        super(program, taskType, runId, str, arguments, Collections.emptySet(), mapReduceSpecification, j, str2,
                workflowToken, discoveryServiceClient, metricsCollectionService, datasetFramework, adapterDefinition,
                pluginInstantiator);
        this.datasetsCache = CacheBuilder.newBuilder()
                .removalListener(new RemovalListener<Long, Map<DatasetCacheKey, Dataset>>() {
                    @ParametersAreNonnullByDefault
                    public void onRemoval(RemovalNotification<Long, Map<DatasetCacheKey, Dataset>> removalNotification) {
                        closeDatasets(removalNotification.getValue());
                    }
                })
                .build(new CacheLoader<Long, Map<DatasetCacheKey, Dataset>>() {
                    @ParametersAreNonnullByDefault
                    public Map<DatasetCacheKey, Dataset> load(Long l) throws Exception {
                        // Every new key starts with an empty, mutable dataset map.
                        return Maps.newHashMap();
                    }
                });
        this.txContext = new TransactionContext(transactionSystemClient, new TransactionAware[0]);
        this.dynamicDatasetContext = new DynamicDatasetContext(getProgram().getId().getNamespace(), this.txContext,
                getProgramMetrics(), datasetFramework, program.getClassLoader(), arguments.asMap(), null,
                getOwners()) {
            @Nullable
            protected LoadingCache<Long, Map<DatasetCacheKey, Dataset>> getDatasetsCache() {
                // Share the enclosing context's cache so that close() can release everything.
                return DynamicMapReduceContext.this.datasetsCache;
            }
        };
    }

    /**
     * Closes every dataset in the given map on a best-effort basis.
     *
     * <p>A failure to close one dataset is logged and does not prevent the remaining datasets
     * from being closed.
     *
     * @param datasets the datasets of a removed cache entry; {@code null} is ignored
     */
    private static void closeDatasets(@Nullable Map<DatasetCacheKey, Dataset> datasets) {
        if (datasets == null) {
            return;
        }
        for (Map.Entry<DatasetCacheKey, Dataset> entry : datasets.entrySet()) {
            try {
                entry.getValue().close();
            } catch (IOException | RuntimeException e) {
                // Runtime failures are caught too: letting one escape would abort the loop and
                // leave the remaining datasets of this entry open.
                LOG.error("Error closing dataset: {}", entry.getKey(), e);
            }
        }
    }

    /**
     * Returns the dataset with the given name and arguments by delegating to the dynamic dataset
     * context. The method is synchronized on this context.
     *
     * @param str the dataset name
     * @param map the dataset arguments
     * @param <T> the expected dataset type
     * @return the dataset instance supplied by the delegate
     * @throws DatasetInstantiationException if the delegate cannot provide the dataset
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public synchronized <T extends Dataset> T getDataset(String str, Map<String, String> map) throws DatasetInstantiationException {
        return (T) this.dynamicDatasetContext.getDataset(str, map);
    }

    /**
     * Evicts all cached datasets (which closes them through the removal listener) and then closes
     * the parent context. The parent is closed even if evicting the cache fails.
     */
    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public void close() {
        try {
            this.datasetsCache.invalidateAll();
            // Run any pending cache maintenance so removal notifications are delivered now.
            this.datasetsCache.cleanUp();
        } finally {
            super.close();
        }
    }

    /**
     * Returns the transaction context created for this MapReduce context.
     *
     * @return the transaction context shared with the dynamic dataset context
     */
    public TransactionContext getTransactionContext() {
        return this.txContext;
    }
}
