package co.cask.cdap.data2.transaction.stream.hbase;

import co.cask.cdap.data.stream.StreamUtils;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStore;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStoreFactory;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.proto.Id;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/hbase/HBaseStreamConsumerStateStoreFactory.class */
/**
 * {@link StreamConsumerStateStoreFactory} implementation whose state stores are backed by an HBase table.
 *
 * <p>The table id is derived from the namespace of the stream (see
 * {@link StreamUtils#getStateStoreTableId}). Both public methods are {@code synchronized} on this factory
 * instance.
 */
public final class HBaseStreamConsumerStateStoreFactory implements StreamConsumerStateStoreFactory {
    /** Client-side write buffer size, in bytes, applied to every {@link HTable} handed to a state store (4 MiB). */
    private static final long WRITE_BUFFER_SIZE_BYTES = 4L * 1024 * 1024;

    private final Configuration hConf;
    private final HBaseTableUtil tableUtil;

    /**
     * Creates the factory.
     *
     * @param configuration Hadoop/HBase configuration used to open {@link HBaseAdmin} and {@link HTable} instances
     * @param hBaseTableUtil helper used for table existence checks, creation, deletion and {@link HTable} creation
     */
    @Inject
    HBaseStreamConsumerStateStoreFactory(Configuration configuration, HBaseTableUtil hBaseTableUtil) {
        this.hConf = configuration;
        this.tableUtil = hBaseTableUtil;
    }

    /**
     * Creates a consumer state store for the given stream, creating the backing HBase table first if it
     * does not exist yet.
     *
     * @param streamConfig configuration of the stream; its stream id's namespace selects the state store table
     * @return a new {@link HBaseStreamConsumerStateStore} wrapping a freshly created {@link HTable}
     * @throws IOException if checking for, creating, or opening the table fails
     */
    @Override
    public synchronized StreamConsumerStateStore create(StreamConfig streamConfig) throws IOException {
        TableId stateStoreTableId = StreamUtils.getStateStoreTableId(streamConfig.getStreamId().getNamespace());
        ensureStateStoreTableExists(stateStoreTableId);

        HTable hTable = this.tableUtil.createHTable(this.hConf, stateStoreTableId);
        hTable.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
        // Writes are buffered on the client instead of being flushed on every put.
        hTable.setAutoFlush(false);
        return new HBaseStreamConsumerStateStore(streamConfig, hTable);
    }

    /**
     * Drops the state store table of the given namespace, if that table exists.
     *
     * @param namespace namespace whose state store table should be removed
     * @throws IOException if the existence check or the drop fails
     */
    @Override
    public synchronized void dropAllInNamespace(Id.Namespace namespace) throws IOException {
        HBaseAdmin hBaseAdmin = new HBaseAdmin(this.hConf);
        try {
            TableId stateStoreTableId = StreamUtils.getStateStoreTableId(namespace);
            if (this.tableUtil.tableExists(hBaseAdmin, stateStoreTableId)) {
                this.tableUtil.dropTable(hBaseAdmin, stateStoreTableId);
            }
        } finally {
            hBaseAdmin.close();
        }
    }

    /**
     * Creates the state store table with a single column family ({@link QueueEntryRow#COLUMN_FAMILY},
     * max versions 1) unless it already exists.
     *
     * <p>The {@link HBaseAdmin} opened here is always closed, including when the table already exists or
     * when the existence check fails.
     *
     * @param tableId id of the state store table
     * @throws IOException if the existence check or the table creation fails
     */
    private void ensureStateStoreTableExists(TableId tableId) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(this.hConf);
        try {
            if (this.tableUtil.tableExists(admin, tableId)) {
                return;
            }
            HColumnDescriptor family = new HColumnDescriptor(QueueEntryRow.COLUMN_FAMILY);
            family.setMaxVersions(1);
            HTableDescriptor tableDescriptor = this.tableUtil.createHTableDescriptor(tableId);
            tableDescriptor.addFamily(family);
            // No split keys are supplied; 5000 ms is presumably the creation wait/timeout -- confirm against
            // HBaseTableUtil.createTableIfNotExists.
            this.tableUtil.createTableIfNotExists(admin, tableId, tableDescriptor, (byte[][]) null, 5000L, TimeUnit.MILLISECONDS);
        } finally {
            admin.close();
        }
    }
}
