package co.cask.cdap.internal.app.runtime;

import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.dataset.DatasetInstantiator;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.queue.QueueProducer;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import co.cask.cdap.data2.transaction.stream.ForwardingStreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConsumerFactory;
import co.cask.cdap.proto.Id;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionSystemClient;
import java.io.IOException;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/AbstractDataFabricFacade.class */
/**
 * Base implementation of {@link DataFabricFacade} that wires queue and stream clients into the
 * transaction-aware set held by a {@link DatasetInstantiator}.
 *
 * <p>Every producer or consumer created through this facade that is a {@link TransactionAware}
 * is registered with the dataset instantiator, so that transactions built by
 * {@link #createTransactionManager()} and {@link #createTransactionExecutor()} see whatever
 * {@code getTransactionAware()} returns at the time they are created.
 */
public abstract class AbstractDataFabricFacade implements DataFabricFacade {
    private final DatasetInstantiator dataSetContext;
    private final QueueClientFactory queueClientFactory;
    private final StreamConsumerFactory streamConsumerFactory;
    private final TransactionExecutorFactory txExecutorFactory;
    private final TransactionSystemClient txSystemClient;
    private final Id.Program programId;

    /**
     * Creates a facade bound to the given program.
     *
     * @param transactionSystemClient client handed to every {@link TransactionContext} created here
     * @param transactionExecutorFactory factory used by {@link #createTransactionExecutor()}
     * @param queueClientFactory factory that the queue producer/consumer methods delegate to
     * @param streamConsumerFactory factory that {@link #createStreamConsumer} delegates to
     * @param program program whose id is used to build the stream consumer namespace
     * @param datasetInstantiator holder of the transaction-aware participants
     */
    public AbstractDataFabricFacade(TransactionSystemClient transactionSystemClient, TransactionExecutorFactory transactionExecutorFactory, QueueClientFactory queueClientFactory, StreamConsumerFactory streamConsumerFactory, Program program, DatasetInstantiator datasetInstantiator) {
        this.txSystemClient = transactionSystemClient;
        this.queueClientFactory = queueClientFactory;
        this.streamConsumerFactory = streamConsumerFactory;
        this.txExecutorFactory = transactionExecutorFactory;
        this.dataSetContext = datasetInstantiator;
        this.programId = program.getId();
    }

    /**
     * Returns the dataset instantiator supplied at construction time.
     */
    @Override
    public DatasetContext getDataSetContext() {
        return this.dataSetContext;
    }

    /**
     * Creates a new {@link TransactionContext} over the transaction-aware participants currently
     * reported by the dataset instantiator.
     */
    @Override
    public TransactionContext createTransactionManager() {
        return new TransactionContext(this.txSystemClient, this.dataSetContext.getTransactionAware());
    }

    /**
     * Creates a new {@link TransactionExecutor} over the transaction-aware participants currently
     * reported by the dataset instantiator.
     */
    @Override
    public TransactionExecutor createTransactionExecutor() {
        return this.txExecutorFactory.createExecutor(this.dataSetContext.getTransactionAware());
    }

    /**
     * Creates a producer for the given queue using {@link QueueMetrics#NOOP_QUEUE_METRICS}.
     *
     * @param queueName queue to produce to
     * @return the producer created by {@link #createProducer(QueueName, QueueMetrics)}
     * @throws IOException if the underlying factory fails
     */
    public QueueProducer createProducer(QueueName queueName) throws IOException {
        return createProducer(queueName, QueueMetrics.NOOP_QUEUE_METRICS);
    }

    /**
     * Creates a consumer for the given queue.
     *
     * <p>When the consumer returned by the factory is a {@link TransactionAware}, it is wrapped in
     * a {@code CloseableQueueConsumer} and the wrapper is registered with the dataset
     * instantiator; otherwise the factory's consumer is returned untouched.
     *
     * @param queueName queue to consume from
     * @param consumerConfig consumer configuration passed through to the factory
     * @param i passed through unchanged as the third argument of the factory call
     *          (presumably the number of consumer groups; confirm against QueueClientFactory)
     * @return the (possibly wrapped) consumer
     * @throws IOException if the underlying factory fails
     */
    public QueueConsumer createConsumer(QueueName queueName, ConsumerConfig consumerConfig, int i) throws IOException {
        QueueConsumer consumer = this.queueClientFactory.createConsumer(queueName, consumerConfig, i);
        if (!(consumer instanceof TransactionAware)) {
            return consumer;
        }
        QueueConsumer closeableConsumer = new CloseableQueueConsumer(this.dataSetContext, consumer);
        // The wrapper, not the raw consumer, is what takes part in transactions.
        this.dataSetContext.addTransactionAware((TransactionAware) closeableConsumer);
        return closeableConsumer;
    }

    /**
     * Creates a producer for the given queue, registering it with the dataset instantiator when
     * it is a {@link TransactionAware}.
     *
     * @param queueName queue to produce to
     * @param queueMetrics metrics sink passed through to the factory
     * @return the producer created by the factory
     * @throws IOException if the underlying factory fails
     */
    public QueueProducer createProducer(QueueName queueName, QueueMetrics queueMetrics) throws IOException {
        QueueProducer producer = this.queueClientFactory.createProducer(queueName, queueMetrics);
        if (producer instanceof TransactionAware) {
            this.dataSetContext.addTransactionAware((TransactionAware) producer);
        }
        return producer;
    }

    /**
     * Creates a stream consumer under the namespace {@code "<applicationId>.<programId>"} and
     * registers it with the dataset instantiator.
     *
     * <p>The returned consumer forwards to the one created by the factory; closing it also
     * removes the underlying consumer from the dataset instantiator.
     *
     * @param stream stream to consume from
     * @param consumerConfig consumer configuration passed through to the factory
     * @return a forwarding consumer that unregisters itself on close
     * @throws IOException if the underlying factory fails
     */
    @Override
    public StreamConsumer createStreamConsumer(Id.Stream stream, ConsumerConfig consumerConfig) throws IOException {
        String namespace = String.format("%s.%s", this.programId.getApplicationId(), this.programId.getId());
        final StreamConsumer consumer = this.streamConsumerFactory.create(stream, namespace, consumerConfig);
        this.dataSetContext.addTransactionAware(consumer);
        return new ForwardingStreamConsumer(consumer) {
            @Override
            public void close() throws IOException {
                try {
                    super.close();
                } finally {
                    // Unregister even when close() fails, so a closed consumer is never left
                    // registered with the dataset instantiator.
                    AbstractDataFabricFacade.this.dataSetContext.removeTransactionAware(consumer);
                }
            }
        };
    }
}
