package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.queue.QueueConsumer;
import co.cask.cdap.data2.queue.QueueProducer;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import com.google.inject.Inject;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueClientFactory.class */
/**
 * {@link QueueClientFactory} that creates queue producers and consumers on top of HBase tables.
 *
 * <p>For every request the factory picks the stream admin or the queue admin depending on
 * {@link QueueName#isStream()}, creates the queue through that admin if it does not exist yet,
 * and then opens the {@link HTable} instances handed to the producer or consumer.
 */
public final class HBaseQueueClientFactory implements QueueClientFactory {
    /** Client-side write buffer size applied to every table opened by this factory (4194304). */
    private static final int DEFAULT_WRITE_BUFFER_SIZE = 4 * 1024 * 1024;

    private final Configuration hConf;
    private final HBaseQueueAdmin queueAdmin;
    private final HBaseStreamAdmin streamAdmin;
    private final HBaseQueueUtil queueUtil = new HBaseQueueUtilFactory().get();

    /**
     * Creates the factory.
     *
     * @param configuration configuration used when opening tables
     * @param queueAdmin admin used for non-stream queues; it is cast to {@link HBaseQueueAdmin},
     *     so any other implementation fails with a {@link ClassCastException}
     * @param hBaseStreamAdmin admin used for stream queues
     */
    @Inject
    public HBaseQueueClientFactory(Configuration configuration, QueueAdmin queueAdmin, HBaseStreamAdmin hBaseStreamAdmin) {
        this.hConf = configuration;
        this.queueAdmin = (HBaseQueueAdmin) queueAdmin;
        this.streamAdmin = hBaseStreamAdmin;
    }

    /**
     * Returns the actual table name reported for the given queue by the admin responsible for it.
     */
    String getTableName(QueueName queueName) {
        return (queueName.isStream() ? this.streamAdmin : this.queueAdmin).getActualTableName(queueName);
    }

    /**
     * Returns the config table name reported by the admin responsible for the given queue.
     */
    String getConfigTableName(QueueName queueName) {
        return (queueName.isStream() ? this.streamAdmin : this.queueAdmin).getConfigTableName();
    }

    /**
     * Creates a consumer for the given queue, creating the queue first if it does not exist.
     *
     * <p>Two tables are opened: the config table, which backs the consumer state store, and the
     * queue table itself. If anything fails after a table has been opened, the tables opened so
     * far are closed before the failure propagates, so a failed call does not leak them.
     *
     * @param queueName queue to consume from
     * @param consumerConfig configuration of the consumer
     * @param i not used by this implementation
     * @return a consumer built by the {@link HBaseQueueUtil}
     * @throws IOException if the queue cannot be created or a table cannot be opened
     */
    @Override // co.cask.cdap.data2.queue.QueueClientFactory
    public QueueConsumer createConsumer(QueueName queueName, ConsumerConfig consumerConfig, int i) throws IOException {
        HBaseQueueAdmin admin = ensureTableExists(queueName);
        HTable configTable = createHTable(admin.getConfigTableName());
        HTable queueTable = null;
        boolean success = false;
        try {
            HBaseConsumerStateStore stateStore = new HBaseConsumerStateStore(queueName, consumerConfig, configTable);
            queueTable = createHTable(admin.getActualTableName(queueName));
            QueueConsumer consumer = this.queueUtil.getQueueConsumer(
                consumerConfig, queueTable, queueName, stateStore.getState(), stateStore);
            success = true;
            return consumer;
        } finally {
            // On success the tables are handed over to the consumer/state store; only clean up on failure.
            if (!success) {
                closeQuietly(queueTable);
                closeQuietly(configTable);
            }
        }
    }

    /**
     * Creates a producer for the given queue that reports to {@link QueueMetrics#NOOP_QUEUE_METRICS}.
     *
     * @throws IOException if the queue cannot be created or its table cannot be opened
     */
    @Override // co.cask.cdap.data2.queue.QueueClientFactory
    public QueueProducer createProducer(QueueName queueName) throws IOException {
        return createProducer(queueName, QueueMetrics.NOOP_QUEUE_METRICS);
    }

    /**
     * Creates a producer for the given queue, creating the queue first if it does not exist.
     *
     * @param queueName queue to produce to
     * @param queueMetrics metrics object passed to the producer
     * @throws IOException if the queue cannot be created or its table cannot be opened
     */
    @Override // co.cask.cdap.data2.queue.QueueClientFactory
    public QueueProducer createProducer(QueueName queueName, QueueMetrics queueMetrics) throws IOException {
        HBaseQueueAdmin admin = ensureTableExists(queueName);
        HTable queueTable = createHTable(admin.getActualTableName(queueName));
        boolean success = false;
        try {
            QueueProducer producer = new HBaseQueueProducer(queueTable, queueName, queueMetrics);
            success = true;
            return producer;
        } finally {
            // Do not leak the table if the producer could not be constructed.
            if (!success) {
                closeQuietly(queueTable);
            }
        }
    }

    /**
     * Selects the admin responsible for the queue and creates the queue through it if the admin
     * reports that it does not exist.
     *
     * @return the admin that was used, so callers can resolve table names from the same admin
     * @throws IOException wrapping any exception raised by the existence check or the creation
     */
    private HBaseQueueAdmin ensureTableExists(QueueName queueName) throws IOException {
        HBaseQueueAdmin admin = queueName.isStream() ? this.streamAdmin : this.queueAdmin;
        try {
            if (!admin.exists(queueName)) {
                admin.create(queueName);
            }
            return admin;
        } catch (Exception e) {
            throw new IOException("Failed to open table " + admin.getActualTableName(queueName), e);
        }
    }

    /**
     * Opens the named table with the factory's write buffer size and with auto-flush disabled.
     * The table is closed again if configuring it fails.
     */
    private HTable createHTable(String tableName) throws IOException {
        HTable table = new HTable(this.hConf, tableName);
        boolean configured = false;
        try {
            table.setWriteBufferSize(DEFAULT_WRITE_BUFFER_SIZE);
            table.setAutoFlush(false);
            configured = true;
            return table;
        } finally {
            if (!configured) {
                closeQuietly(table);
            }
        }
    }

    /**
     * Closes the table if it is non-null, ignoring any {@link IOException} from the close.
     * Used only on failure paths, where the original failure is the one worth reporting.
     */
    private static void closeQuietly(HTable table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException ignored) {
            // Intentionally ignored: a more relevant exception is already propagating.
        }
    }
}
