package co.cask.cdap.data2.transaction.stream.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.Namespace;
import co.cask.cdap.data2.datafabric.DefaultDatasetNamespace;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStore;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStoreFactory;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/hbase/HBaseStreamConsumerStateStoreFactory.class */
/**
 * A {@link StreamConsumerStateStoreFactory} that hands out {@link HBaseStreamConsumerStateStore}
 * instances, all of which share a single HBase table.
 *
 * <p>The table is created on demand the first time {@link #create(StreamConfig)} is called (and
 * again after {@link #dropAll()}). Both public methods are {@code synchronized}, which also guards
 * the {@code tableCreated} flag.
 */
public final class HBaseStreamConsumerStateStoreFactory implements StreamConsumerStateStoreFactory {
    /** Dataset name that is namespaced and converted into the HBase table name. */
    private static final String STATE_STORE_NAME = "stream.state.store";

    /** Timeout, in milliseconds, passed to the table-creation helper. */
    private static final long TABLE_CREATE_TIMEOUT_MS = 5000L;

    /** Client-side write buffer size set on every table handle (4 MiB). */
    private static final long WRITE_BUFFER_SIZE_BYTES = 4L * 1024 * 1024;

    private final Configuration hConf;
    private final String storeTableName;
    private final HBaseTableUtil tableUtil;

    /** True once this factory has ensured the table exists; reset by {@link #dropAll()}. */
    private boolean tableCreated;

    /**
     * Creates the factory and derives the HBase table name from the system dataset namespace.
     *
     * @param configuration  Hadoop/HBase configuration used for all admin and table handles
     * @param cConfiguration configuration used to build the system dataset namespace
     * @param hBaseTableUtil helper used to create the table if it does not exist
     */
    @Inject
    HBaseStreamConsumerStateStoreFactory(Configuration configuration, CConfiguration cConfiguration, HBaseTableUtil hBaseTableUtil) {
        this.hConf = configuration;
        this.storeTableName = HBaseTableUtil.getHBaseTableName(
            new DefaultDatasetNamespace(cConfiguration, Namespace.SYSTEM).namespace(STATE_STORE_NAME));
        this.tableUtil = hBaseTableUtil;
    }

    /**
     * Returns a state store for the given stream, creating the backing table first if this
     * factory has not yet done so.
     *
     * @param streamConfig configuration of the stream the store is created for
     * @return a new state store wrapping a freshly opened, buffered {@link HTable}
     * @throws IOException if the table cannot be created, opened or configured
     */
    @Override
    public synchronized StreamConsumerStateStore create(StreamConfig streamConfig) throws IOException {
        byte[] tableName = Bytes.toBytes(this.storeTableName);
        if (!this.tableCreated) {
            ensureTableExists(tableName);
            this.tableCreated = true;
        }

        HTable table = new HTable(this.hConf, tableName);
        boolean handedOff = false;
        try {
            table.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
            table.setAutoFlush(false);
            StreamConsumerStateStore store = new HBaseStreamConsumerStateStore(streamConfig, table);
            handedOff = true;
            return store;
        } finally {
            // If configuring or wrapping the table failed, nobody else holds a reference to it,
            // so it must be closed here to avoid leaking the handle.
            if (!handedOff) {
                closeQuietly(table);
            }
        }
    }

    /**
     * Disables and deletes the backing table if it exists.
     *
     * <p>The {@code tableCreated} flag is reset before any HBase call, so the next
     * {@link #create(StreamConfig)} re-checks the table even when this method fails part-way.
     *
     * @throws IOException if the table cannot be inspected, disabled or deleted
     */
    @Override
    public synchronized void dropAll() throws IOException {
        this.tableCreated = false;
        HBaseAdmin admin = new HBaseAdmin(this.hConf);
        try {
            byte[] tableName = Bytes.toBytes(this.storeTableName);
            if (admin.tableExists(tableName)) {
                // A previous attempt may have disabled the table but failed to delete it;
                // only disable when the table is still enabled so the drop can be retried.
                if (admin.isTableEnabled(tableName)) {
                    admin.disableTable(tableName);
                }
                admin.deleteTable(tableName);
            }
        } finally {
            admin.close();
        }
    }

    /** Creates the table with a single-version column family unless it already exists. */
    private void ensureTableExists(byte[] tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(this.hConf);
        try {
            HColumnDescriptor family = new HColumnDescriptor(QueueEntryRow.COLUMN_FAMILY);
            family.setMaxVersions(1);
            HTableDescriptor descriptor = new HTableDescriptor(tableName);
            descriptor.addFamily(family);
            this.tableUtil.createTableIfNotExists(
                admin, tableName, descriptor, (byte[][]) null, TABLE_CREATE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } finally {
            admin.close();
        }
    }

    /** Closes the table on a failure path without masking the exception already in flight. */
    private static void closeQuietly(HTable table) {
        try {
            table.close();
        } catch (IOException ignored) {
            // Intentionally ignored: the caller is already propagating the original failure,
            // which is more useful to report than a secondary close error.
        }
    }
}
