package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.Namespace;
import co.cask.cdap.data2.datafabric.DefaultDatasetNamespace;
import co.cask.cdap.data2.dataset2.lib.hbase.AbstractHBaseDataSetAdmin;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.hbase.wd.AbstractRowKeyDistributor;
import co.cask.cdap.hbase.wd.RowKeyDistributorByHashPrefix;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueAdmin.class */
public class HBaseQueueAdmin implements QueueAdmin {
    /**
     * Number of salt bytes; not referenced elsewhere in this class.
     * NOTE(review): presumably the size of the prefix added by {@link #ROW_KEY_DISTRIBUTOR} - confirm.
     */
    public static final int SALT_BYTES = 1;

    /** Number of hash buckets used by {@link #ROW_KEY_DISTRIBUTOR}. */
    public static final int ROW_KEY_DISTRIBUTION_BUCKETS = 8;

    /** HBase helper used for table creation and for locating the coprocessor classes. */
    protected final HBaseTableUtil tableUtil;

    private final CConfiguration cConf;
    private final Configuration hConf;
    private final LocationFactory locationFactory;

    /** Fully namespaced prefix shared by every table managed by this admin. */
    private final String tableNamePrefix;

    /** Fully namespaced name of the table that stores consumer configurations. */
    private final String configTableName;

    private final QueueConstants.QueueType type;

    /** Created lazily by {@link #getHBaseAdmin()}, which is synchronized on this instance. */
    private HBaseAdmin admin;

    private static final Logger LOG = LoggerFactory.getLogger(HBaseQueueAdmin.class);

    /** Row key distributor that spreads rows over {@link #ROW_KEY_DISTRIBUTION_BUCKETS} hash buckets. */
    public static final AbstractRowKeyDistributor ROW_KEY_DISTRIBUTOR = new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(ROW_KEY_DISTRIBUTION_BUCKETS));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueAdmin$DatasetAdmin.class */
    public final class DatasetAdmin extends AbstractHBaseDataSetAdmin {
        /**
         * Creates an admin for a single HBase table.
         *
         * @param str name of the HBase table this admin operates on
         * @param configuration Hadoop configuration handed to the base admin
         * @param hBaseTableUtil HBase helper handed to the base admin
         */
        private DatasetAdmin(String str, Configuration configuration, HBaseTableUtil hBaseTableUtil) {
            super(str, configuration, hBaseTableUtil);
        }

        /**
         * Builds the coprocessor jar for the classes returned by the enclosing admin's
         * {@code getCoprocessors()}; yields the shared empty jar when there are none.
         */
        @Override
        protected AbstractHBaseDataSetAdmin.CoprocessorJar createCoprocessorJar() throws IOException {
            List<? extends Class<? extends Coprocessor>> observerClasses = HBaseQueueAdmin.this.getCoprocessors();
            if (!observerClasses.isEmpty()) {
                // The jar is named after the lower-cased queue type and placed in the configured directory.
                String jarPrefix = HBaseQueueAdmin.this.type.name().toLowerCase();
                String jarDirectory = HBaseQueueAdmin.this.cConf.get(
                    QueueConstants.ConfigKeys.QUEUE_TABLE_COPROCESSOR_DIR,
                    QueueConstants.DEFAULT_QUEUE_TABLE_COPROCESSOR_DIR);
                return new AbstractHBaseDataSetAdmin.CoprocessorJar(
                    observerClasses,
                    HBaseTableUtil.createCoProcessorJar(
                        jarPrefix,
                        HBaseQueueAdmin.this.locationFactory.create(jarDirectory),
                        observerClasses));
            }
            return AbstractHBaseDataSetAdmin.CoprocessorJar.EMPTY;
        }

        /**
         * Forces the queue column family to keep a single version.
         *
         * @return {@code true} if the descriptor was modified, {@code false} if it was already at one version
         */
        @Override
        protected boolean upgradeTable(HTableDescriptor hTableDescriptor) {
            HColumnDescriptor queueFamily = hTableDescriptor.getFamily(QueueEntryRow.COLUMN_FAMILY);
            boolean needsChange = queueFamily.getMaxVersions() != 1;
            if (needsChange) {
                queueFamily.setMaxVersions(1);
            }
            return needsChange;
        }

        /**
         * Creates the table if it does not exist yet, with the queue column family (one version),
         * the coprocessors from {@link #createCoprocessorJar()} and the configured number of pre-splits.
         */
        public void create() throws IOException {
            byte[] tableNameBytes = Bytes.toBytes(this.tableName);
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableNameBytes);
            HColumnDescriptor queueFamily = new HColumnDescriptor(QueueEntryRow.COLUMN_FAMILY);
            tableDescriptor.addFamily(queueFamily);
            queueFamily.setMaxVersions(1);

            AbstractHBaseDataSetAdmin.CoprocessorJar jar = createCoprocessorJar();
            for (Class<? extends Coprocessor> observerClass : jar.getCoprocessors()) {
                addCoprocessor(tableDescriptor, observerClass, jar.getJarLocation(), jar.getPriority(observerClass));
            }

            HBaseAdmin hbaseAdmin = HBaseQueueAdmin.this.getHBaseAdmin();
            int preSplits = HBaseQueueAdmin.this.cConf.getInt(QueueConstants.ConfigKeys.QUEUE_TABLE_PRESPLITS, 16);
            this.tableUtil.createTableIfNotExists(hbaseAdmin, tableNameBytes, tableDescriptor, HBaseTableUtil.getSplitKeys(preSplits));
        }
    }

    /**
     * Injection constructor: builds an admin for tables of type {@code QUEUE}.
     *
     * @throws IOException declared by the delegated constructor
     */
    @Inject
    public HBaseQueueAdmin(Configuration configuration, CConfiguration cConfiguration, LocationFactory locationFactory, HBaseTableUtil hBaseTableUtil) throws IOException {
        this(
            configuration,
            cConfiguration,
            QueueConstants.QueueType.QUEUE,
            locationFactory,
            hBaseTableUtil);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds an admin for the given queue type and derives the namespaced table names.
     *
     * @param configuration Hadoop configuration used to create the HBase admin and dataset admins
     * @param cConfiguration CDAP configuration used for namespacing and queue settings
     * @param queueType selects the queue table prefix for {@code QUEUE}, the stream table prefix otherwise
     * @param locationFactory used to locate the coprocessor jar directory
     * @param hBaseTableUtil HBase helper
     */
    public HBaseQueueAdmin(Configuration configuration, CConfiguration cConfiguration, QueueConstants.QueueType queueType, LocationFactory locationFactory, HBaseTableUtil hBaseTableUtil) throws IOException {
        this.hConf = configuration;
        this.cConf = cConfiguration;
        this.locationFactory = locationFactory;
        this.tableUtil = hBaseTableUtil;
        this.type = queueType;

        final String unqualifiedPrefix;
        if (queueType == QueueConstants.QueueType.QUEUE) {
            unqualifiedPrefix = QueueConstants.QUEUE_TABLE_PREFIX;
        } else {
            unqualifiedPrefix = QueueConstants.STREAM_TABLE_PREFIX;
        }

        // Both names are placed in the SYSTEM namespace before being turned into HBase table names.
        DefaultDatasetNamespace systemNamespace = new DefaultDatasetNamespace(cConfiguration, Namespace.SYSTEM);
        this.tableNamePrefix = HBaseTableUtil.getHBaseTableName(systemNamespace.namespace(unqualifiedPrefix));
        this.configTableName = HBaseTableUtil.getHBaseTableName(systemNamespace.namespace(QueueConstants.QUEUE_CONFIG_TABLE_NAME));
    }

    /**
     * Returns the shared {@link HBaseAdmin}, creating it from the Hadoop configuration on first use.
     * The method is synchronized, so the instance is created at most once.
     *
     * @throws IOException if the admin cannot be created
     */
    protected final synchronized HBaseAdmin getHBaseAdmin() throws IOException {
        if (this.admin != null) {
            return this.admin;
        }
        HBaseAdmin created = new HBaseAdmin(this.hConf);
        this.admin = created;
        return created;
    }

    /**
     * Maps a queue name to the HBase table that stores it, using the first and second
     * components of the queue name.
     *
     * @throws IllegalArgumentException if {@code queueName} is not a queue
     */
    public String getActualTableName(QueueName queueName) {
        if (!queueName.isQueue()) {
            throw new IllegalArgumentException("'" + queueName + "' is not a valid name for a queue.");
        }
        return getTableNameForFlow(queueName.getFirstComponent(), queueName.getSecondComponent());
    }

    /**
     * Builds {@code <tableNamePrefix>.<app>.<flow>}, the layout that
     * {@link #getApplicationName(String)} and {@link #getFlowName(String)} parse back.
     */
    private String getTableNameForFlow(String app, String flow) {
        StringBuilder tableName = new StringBuilder();
        tableName.append(this.tableNamePrefix);
        tableName.append(".").append(app);
        tableName.append(".").append(flow);
        return tableName.toString();
    }

    /**
     * Tells {@link #drop(String)} whether the underlying table should be dropped.
     *
     * @return always {@code false} in this class, so dropping a single queue never deletes its table
     */
    public boolean doDropTable(QueueName queueName) {
        final boolean dropUnderlyingTable = false;
        return dropUnderlyingTable;
    }

    /**
     * Tells {@link #truncate(String)} whether the underlying table should be truncated.
     *
     * @return always {@code true} in this class
     */
    public boolean doTruncateTable(QueueName queueName) {
        final boolean truncateUnderlyingTable = true;
        return truncateUnderlyingTable;
    }

    /**
     * Encodes a consumer state column qualifier: the long {@code j} at offset 0
     * followed by the int {@code i} at offset 8, 12 bytes in total.
     *
     * @param j consumer group id
     * @param i consumer instance id
     */
    public static byte[] getConsumerStateColumn(long j, int i) {
        final int groupIdOffset = 0;
        final int instanceIdOffset = 8;
        byte[] column = new byte[12];
        Bytes.putLong(column, groupIdOffset, j);
        Bytes.putInt(column, instanceIdOffset, i);
        return column;
    }

    /**
     * Extracts the application name, i.e. the second-to-last dot-separated segment.
     *
     * @param str a dot-separated table name
     * @return the segment before the last one
     * @throws ArrayIndexOutOfBoundsException if {@code str} has fewer than two segments
     */
    public static String getApplicationName(String str) {
        final String[] segments = str.split("\\.");
        final int applicationIndex = segments.length - 2;
        return segments[applicationIndex];
    }

    /**
     * Extracts the flow name, i.e. the last dot-separated segment.
     *
     * @param str a dot-separated table name
     * @return the last segment
     * @throws ArrayIndexOutOfBoundsException if splitting yields no segments (e.g. {@code "."})
     */
    public static String getFlowName(String str) {
        final String[] segments = str.split("\\.");
        final int flowIndex = segments.length - 1;
        return segments[flowIndex];
    }

    /**
     * Checks existence of the queue identified by the given URI string.
     *
     * @param str queue name in URI form
     */
    @Override // co.cask.cdap.data2.transaction.EntityAdmin
    public boolean exists(String str) throws Exception {
        URI queueUri = URI.create(str);
        QueueName queueName = QueueName.from(queueUri);
        return exists(queueName);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * A queue exists only when both its data table and the consumer config table exist.
     */
    public boolean exists(QueueName queueName) throws IOException {
        HBaseAdmin hbaseAdmin = getHBaseAdmin();
        String queueTable = getActualTableName(queueName);
        if (!hbaseAdmin.tableExists(queueTable)) {
            return false;
        }
        return hbaseAdmin.tableExists(this.configTableName);
    }

    /**
     * Creates the queue identified by {@code str}; {@code properties} is ignored
     * and the call is forwarded to {@link #create(String)}.
     */
    @Override // co.cask.cdap.data2.transaction.EntityAdmin
    public void create(String str, Properties properties) throws Exception {
        // Properties are intentionally unused here.
        this.create(str);
    }

    /**
     * Creates the queue identified by the given URI string.
     *
     * @param str queue name in URI form
     */
    @Override // co.cask.cdap.data2.transaction.EntityAdmin
    public void create(String str) throws IOException {
        URI queueUri = URI.create(str);
        QueueName queueName = QueueName.from(queueUri);
        create(queueName);
    }

    /**
     * Truncates the queue's table when {@link #doTruncateTable(QueueName)} allows it (otherwise
     * only logs a warning), then removes the queue's consumer configuration in either case.
     *
     * @param str queue name in URI form
     */
    @Override // co.cask.cdap.data2.transaction.EntityAdmin
    public void truncate(String str) throws Exception {
        QueueName queueName = QueueName.from(URI.create(str));
        if (!doTruncateTable(queueName)) {
            LOG.warn("truncate({}) on HBase queue table has no effect.", str);
        } else {
            byte[] tableNameBytes = Bytes.toBytes(getActualTableName(queueName));
            truncate(tableNameBytes);
        }
        deleteConsumerConfigurations(queueName);
    }

    /**
     * Empties a table by disabling and deleting it, then re-creating it from the
     * descriptor captured beforehand. Does nothing if the table does not exist.
     */
    private void truncate(byte[] tableNameBytes) throws IOException {
        HBaseAdmin hbaseAdmin = getHBaseAdmin();
        if (!hbaseAdmin.tableExists(tableNameBytes)) {
            return;
        }
        // Capture the descriptor first so the table can be re-created after deletion.
        HTableDescriptor savedDescriptor = hbaseAdmin.getTableDescriptor(tableNameBytes);
        hbaseAdmin.disableTable(tableNameBytes);
        hbaseAdmin.deleteTable(tableNameBytes);
        hbaseAdmin.createTable(savedDescriptor);
    }

    /**
     * Truncates the table of the given flow and removes the flow's consumer configurations.
     *
     * @param str application name
     * @param str2 flow name
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void clearAllForFlow(String str, String str2) throws Exception {
        String flowTable = getTableNameForFlow(str, str2);
        truncate(Bytes.toBytes(flowTable));
        deleteConsumerConfigurations(str, str2);
    }

    /**
     * Drops the table of the given flow and removes the flow's consumer configurations.
     *
     * @param str application name
     * @param str2 flow name
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void dropAllForFlow(String str, String str2) throws Exception {
        String flowTable = getTableNameForFlow(str, str2);
        drop(Bytes.toBytes(flowTable));
        deleteConsumerConfigurations(str, str2);
    }

    /**
     * Drops the queue's table when {@link #doDropTable(QueueName)} allows it (otherwise only
     * logs a warning), then removes the queue's consumer configuration in either case.
     *
     * @param str queue name in URI form
     */
    @Override // co.cask.cdap.data2.transaction.EntityAdmin
    public void drop(String str) throws Exception {
        QueueName queueName = QueueName.from(URI.create(str));
        if (!doDropTable(queueName)) {
            LOG.warn("drop({}) on HBase queue table has no effect.", str);
        } else {
            byte[] tableNameBytes = Bytes.toBytes(getActualTableName(queueName));
            drop(tableNameBytes);
        }
        deleteConsumerConfigurations(queueName);
    }

    /**
     * Upgrades a single HBase table through a {@code DatasetAdmin}.
     *
     * @param str name of the HBase table to upgrade (passed to the dataset admin as-is)
     * @param properties not used by this implementation
     * @throws Exception if the upgrade or closing the dataset admin fails
     */
    @Override // co.cask.cdap.data2.transaction.EntityAdmin
    public void upgrade(String str, Properties properties) throws Exception {
        DatasetAdmin datasetAdmin = new DatasetAdmin(str, this.hConf, this.tableUtil);
        try {
            datasetAdmin.upgrade();
        } finally {
            // Close exactly once, whether or not the upgrade succeeded.
            datasetAdmin.close();
        }
    }

    /**
     * Disables and deletes the given table; does nothing if it does not exist.
     */
    private void drop(byte[] tableNameBytes) throws IOException {
        HBaseAdmin hbaseAdmin = getHBaseAdmin();
        if (!hbaseAdmin.tableExists(tableNameBytes)) {
            return;
        }
        hbaseAdmin.disableTable(tableNameBytes);
        hbaseAdmin.deleteTable(tableNameBytes);
    }

    /**
     * Deletes the config-table row keyed by the given queue name.
     * Unlike the (app, flow) overload, this does not check that the config table exists first.
     *
     * @throws IOException if the table cannot be opened or the delete fails
     */
    private void deleteConsumerConfigurations(QueueName queueName) throws IOException {
        HTable hTable = new HTable(getHBaseAdmin().getConfiguration(), this.configTableName);
        try {
            hTable.delete(new Delete(queueName.toBytes()));
        } finally {
            // Close exactly once, whether or not the delete succeeded.
            hTable.close();
        }
    }

    /**
     * Deletes every config-table row whose key starts with the queue prefix of the given flow.
     * Does nothing if the config table does not exist.
     *
     * @param str application name
     * @param str2 flow name
     * @throws IOException if scanning or deleting fails
     */
    private void deleteConsumerConfigurations(String str, String str2) throws IOException {
        HBaseAdmin hBaseAdmin = getHBaseAdmin();
        if (!hBaseAdmin.tableExists(this.configTableName)) {
            return;
        }
        HTable hTable = new HTable(hBaseAdmin.getConfiguration(), this.configTableName);
        try {
            byte[] startRow = Bytes.toBytes(QueueName.prefixForFlow(str, str2));
            // Exclusive stop row: the prefix with its last byte incremented.
            // NOTE(review): assumes the prefix is non-empty and its last byte is not 0xFF - confirm.
            byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
            int last = stopRow.length - 1;
            stopRow[last] = (byte) (stopRow[last] + 1);

            Scan scan = new Scan();
            scan.setStartRow(startRow);
            scan.setStopRow(stopRow);
            // Only row keys are needed, so fetch just the first key of each row.
            scan.setFilter(new FirstKeyOnlyFilter());
            scan.setMaxVersions(1);

            List<Delete> deletes = Lists.newArrayList();
            ResultScanner scanner = hTable.getScanner(scan);
            try {
                Result row;
                while ((row = scanner.next()) != null) {
                    deletes.add(new Delete(row.getRow()));
                }
            } finally {
                // The scanner is closed once, before the deletes are issued.
                scanner.close();
            }
            hTable.delete(deletes);
        } finally {
            hTable.close();
        }
    }

    /**
     * Creates the consumer config table (if needed) and then the table backing the given queue.
     *
     * @throws IllegalArgumentException if {@code queueName} is not a queue
     * @throws IOException if table creation or closing the dataset admin fails
     */
    public void create(QueueName queueName) throws IOException {
        createConfigTable();
        DatasetAdmin datasetAdmin = new DatasetAdmin(getActualTableName(queueName), this.hConf, this.tableUtil);
        try {
            datasetAdmin.create();
        } finally {
            // Close exactly once, whether or not creation succeeded.
            datasetAdmin.close();
        }
    }

    /**
     * Creates the consumer config table if it is missing: one queue column family with a single
     * version, no split keys, and a 5000 ms timeout passed to the table utility.
     */
    private void createConfigTable() throws IOException {
        byte[] configTableBytes = Bytes.toBytes(this.configTableName);
        HTableDescriptor tableDescriptor = new HTableDescriptor(configTableBytes);
        HColumnDescriptor queueFamily = new HColumnDescriptor(QueueEntryRow.COLUMN_FAMILY);
        tableDescriptor.addFamily(queueFamily);
        queueFamily.setMaxVersions(1);

        HBaseAdmin hbaseAdmin = getHBaseAdmin();
        this.tableUtil.createTableIfNotExists(
            hbaseAdmin,
            configTableBytes,
            tableDescriptor,
            (byte[][]) null,
            5000L,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Lists the coprocessor classes attached to queue tables: the queue region observer
     * followed by the dequeue scan observer, both resolved through the table utility.
     */
    protected List<? extends Class<? extends Coprocessor>> getCoprocessors() {
        return ImmutableList.<Class<? extends Coprocessor>>of(
            this.tableUtil.getQueueRegionObserverClassForVersion(),
            this.tableUtil.getDequeueScanObserverClassForVersion());
    }

    /**
     * Drops every table whose name starts with the table name prefix, and finally the
     * consumer config table itself.
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void dropAll() throws Exception {
        for (HTableDescriptor descriptor : getHBaseAdmin().listTables()) {
            String tableName = Bytes.toString(descriptor.getName());
            if (!tableName.startsWith(this.tableNamePrefix)) {
                continue;
            }
            // The config table is skipped inside the loop and dropped last.
            if (tableName.equals(this.configTableName)) {
                continue;
            }
            drop(descriptor.getName());
        }
        drop(Bytes.toBytes(this.configTableName));
    }

    /**
     * Sets the number of consumer instances of one consumer group for a queue, creating the
     * queue first if it does not exist. Returns without writing if the stored number of
     * consumer states already equals {@code i}.
     *
     * @param queueName the queue to configure
     * @param j consumer group id
     * @param i desired number of instances, must be positive
     * @throws IllegalArgumentException if {@code i <= 0}, or if no existing state provides a start row
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void configureInstances(QueueName queueName, long j, int i) throws Exception {
        Preconditions.checkArgument(i > 0, "Number of consumer instances must be > 0.");
        if (!exists(queueName)) {
            create(queueName);
        }
        HTable hTable = new HTable(getHBaseAdmin().getConfiguration(), this.configTableName);
        try {
            byte[] rowKey = queueName.toBytes();
            // Fetch only the columns of this group: qualifiers start with the 8-byte group id.
            Get get = new Get(rowKey);
            get.addFamily(QueueEntryRow.COLUMN_FAMILY);
            get.setFilter(new ColumnPrefixFilter(Bytes.toBytes(j)));
            List<HBaseConsumerState> consumerStates = HBaseConsumerState.create(hTable.get(get));
            if (consumerStates.size() == i) {
                return;
            }
            hTable.batch(getConfigMutations(j, i, rowKey, consumerStates, new ArrayList<Mutation>()));
        } finally {
            // Single close for every exit path (early return, success, failure).
            hTable.close();
        }
    }

    /**
     * Reconciles the stored consumer groups of a queue with the requested ones, creating the
     * queue first if it does not exist. Groups no longer requested are deleted, new groups are
     * added starting at the smallest start row among the retained groups (or an empty row if
     * there is none), and groups whose instance count changed are resized.
     *
     * @param queueName the queue to configure
     * @param map requested group id to instance count, must not be empty
     * @throws IllegalArgumentException if {@code map} is empty
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void configureGroups(QueueName queueName, Map<Long, Integer> map) throws Exception {
        Preconditions.checkArgument(!map.isEmpty(), "Consumer group information must not be empty.");
        if (!exists(queueName)) {
            create(queueName);
        }
        HTable hTable = new HTable(getHBaseAdmin().getConfiguration(), this.configTableName);
        try {
            byte[] rowKey = queueName.toBytes();
            SortedMap<byte[], byte[]> columnMap = hTable.get(new Get(rowKey)).getFamilyMap(QueueEntryRow.COLUMN_FAMILY);
            if (columnMap == null) {
                columnMap = ImmutableSortedMap.of();
            }

            // Group id -> instance count as currently stored in the config table.
            Map<Long, Integer> oldGroupInfo = Maps.newHashMap();
            byte[] smallestStartRow = decodeGroupInfo(map, columnMap, oldGroupInfo);
            List<Mutation> mutations = Lists.newArrayList();

            // Remove every column of groups that are stored but no longer requested.
            Sets.SetView<Long> removedGroups = Sets.difference(oldGroupInfo.keySet(), map.keySet());
            if (!removedGroups.isEmpty()) {
                Delete delete = new Delete(rowKey);
                for (Long removedGroupId : removedGroups) {
                    int storedInstances = oldGroupInfo.get(removedGroupId).intValue();
                    for (int instanceId = 0; instanceId < storedInstances; instanceId++) {
                        delete.deleteColumns(QueueEntryRow.COLUMN_FAMILY, getConsumerStateColumn(removedGroupId.longValue(), instanceId));
                    }
                }
                if (!delete.isEmpty()) {
                    mutations.add(delete);
                }
            }

            byte[] newGroupStartRow = smallestStartRow == null ? Bytes.EMPTY_BYTE_ARRAY : smallestStartRow;
            Put put = new Put(rowKey);
            for (Map.Entry<Long, Integer> entry : map.entrySet()) {
                long groupId = entry.getKey().longValue();
                int instances = entry.getValue().intValue();
                Integer storedInstances = oldGroupInfo.get(Long.valueOf(groupId));
                if (storedInstances == null && !oldGroupInfo.containsKey(Long.valueOf(groupId))) {
                    // Brand new group: write one column per instance.
                    for (int instanceId = 0; instanceId < instances; instanceId++) {
                        put.add(QueueEntryRow.COLUMN_FAMILY, getConsumerStateColumn(groupId, instanceId), newGroupStartRow);
                    }
                } else if (storedInstances.intValue() != instances) {
                    // Existing group with a different size: resize from its stored columns.
                    SortedMap<byte[], byte[]> groupColumns = columnMap.subMap(
                        getConsumerStateColumn(groupId, 0),
                        getConsumerStateColumn(groupId, storedInstances.intValue()));
                    mutations = getConfigMutations(groupId, instances, rowKey, HBaseConsumerState.create(groupColumns), mutations);
                }
            }
            if (!put.isEmpty()) {
                mutations.add(put);
            }
            if (!mutations.isEmpty()) {
                hTable.batch(mutations);
            }
        } finally {
            hTable.close();
        }
    }

    /**
     * Upgrades every table whose name starts with the table name prefix, except the consumer
     * config table. Each table is upgraded with an empty {@link Properties} instance.
     */
    @Override // co.cask.cdap.data2.transaction.queue.QueueAdmin
    public void upgrade() throws Exception {
        Properties properties = new Properties();
        for (HTableDescriptor hTableDescriptor : getHBaseAdmin().listTables()) {
            String tableName = Bytes.toString(hTableDescriptor.getName());
            if (tableName.startsWith(this.tableNamePrefix) && !tableName.equals(this.configTableName)) {
                // Parameterized logging, matching the other log statements in this class.
                LOG.info("Upgrading queue hbase table: {}", tableName);
                upgrade(tableName, properties);
                LOG.info("Upgraded queue hbase table: {}", tableName);
            }
        }
    }

    /**
     * Decodes the stored consumer state columns of one queue.
     *
     * For every column, the group id (long at offset 0 of the qualifier) is mapped in
     * {@code storedGroups} to the column's instance id (int at offset 8) plus one; a later
     * column of the same group overwrites an earlier one.
     *
     * @param requestedGroups groups that are being configured
     * @param columns stored column qualifier to value (start row)
     * @param storedGroups output: group id to instance count derived from the columns
     * @return the smallest value among columns of requested groups, or {@code null} if there is none
     */
    private byte[] decodeGroupInfo(Map<Long, Integer> requestedGroups, Map<byte[], byte[]> columns, Map<Long, Integer> storedGroups) {
        byte[] smallest = null;
        for (Map.Entry<byte[], byte[]> column : columns.entrySet()) {
            byte[] qualifier = column.getKey();
            long groupId = Bytes.toLong(qualifier);
            int instanceId = Bytes.toInt(qualifier, 8);
            storedGroups.put(Long.valueOf(groupId), Integer.valueOf(instanceId + 1));

            if (!requestedGroups.containsKey(Long.valueOf(groupId))) {
                continue;
            }
            byte[] startRow = column.getValue();
            if (smallest == null || Bytes.BYTES_COMPARATOR.compare(startRow, smallest) < 0) {
                smallest = startRow;
            }
        }
        return smallest;
    }

    /**
     * Appends to {@code mutations} the changes needed to resize one consumer group.
     *
     * Every resulting state uses the smallest start row among the existing states. Existing
     * instances with an id below {@code instances} are rewritten, the others are deleted, and
     * ids from the current state count up to {@code instances} are added.
     *
     * @param groupId consumer group id
     * @param instances desired number of instances
     * @param rowKey config-table row of the queue
     * @param consumerStates existing states of the group
     * @param mutations list to append to; also the return value
     * @throws IllegalArgumentException if {@code consumerStates} yields no start row
     */
    private List<Mutation> getConfigMutations(long groupId, int instances, byte[] rowKey, List<HBaseConsumerState> consumerStates, List<Mutation> mutations) {
        byte[] startRow = smallestStartRow(consumerStates);
        Preconditions.checkArgument(startRow != null, "No startRow found for consumer group %s", Long.valueOf(groupId));

        Put put = new Put(rowKey);
        Delete delete = new Delete(rowKey);
        for (HBaseConsumerState existing : consumerStates) {
            HBaseConsumerState rewritten = new HBaseConsumerState(startRow, existing.getGroupId(), existing.getInstanceId());
            if (existing.getInstanceId() >= instances) {
                rewritten.delete(delete);
            } else {
                rewritten.updatePut(put);
            }
        }
        // Instances beyond the currently stored count are new.
        int instanceId = consumerStates.size();
        while (instanceId < instances) {
            new HBaseConsumerState(startRow, groupId, instanceId).updatePut(put);
            instanceId++;
        }

        if (!put.isEmpty()) {
            mutations.add(put);
        }
        if (!delete.isEmpty()) {
            mutations.add(delete);
        }
        return mutations;
    }

    /** Returns the smallest start row among the given states, or {@code null} for an empty list. */
    private byte[] smallestStartRow(List<HBaseConsumerState> consumerStates) {
        byte[] smallest = null;
        for (HBaseConsumerState state : consumerStates) {
            if (smallest == null || Bytes.BYTES_COMPARATOR.compare(state.getStartRow(), smallest) < 0) {
                smallest = state.getStartRow();
            }
        }
        return smallest;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the namespaced prefix shared by all tables managed by this admin
     */
    public String getTableNamePrefix() {
        final String prefix = this.tableNamePrefix;
        return prefix;
    }

    /**
     * @return the namespaced name of the consumer config table
     */
    public String getConfigTableName() {
        final String configTable = this.configTableName;
        return configTable;
    }
}
