package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.datafabric.dataset.DatasetsUtil;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.DatasetManagementException;
import co.cask.cdap.data2.dataset2.lib.hbase.AbstractHBaseDataSetAdmin;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.queue.AbstractQueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueConfigurer;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.hbase.wd.RowKeyDistributorByHashPrefix;
import co.cask.cdap.proto.Id;
import co.cask.tephra.TransactionAware;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueAdmin.class */
public class HBaseQueueAdmin extends AbstractQueueAdmin {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseQueueAdmin.class);
    public static final String PROPERTY_PREFIX_BYTES = "cdap.prefix.bytes";
    protected final HBaseTableUtil tableUtil;
    private final CConfiguration cConf;
    private final Configuration hConf;
    private final LocationFactory locationFactory;
    private final QueueConstants.QueueType type;
    private final TransactionExecutorFactory txExecutorFactory;
    private final DatasetFramework datasetFramework;
    private HBaseAdmin admin;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueAdmin$DatasetAdmin.class */
    public final class DatasetAdmin extends AbstractHBaseDataSetAdmin {
        private final Properties properties;

        private DatasetAdmin(TableId tableId, Configuration configuration, HBaseTableUtil hBaseTableUtil, Properties properties) {
            super(tableId, configuration, hBaseTableUtil);
            this.properties = properties;
        }

        @Override // co.cask.cdap.data2.dataset2.lib.hbase.AbstractHBaseDataSetAdmin
        protected AbstractHBaseDataSetAdmin.CoprocessorJar createCoprocessorJar() throws IOException {
            List<? extends Class<? extends Coprocessor>> coprocessors = HBaseQueueAdmin.this.getCoprocessors();
            if (coprocessors.isEmpty()) {
                return AbstractHBaseDataSetAdmin.CoprocessorJar.EMPTY;
            }
            return new AbstractHBaseDataSetAdmin.CoprocessorJar(coprocessors, HBaseTableUtil.createCoProcessorJar(HBaseQueueAdmin.this.type.toString(), HBaseQueueAdmin.this.locationFactory.create(HBaseQueueAdmin.this.cConf.get(QueueConstants.ConfigKeys.QUEUE_TABLE_COPROCESSOR_DIR, QueueConstants.DEFAULT_QUEUE_TABLE_COPROCESSOR_DIR)), coprocessors));
        }

        @Override // co.cask.cdap.data2.dataset2.lib.hbase.AbstractHBaseDataSetAdmin
        protected boolean upgradeTable(HTableDescriptor hTableDescriptor) {
            boolean z = false;
            HColumnDescriptor family = hTableDescriptor.getFamily(QueueEntryRow.COLUMN_FAMILY);
            if (family.getMaxVersions() != 1) {
                family.setMaxVersions(1);
                z = true;
            }
            for (String str : this.properties.stringPropertyNames()) {
                String value = hTableDescriptor.getValue(str);
                String property = this.properties.getProperty(str);
                if (!Objects.equal(value, property)) {
                    hTableDescriptor.setValue(str, property);
                    z = true;
                }
            }
            return z;
        }

        public void create() throws IOException {
            HTableDescriptor createHTableDescriptor = this.tableUtil.createHTableDescriptor(this.tableId);
            for (String str : this.properties.stringPropertyNames()) {
                createHTableDescriptor.setValue(str, this.properties.getProperty(str));
            }
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(QueueEntryRow.COLUMN_FAMILY);
            hColumnDescriptor.setMaxVersions(1);
            createHTableDescriptor.addFamily(hColumnDescriptor);
            AbstractHBaseDataSetAdmin.CoprocessorJar createCoprocessorJar = createCoprocessorJar();
            for (Class<? extends Coprocessor> cls : createCoprocessorJar.getCoprocessors()) {
                addCoprocessor(createHTableDescriptor, cls, createCoprocessorJar.getJarLocation(), createCoprocessorJar.getPriority(cls));
            }
            int i = HBaseQueueAdmin.this.cConf.getInt(QueueConstants.ConfigKeys.QUEUE_TABLE_PRESPLITS);
            byte[][] splitKeys = HBaseTableUtil.getSplitKeys(i, i, new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(i)));
            createHTableDescriptor.setValue(QueueConstants.DISTRIBUTOR_BUCKETS, Integer.toString(i));
            createQueueTable(this.tableId, createHTableDescriptor, splitKeys);
        }

        private void createQueueTable(TableId tableId, HTableDescriptor hTableDescriptor, byte[][] bArr) throws IOException {
            hTableDescriptor.setValue(HBaseQueueAdmin.PROPERTY_PREFIX_BYTES, Integer.toString(HBaseQueueAdmin.this.type == QueueConstants.QueueType.SHARDED_QUEUE ? 13 : 1));
            HBaseQueueAdmin.LOG.info("Create queue table with prefix bytes {}", hTableDescriptor.getValue(HBaseQueueAdmin.PROPERTY_PREFIX_BYTES));
            this.tableUtil.createTableIfNotExists(HBaseQueueAdmin.this.getHBaseAdmin(), tableId, hTableDescriptor, bArr);
        }
    }

    @Inject
    public HBaseQueueAdmin(Configuration configuration, CConfiguration cConfiguration, LocationFactory locationFactory, HBaseTableUtil hBaseTableUtil, DatasetFramework datasetFramework, TransactionExecutorFactory transactionExecutorFactory) {
        this(configuration, cConfiguration, locationFactory, hBaseTableUtil, datasetFramework, transactionExecutorFactory, QueueConstants.QueueType.SHARDED_QUEUE);
    }

    protected HBaseQueueAdmin(Configuration configuration, CConfiguration cConfiguration, LocationFactory locationFactory, HBaseTableUtil hBaseTableUtil, DatasetFramework datasetFramework, TransactionExecutorFactory transactionExecutorFactory, QueueConstants.QueueType queueType) {
        super(queueType);
        this.hConf = configuration;
        this.cConf = cConfiguration;
        this.locationFactory = locationFactory;
        this.tableUtil = hBaseTableUtil;
        this.txExecutorFactory = transactionExecutorFactory;
        this.datasetFramework = datasetFramework;
        this.type = queueType;
    }

    public static TableId getConfigTableId(QueueName queueName) {
        return getConfigTableId(queueName.getFirstComponent());
    }

    public static TableId getConfigTableId(String str) {
        return TableId.from(str, HBaseQueueDatasetModule.STATE_STORE_NAME + ".config");
    }

    protected final synchronized HBaseAdmin getHBaseAdmin() throws IOException {
        if (this.admin == null) {
            this.admin = new HBaseAdmin(this.hConf);
        }
        return this.admin;
    }

    public boolean doTruncateTable(QueueName queueName) {
        return true;
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public boolean exists(QueueName queueName) throws Exception {
        return this.tableUtil.tableExists(getHBaseAdmin(), getDataTableId(queueName)) && this.datasetFramework.hasInstance(getStateStoreId(queueName.getFirstComponent()));
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void create(QueueName queueName) throws IOException {
        create(queueName, new Properties());
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void create(QueueName queueName, Properties properties) throws IOException {
        createStateStoreDataset(queueName.getFirstComponent());
        DatasetAdmin datasetAdmin = new DatasetAdmin(getDataTableId(queueName), this.hConf, this.tableUtil, properties);
        try {
            datasetAdmin.create();
            datasetAdmin.close();
        } catch (Throwable th) {
            datasetAdmin.close();
            throw th;
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void truncate(QueueName queueName) throws Exception {
        if (!doTruncateTable(queueName)) {
            LOG.warn("truncate({}) on HBase queue table has no effect.", queueName);
        } else {
            truncate(getDataTableId(queueName));
            deleteConsumerStates(queueName);
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void clearAllForFlow(String str, String str2, String str3) throws Exception {
        truncate(getDataTableId(str, str2, str3));
        deleteFlowConfigs(str, str2, str3);
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public QueueConfigurer getQueueConfigurer(QueueName queueName) throws Exception {
        if (!exists(queueName)) {
            create(queueName);
        }
        return getConsumerStateStore(queueName);
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void dropAllForFlow(String str, String str2, String str3) throws Exception {
        drop(getDataTableId(str, str2, str3));
        deleteFlowConfigs(str, str2, str3);
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void upgrade() throws Exception {
        List<TableId> listTables = this.tableUtil.listTables(getHBaseAdmin());
        ArrayList<TableId> newArrayList = Lists.newArrayList();
        for (TableId tableId : listTables) {
            if (isDataTable(tableId)) {
                LOG.info("Upgrading queue table: {}", tableId);
                Properties properties = new Properties();
                if (this.tableUtil.getHTableDescriptor(getHBaseAdmin(), tableId).getValue(PROPERTY_PREFIX_BYTES) == null) {
                    properties.setProperty(PROPERTY_PREFIX_BYTES, Integer.toString(1));
                }
                upgrade(tableId, properties);
                LOG.info("Upgraded queue table: {}", tableId);
            } else if (isStateStoreTable(tableId)) {
                newArrayList.add(tableId);
            }
        }
        for (TableId tableId2 : newArrayList) {
            LOG.info("Upgrading queue state store: {}", tableId2);
            Id.DatasetInstance createStateStoreDataset = createStateStoreDataset(tableId2.getNamespace().getId());
            co.cask.cdap.api.dataset.DatasetAdmin admin = this.datasetFramework.getAdmin(createStateStoreDataset, null);
            if (admin == null) {
                LOG.error("No dataset admin available for {}", createStateStoreDataset);
            } else {
                admin.upgrade();
                LOG.info("Upgraded queue state store: {}", tableId2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueueConstants.QueueType getType() {
        return this.type;
    }

    public HBaseConsumerStateStore getConsumerStateStore(QueueName queueName) throws Exception {
        HBaseConsumerStateStore dataset = this.datasetFramework.getDataset(getStateStoreId(queueName.getFirstComponent()), ImmutableMap.of("queue.name", queueName.toString()), null);
        if (dataset == null) {
            throw new IllegalStateException("Consumer state store not exists for " + queueName);
        }
        return dataset;
    }

    private Id.DatasetInstance getStateStoreId(String str) {
        return Id.DatasetInstance.from(str, HBaseQueueDatasetModule.STATE_STORE_NAME);
    }

    private Id.DatasetInstance createStateStoreDataset(String str) throws IOException {
        try {
            Id.DatasetInstance stateStoreId = getStateStoreId(str);
            DatasetsUtil.createIfNotExists(this.datasetFramework, stateStoreId, HBaseQueueDatasetModule.STATE_STORE_TYPE_NAME, DatasetProperties.builder().add("dataset.table.column.family", Bytes.toString(QueueEntryRow.COLUMN_FAMILY)).build());
            return stateStoreId;
        } catch (DatasetManagementException e) {
            throw new IOException(e);
        }
    }

    private void truncate(TableId tableId) throws IOException {
        if (this.tableUtil.tableExists(getHBaseAdmin(), tableId)) {
            this.tableUtil.truncateTable(getHBaseAdmin(), tableId);
        }
    }

    private void drop(TableId tableId) throws IOException {
        if (this.tableUtil.tableExists(getHBaseAdmin(), tableId)) {
            this.tableUtil.dropTable(getHBaseAdmin(), tableId);
        }
    }

    private void deleteConsumerStates(QueueName queueName) throws Exception {
        if (this.datasetFramework.hasInstance(getStateStoreId(queueName.getFirstComponent()))) {
            final HBaseConsumerStateStore consumerStateStore = getConsumerStateStore(queueName);
            Transactions.createTransactionExecutor(this.txExecutorFactory, consumerStateStore).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueAdmin.1
                public void apply() throws Exception {
                    consumerStateStore.clear();
                }
            });
        }
    }

    private void deleteFlowConfigs(String str, String str2, String str3) throws Exception {
        final QueueName from = QueueName.from(URI.create(QueueName.prefixForFlow(str, str2, str3)));
        HBaseConsumerStateStore dataset = this.datasetFramework.getDataset(getStateStoreId(str), ImmutableMap.of("queue.name", from.toString()), null);
        if (dataset == null) {
            return;
        }
        final TransactionAware internalTable = dataset.getInternalTable();
        Transactions.createTransactionExecutor(this.txExecutorFactory, internalTable).execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueAdmin.2
            public void apply() throws Exception {
                byte[] bytes = Bytes.toBytes(from.toString());
                Scanner scan = internalTable.scan(bytes, Bytes.stopKeyForPrefix(bytes));
                try {
                    for (Row next = scan.next(); next != null; next = scan.next()) {
                        internalTable.delete(next.getRow());
                    }
                } finally {
                    scan.close();
                }
            }
        });
    }

    protected List<? extends Class<? extends Coprocessor>> getCoprocessors() {
        return ImmutableList.of(this.tableUtil.getQueueRegionObserverClassForVersion(), this.tableUtil.getDequeueScanObserverClassForVersion());
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void dropAllInNamespace(String str) throws Exception {
        Iterator it = EnumSet.of(QueueConstants.QueueType.QUEUE, QueueConstants.QueueType.SHARDED_QUEUE).iterator();
        while (it.hasNext()) {
            final String format = String.format("%s.%s.", "system", (QueueConstants.QueueType) it.next());
            final TableId configTableId = getConfigTableId(str);
            this.tableUtil.deleteAllInNamespace(getHBaseAdmin(), Id.Namespace.from(str), new Predicate<TableId>() { // from class: co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueAdmin.3
                public boolean apply(TableId tableId) {
                    return tableId.getTableName().startsWith(format) && !tableId.equals(configTableId);
                }
            });
        }
        Id.DatasetInstance stateStoreId = getStateStoreId(str);
        if (this.datasetFramework.hasInstance(stateStoreId)) {
            this.datasetFramework.deleteInstance(stateStoreId);
        }
    }

    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueAdmin
    public TableId getDataTableId(String str, String str2, String str3) {
        return getDataTableId(str, str2, str3, this.type);
    }

    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueAdmin
    public TableId getDataTableId(QueueName queueName) {
        return getDataTableId(queueName, this.type);
    }

    public TableId getDataTableId(QueueName queueName, QueueConstants.QueueType queueType) {
        if (queueName.isQueue()) {
            return getDataTableId(queueName.getFirstComponent(), queueName.getSecondComponent(), queueName.getThirdComponent(), queueType);
        }
        throw new IllegalArgumentException("'" + queueName + "' is not a valid name for a queue.");
    }

    public TableId getDataTableId(String str, String str2, String str3, QueueConstants.QueueType queueType) {
        return TableId.from(str, String.format("%s.%s.%s.%s", "system", queueType, str2, str3));
    }

    private void upgrade(TableId tableId, Properties properties) throws Exception {
        DatasetAdmin datasetAdmin = new DatasetAdmin(tableId, this.hConf, this.tableUtil, properties);
        try {
            datasetAdmin.upgrade();
            datasetAdmin.close();
        } catch (Throwable th) {
            datasetAdmin.close();
            throw th;
        }
    }

    private boolean isDataTable(TableId tableId) {
        String tableName = tableId.getTableName();
        if (tableName.split("\\.").length <= 3) {
            return false;
        }
        Iterator it = EnumSet.of(QueueConstants.QueueType.QUEUE, QueueConstants.QueueType.SHARDED_QUEUE).iterator();
        while (it.hasNext()) {
            if (tableName.startsWith("system." + ((QueueConstants.QueueType) it.next()).toString())) {
                return true;
            }
        }
        return false;
    }

    private boolean isStateStoreTable(TableId tableId) {
        return tableId.getTableName().equals(getConfigTableId("ns").getTableName());
    }
}
