package co.cask.cdap.internal.app.runtime;

import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.ProgramContext;
import co.cask.cdap.data.ProgramContextAware;
import co.cask.cdap.data2.dataset2.DynamicDatasetCache;
import co.cask.cdap.data2.metadata.lineage.AccessType;
import co.cask.cdap.data2.metadata.writer.LineageWriter;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.queue.QueueProducer;
import co.cask.cdap.data2.transaction.TransactionExecutorFactory;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import co.cask.cdap.data2.transaction.stream.ForwardingStreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumerFactory;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.proto.id.StreamId;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionExecutor;
import org.apache.tephra.TransactionFailureException;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/LineageWriterDataFabricFacade.class */
public final class LineageWriterDataFabricFacade implements DataFabricFacade, ProgramContextAware {
    private final DynamicDatasetCache datasetCache;
    private final QueueClientFactory queueClientFactory;
    private final StreamConsumerFactory streamConsumerFactory;
    private final TransactionExecutorFactory txExecutorFactory;
    private final ProgramId programId;
    private final LineageWriter lineageWriter;
    private volatile ProgramContext programContext;

    @Inject
    public LineageWriterDataFabricFacade(TransactionExecutorFactory transactionExecutorFactory, QueueClientFactory queueClientFactory, StreamConsumerFactory streamConsumerFactory, LineageWriter lineageWriter, @Assisted Program program, @Assisted DynamicDatasetCache dynamicDatasetCache) {
        this.queueClientFactory = queueClientFactory;
        this.streamConsumerFactory = streamConsumerFactory;
        this.txExecutorFactory = transactionExecutorFactory;
        this.datasetCache = dynamicDatasetCache;
        this.programId = program.getId();
        this.lineageWriter = lineageWriter;
    }

    public void setContext(ProgramContext programContext) {
        this.programContext = programContext;
        if (this.queueClientFactory instanceof ProgramContextAware) {
            this.queueClientFactory.setContext(programContext);
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.DataFabricFacade
    public DatasetContext getDatasetContext() {
        return this.datasetCache;
    }

    @Override // co.cask.cdap.internal.app.runtime.DataFabricFacade
    public TransactionContext createTransactionContext() throws TransactionFailureException {
        return this.datasetCache.newTransactionContext();
    }

    @Override // co.cask.cdap.internal.app.runtime.DataFabricFacade
    public TransactionExecutor createTransactionExecutor() {
        return this.txExecutorFactory.createExecutor(this.datasetCache);
    }

    public QueueProducer createProducer(QueueName queueName) throws IOException {
        return createProducer(queueName, QueueMetrics.NOOP_QUEUE_METRICS);
    }

    public QueueConsumer createConsumer(QueueName queueName, ConsumerConfig consumerConfig, int i) throws IOException {
        TransactionAware createConsumer = this.queueClientFactory.createConsumer(queueName, consumerConfig, i);
        if (createConsumer instanceof TransactionAware) {
            createConsumer = new CloseableQueueConsumer(this.datasetCache, createConsumer);
            this.datasetCache.addExtraTransactionAware(createConsumer);
        }
        return createConsumer;
    }

    public QueueProducer createProducer(QueueName queueName, QueueMetrics queueMetrics) throws IOException {
        TransactionAware createProducer = this.queueClientFactory.createProducer(queueName, queueMetrics);
        if (createProducer instanceof TransactionAware) {
            this.datasetCache.addExtraTransactionAware(createProducer);
        }
        return createProducer;
    }

    @Override // co.cask.cdap.internal.app.runtime.DataFabricFacade
    public StreamConsumer createStreamConsumer(StreamId streamId, ConsumerConfig consumerConfig) throws IOException {
        final StreamConsumer create = this.streamConsumerFactory.create(streamId, String.format("%s.%s", this.programId.getApplication(), this.programId.getProgram()), consumerConfig);
        this.datasetCache.addExtraTransactionAware(create);
        ProgramContext programContext = this.programContext;
        if (programContext != null) {
            this.lineageWriter.addAccess(programContext.getProgramRunId(), streamId, AccessType.READ, programContext.getComponentId());
        }
        return new ForwardingStreamConsumer(create) { // from class: co.cask.cdap.internal.app.runtime.LineageWriterDataFabricFacade.1
            public void close() throws IOException {
                super.close();
                LineageWriterDataFabricFacade.this.datasetCache.removeExtraTransactionAware(create);
            }
        };
    }
}
