package co.cask.cdap.internal.app.runtime.worker;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.stream.StreamBatchWriter;
import co.cask.cdap.api.data.stream.StreamWriter;
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsCollector;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.api.templates.AdapterSpecification;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.api.worker.WorkerSpecification;
import co.cask.cdap.app.metrics.ProgramUserMetrics;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.stream.StreamWriterFactory;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.data2.dataset2.DatasetCacheKey;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DynamicDatasetContext;
import co.cask.cdap.internal.app.runtime.AbstractContext;
import co.cask.cdap.internal.app.runtime.adapter.PluginInstantiator;
import co.cask.cdap.logging.context.WorkerLoggingContext;
import co.cask.cdap.proto.Id;
import co.cask.cdap.templates.AdapterDefinition;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/worker/BasicWorkerContext.class */
public class BasicWorkerContext extends AbstractContext implements WorkerContext {
    private static final Logger LOG = LoggerFactory.getLogger(BasicWorkerContext.class);
    private final WorkerSpecification specification;
    private final TransactionSystemClient transactionSystemClient;
    private final DatasetFramework datasetFramework;
    private final Metrics userMetrics;
    private final int instanceId;
    private final LoggingContext loggingContext;
    private volatile int instanceCount;
    private final LoadingCache<Long, Map<DatasetCacheKey, Dataset>> datasetsCache;
    private final Program program;
    private final Map<String, String> runtimeArgs;
    private final StreamWriter streamWriter;

    public BasicWorkerContext(WorkerSpecification workerSpecification, Program program, RunId runId, int i, int i2, Arguments arguments, CConfiguration cConfiguration, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, DiscoveryServiceClient discoveryServiceClient, StreamWriterFactory streamWriterFactory, @Nullable AdapterDefinition adapterDefinition, @Nullable PluginInstantiator pluginInstantiator) {
        super(program, runId, arguments, workerSpecification.getDatasets(), getMetricCollector(program, runId.getId(), i, metricsCollectionService, adapterDefinition), datasetFramework, discoveryServiceClient, adapterDefinition, pluginInstantiator);
        this.program = program;
        this.specification = workerSpecification;
        this.instanceId = i;
        this.instanceCount = i2;
        this.transactionSystemClient = transactionSystemClient;
        this.datasetFramework = datasetFramework;
        this.loggingContext = createLoggingContext(program.getId(), runId, adapterDefinition);
        if (metricsCollectionService != null) {
            this.userMetrics = new ProgramUserMetrics(getProgramMetrics());
        } else {
            this.userMetrics = null;
        }
        this.runtimeArgs = arguments.asMap();
        this.streamWriter = streamWriterFactory.create(program.getId().getNamespace(), getOwners());
        this.datasetsCache = CacheBuilder.newBuilder().expireAfterAccess(Math.max(2L, 2 * TimeUnit.SECONDS.toHours(cConfiguration.getInt("data.tx.timeout", 30))), TimeUnit.HOURS).removalListener(new RemovalListener<Long, Map<DatasetCacheKey, Dataset>>() { // from class: co.cask.cdap.internal.app.runtime.worker.BasicWorkerContext.2
            @ParametersAreNonnullByDefault
            public void onRemoval(RemovalNotification<Long, Map<DatasetCacheKey, Dataset>> removalNotification) {
                if (removalNotification.getValue() != null) {
                    for (Map.Entry entry : ((Map) removalNotification.getValue()).entrySet()) {
                        try {
                            ((Dataset) entry.getValue()).close();
                        } catch (IOException e) {
                            BasicWorkerContext.LOG.error("Error closing dataset: {}", entry.getKey(), e);
                        }
                    }
                }
            }
        }).build(new CacheLoader<Long, Map<DatasetCacheKey, Dataset>>() { // from class: co.cask.cdap.internal.app.runtime.worker.BasicWorkerContext.1
            @ParametersAreNonnullByDefault
            public Map<DatasetCacheKey, Dataset> load(Long l) throws Exception {
                return Maps.newHashMap();
            }
        });
    }

    private LoggingContext createLoggingContext(Id.Program program, RunId runId, @Nullable AdapterSpecification adapterSpecification) {
        return new WorkerLoggingContext(program.getNamespaceId(), program.getApplicationId(), program.getId(), runId.getId(), String.valueOf(getInstanceId()), adapterSpecification == null ? null : adapterSpecification.getName());
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public Metrics getMetrics() {
        return this.userMetrics;
    }

    public LoggingContext getLoggingContext() {
        return this.loggingContext;
    }

    @Nullable
    private static MetricsCollector getMetricCollector(Program program, String str, int i, @Nullable MetricsCollectionService metricsCollectionService, @Nullable AdapterSpecification adapterSpecification) {
        if (metricsCollectionService == null) {
            return null;
        }
        HashMap newHashMap = Maps.newHashMap(getMetricsContext(program, str));
        newHashMap.put("ins", String.valueOf(i));
        if (adapterSpecification != null) {
            newHashMap.put("adp", adapterSpecification.getName());
        }
        return metricsCollectionService.getCollector(newHashMap);
    }

    public WorkerSpecification getSpecification() {
        return this.specification;
    }

    public void execute(TxRunnable txRunnable) {
        TransactionContext transactionContext = new TransactionContext(this.transactionSystemClient, new TransactionAware[0]);
        try {
            transactionContext.start();
            txRunnable.run(new DynamicDatasetContext(Id.Namespace.from(this.program.getNamespaceId()), transactionContext, this.datasetFramework, getProgram().getClassLoader(), this.runtimeArgs, null, getOwners()) { // from class: co.cask.cdap.internal.app.runtime.worker.BasicWorkerContext.3
                protected LoadingCache<Long, Map<DatasetCacheKey, Dataset>> getDatasetsCache() {
                    return BasicWorkerContext.this.datasetsCache;
                }
            });
            transactionContext.finish();
        } catch (TransactionFailureException e) {
            abortTransaction(e, "Failed to commit. Aborting transaction.", transactionContext);
        } catch (Exception e2) {
            abortTransaction(e2, "Exception occurred running user code. Aborting transaction.", transactionContext);
        }
    }

    public int getInstanceCount() {
        return this.instanceCount;
    }

    public int getInstanceId() {
        return this.instanceId;
    }

    public void setInstanceCount(int i) {
        this.instanceCount = i;
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractContext
    public void close() {
        super.close();
        this.datasetsCache.invalidateAll();
        this.datasetsCache.cleanUp();
    }

    private void abortTransaction(Exception exc, String str, TransactionContext transactionContext) {
        try {
            LOG.error(str, exc);
            transactionContext.abort();
            throw Throwables.propagate(exc);
        } catch (TransactionFailureException e) {
            LOG.error("Failed to abort transaction.", e);
            throw Throwables.propagate(e);
        }
    }

    public void write(String str, String str2) throws IOException {
        this.streamWriter.write(str, str2);
    }

    public void write(String str, String str2, Map<String, String> map) throws IOException {
        this.streamWriter.write(str, str2, map);
    }

    public void write(String str, ByteBuffer byteBuffer) throws IOException {
        this.streamWriter.write(str, byteBuffer);
    }

    public void write(String str, StreamEventData streamEventData) throws IOException {
        this.streamWriter.write(str, streamEventData);
    }

    public void writeFile(String str, File file, String str2) throws IOException {
        this.streamWriter.writeFile(str, file, str2);
    }

    public StreamBatchWriter createBatchWriter(String str, String str2) throws IOException {
        return this.streamWriter.createBatchWriter(str, str2);
    }
}
