package co.cask.cdap.internal.app.runtime.worker;

import co.cask.cdap.api.TxRunnable;
import co.cask.cdap.api.data.stream.StreamBatchWriter;
import co.cask.cdap.api.data.stream.StreamWriter;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.security.store.SecureStore;
import co.cask.cdap.api.security.store.SecureStoreManager;
import co.cask.cdap.api.stream.StreamEventData;
import co.cask.cdap.api.worker.WorkerContext;
import co.cask.cdap.api.worker.WorkerSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.stream.StreamWriterFactory;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.AbstractContext;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.logging.context.WorkerLoggingContext;
import co.cask.cdap.proto.Id;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.api.RunId;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/worker/BasicWorkerContext.class */
/**
 * {@link WorkerContext} implementation for a single worker instance.
 *
 * <p>Besides what it inherits from {@link AbstractContext}, this class keeps the worker's
 * {@link WorkerSpecification}, its instance id and the current instance count, runs
 * {@link TxRunnable}s inside a transaction obtained from the dataset cache, and forwards every
 * stream write to a {@link StreamWriter} obtained from the supplied {@link StreamWriterFactory}.
 */
public final class BasicWorkerContext extends AbstractContext implements WorkerContext {
    private static final Logger LOG = LoggerFactory.getLogger(BasicWorkerContext.class);

    private final WorkerSpecification specification;
    private final int instanceId;
    private final LoggingContext loggingContext;
    // Not final: replaced through setInstanceCount(int); volatile so readers see the latest value.
    private volatile int instanceCount;
    private final StreamWriter streamWriter;

    /**
     * Creates the context for one worker instance.
     *
     * <p>The decompiler reports that this constructor's access modifier was changed from
     * package-private.
     *
     * @param workerSpecification specification of the worker; its datasets are handed to the superclass
     * @param program the program being run; its id is used for the logging context and stream writer
     * @param programOptions passed through to the superclass
     * @param i instance id of this worker instance
     * @param i2 initial instance count
     * @param metricsCollectionService passed through to the superclass
     * @param datasetFramework passed through to the superclass
     * @param transactionSystemClient passed through to the superclass
     * @param discoveryServiceClient passed through to the superclass
     * @param streamWriterFactory factory used to create the {@link StreamWriter} all writes delegate to
     * @param pluginInstantiator passed through to the superclass; may be {@code null}
     * @param secureStore passed through to the superclass
     * @param secureStoreManager passed through to the superclass
     */
    public BasicWorkerContext(WorkerSpecification workerSpecification, Program program, ProgramOptions programOptions, int i, int i2, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, DiscoveryServiceClient discoveryServiceClient, StreamWriterFactory streamWriterFactory, @Nullable PluginInstantiator pluginInstantiator, SecureStore secureStore, SecureStoreManager secureStoreManager) {
        // The "ins" -> instance id map is presumably a set of metrics tags; confirm against AbstractContext.
        super(program, programOptions, workerSpecification.getDatasets(), datasetFramework, transactionSystemClient, discoveryServiceClient, true, metricsCollectionService, ImmutableMap.of("ins", String.valueOf(i)), secureStore, secureStoreManager, pluginInstantiator);
        this.specification = workerSpecification;
        // instanceId must be assigned before createLoggingContext runs, because it reads getInstanceId().
        this.instanceId = i;
        this.instanceCount = i2;
        this.loggingContext = createLoggingContext(program.getId(), getRunId());
        this.streamWriter = streamWriterFactory.create(new Id.Run(program.getId(), getRunId().getId()), getOwners());
    }

    /**
     * Builds the {@link WorkerLoggingContext} from the program's namespace, application and
     * program ids, the run id, and this instance's id.
     */
    private LoggingContext createLoggingContext(Id.Program programId, RunId runId) {
        String instance = String.valueOf(getInstanceId());
        return new WorkerLoggingContext(programId.getNamespaceId(), programId.getApplicationId(), programId.getId(), runId.getId(), instance);
    }

    /**
     * @return the logging context created for this worker instance at construction time
     */
    public LoggingContext getLoggingContext() {
        return this.loggingContext;
    }

    /**
     * @return the worker specification supplied at construction time
     */
    public WorkerSpecification getSpecification() {
        return this.specification;
    }

    /**
     * Runs the given runnable inside a new transaction.
     *
     * <p>A fresh {@link TransactionContext} is obtained from the dataset cache, started, the
     * runnable is invoked with the dataset cache, and the transaction is finished. If any of these
     * steps throws an {@link Exception}, the failure is logged, the transaction is aborted and the
     * failure is rethrown through {@link Throwables#propagate(Throwable)}.
     *
     * @param txRunnable the code to run inside the transaction
     */
    public void execute(TxRunnable txRunnable) {
        TransactionContext txContext = this.datasetCache.newTransactionContext();
        try {
            txContext.start();
            txRunnable.run(this.datasetCache);
            txContext.finish();
        } catch (TransactionFailureException e) {
            // abortTransaction never returns normally; it always rethrows.
            abortTransaction(e, "Failed to commit. Aborting transaction.", txContext);
        } catch (Exception e) {
            abortTransaction(e, "Exception occurred running user code. Aborting transaction.", txContext);
        }
    }

    /**
     * @return the most recently set instance count
     */
    public int getInstanceCount() {
        return this.instanceCount;
    }

    /**
     * @return the instance id supplied at construction time
     */
    public int getInstanceId() {
        return this.instanceId;
    }

    /**
     * Replaces the instance count reported by {@link #getInstanceCount()}.
     *
     * @param i the new instance count
     */
    public void setInstanceCount(int i) {
        this.instanceCount = i;
    }

    /**
     * Logs the failure, aborts the transaction and rethrows. This method never returns normally.
     *
     * <p>If the abort succeeds, {@code cause} is rethrown via
     * {@link Throwables#propagate(Throwable)}. If the abort itself fails, the abort failure is
     * logged and propagated instead, with {@code cause} attached as a suppressed exception so the
     * failure that triggered the abort is still reachable from what the caller receives.
     *
     * @param cause the exception that made the abort necessary
     * @param message the message to log together with {@code cause}
     * @param txContext the transaction context to abort
     */
    private void abortTransaction(Exception cause, String message, TransactionContext txContext) {
        LOG.error(message, cause);
        try {
            txContext.abort();
        } catch (TransactionFailureException abortFailure) {
            LOG.error("Failed to abort transaction.", abortFailure);
            // addSuppressed rejects self-suppression, so guard against the same instance coming back.
            if (abortFailure != cause) {
                abortFailure.addSuppressed(cause);
            }
            throw Throwables.propagate(abortFailure);
        }
        throw Throwables.propagate(cause);
    }

    /**
     * Delegates to {@link StreamWriter#write(String, String)} on the underlying stream writer;
     * both arguments are passed through unchanged.
     *
     * @throws IOException if the underlying writer throws it
     */
    public void write(String str, String str2) throws IOException {
        this.streamWriter.write(str, str2);
    }

    /**
     * Delegates to the three-argument {@code write(String, String, Map)} of the underlying stream
     * writer; all arguments are passed through unchanged.
     *
     * @throws IOException if the underlying writer throws it
     */
    public void write(String str, String str2, Map<String, String> map) throws IOException {
        this.streamWriter.write(str, str2, map);
    }

    /**
     * Delegates to {@code write(String, ByteBuffer)} of the underlying stream writer; both
     * arguments are passed through unchanged.
     *
     * @throws IOException if the underlying writer throws it
     */
    public void write(String str, ByteBuffer byteBuffer) throws IOException {
        this.streamWriter.write(str, byteBuffer);
    }

    /**
     * Delegates to {@code write(String, StreamEventData)} of the underlying stream writer; both
     * arguments are passed through unchanged.
     *
     * @throws IOException if the underlying writer throws it
     */
    public void write(String str, StreamEventData streamEventData) throws IOException {
        this.streamWriter.write(str, streamEventData);
    }

    /**
     * Delegates to {@code writeFile(String, File, String)} of the underlying stream writer; all
     * arguments are passed through unchanged.
     *
     * @throws IOException if the underlying writer throws it
     */
    public void writeFile(String str, File file, String str2) throws IOException {
        this.streamWriter.writeFile(str, file, str2);
    }

    /**
     * Delegates to {@code createBatchWriter(String, String)} of the underlying stream writer.
     *
     * @return the batch writer returned by the underlying stream writer
     * @throws IOException if the underlying writer throws it
     */
    public StreamBatchWriter createBatchWriter(String str, String str2) throws IOException {
        return this.streamWriter.createBatchWriter(str, str2);
    }
}
