package co.cask.cdap.data2.dataset2.lib.table.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.DataSetException;
import co.cask.cdap.api.dataset.DatasetContext;
import co.cask.cdap.api.dataset.DatasetSpecification;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.api.dataset.table.TableProperties;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.data2.dataset2.lib.table.FuzzyRowFilter;
import co.cask.cdap.data2.dataset2.lib.table.MetricsTable;
import co.cask.cdap.data2.util.TableId;
import co.cask.cdap.data2.util.hbase.DeleteBuilder;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.PutBuilder;
import co.cask.cdap.data2.util.hbase.ScanBuilder;
import co.cask.cdap.hbase.wd.AbstractRowKeyDistributor;
import co.cask.cdap.hbase.wd.DistributedScanner;
import co.cask.cdap.hbase.wd.RowKeyDistributorByHashPrefix;
import co.cask.cdap.proto.id.NamespaceId;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MetricsTable} implementation that stores its data in a single column family of an HBase
 * {@link HTable}.
 *
 * <p>The table is opened with auto-flush disabled, so puts are buffered client-side; every write
 * method here flushes explicitly before returning.
 *
 * <p>When the dataset name contains {@code "v3"}, every row key is passed through a
 * {@link RowKeyDistributorByHashPrefix} and scans are executed through {@link DistributedScanner}
 * on a dedicated thread pool. Otherwise row keys are used as given and scans go straight to the
 * table.
 */
public class HBaseMetricsTable implements MetricsTable {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseMetricsTable.class);
    private static final Logger REJECTION_LOG = Loggers.sampling(LOG, LogSamplers.exponentialLimit(1, 1024, 2.0d));

    /** Configuration key holding the maximum number of threads of the scan pool. */
    private static final String MAX_SCAN_THREADS_KEY = "metrics.hbase.max.scan.threads";
    /** Dataset property holding the number of hash buckets used to distribute row keys. */
    private static final String TABLE_SPLITS_KEY = "metrics.table.splits";
    /** Bucket count used when {@link #TABLE_SPLITS_KEY} is not set on the dataset. */
    private static final int DEFAULT_TABLE_SPLITS = 16;
    /** Dataset-name fragment that switches on distributed row keys and scans. */
    private static final String DISTRIBUTED_NAME_MARKER = "v3";
    /** Client-side write buffer size handed to the HTable (4 MiB). */
    private static final long WRITE_BUFFER_SIZE_BYTES = 4L * 1024 * 1024;
    /** Scanner caching value applied to every scan. */
    private static final int SCAN_CACHING = 1000;
    /** Fragment of the HBase error message that identifies a non-long cell being incremented. */
    private static final String NOT_64_BITS_MARKER = "isn't 64 bits wide";

    private final HBaseTableUtil tableUtil;
    private final TableId tableId;
    private final HTable hTable;
    private final byte[] columnFamily;
    /** {@code null} when row keys are not distributed. */
    private final AbstractRowKeyDistributor rowKeyDistributor;
    /** {@code null} when row keys are not distributed; otherwise owned by this instance. */
    private final ExecutorService scanExecutor;

    /**
     * Opens the HBase table backing the given dataset.
     *
     * @param datasetContext supplies the namespace the dataset lives in
     * @param spec dataset specification; its name and properties select the table, the column
     *     family and whether row keys are distributed
     * @param hConf Hadoop configuration used to create the {@link HTable}
     * @param tableUtil HBase compatibility helper used to build table ids and operations
     * @param cConf CDAP configuration; read for the scan pool size when distribution is enabled
     * @throws IOException if the table cannot be opened or configured
     */
    public HBaseMetricsTable(DatasetContext datasetContext, DatasetSpecification spec, Configuration hConf, HBaseTableUtil tableUtil, CConfiguration cConf) throws IOException {
        this.tableUtil = tableUtil;
        this.tableId = tableUtil.createHTableId(new NamespaceId(datasetContext.getNamespaceId()), spec.getName());

        boolean distributed = spec.getName().contains(DISTRIBUTED_NAME_MARKER);
        this.scanExecutor = distributed ? createScanExecutor(cConf) : null;
        this.rowKeyDistributor = distributed ? createRowKeyDistributor(spec) : null;

        HTable table = tableUtil.createHTable(hConf, this.tableId);
        table.setWriteBufferSize(WRITE_BUFFER_SIZE_BYTES);
        // Buffer puts client-side; the write methods call flushCommits() themselves.
        table.setAutoFlushTo(false);
        this.hTable = table;
        this.columnFamily = TableProperties.getColumnFamilyBytes(spec.getProperties());
    }

    /**
     * Builds the thread pool used for distributed scans: no core threads, a configurable maximum,
     * direct hand-off (no queueing) and daemon threads that expire after 60 idle seconds.
     *
     * <p>When the pool is saturated the rejected task runs in the submitting thread. If the pool
     * has already been shut down the task is discarded, mirroring
     * {@link ThreadPoolExecutor.CallerRunsPolicy}.
     */
    private static ExecutorService createScanExecutor(CConfiguration cConf) {
        return new ThreadPoolExecutor(
            0,
            cConf.getInt(MAX_SCAN_THREADS_KEY),
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            Threads.createDaemonThreadFactory("metrics-hbase-scanner-%d"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                    REJECTION_LOG.info("No more threads in the HBase scan thread pool. Consider increase {}. Performing scan in caller thread {}", MAX_SCAN_THREADS_KEY, Thread.currentThread().getName());
                    if (!executor.isShutdown()) {
                        task.run();
                    }
                }
            });
    }

    /** Builds the hash-prefix row key distributor with the bucket count configured on the dataset. */
    private static AbstractRowKeyDistributor createRowKeyDistributor(DatasetSpecification spec) {
        int buckets = spec.getIntProperty(TABLE_SPLITS_KEY, DEFAULT_TABLE_SPLITS);
        return new RowKeyDistributorByHashPrefix(new RowKeyDistributorByHashPrefix.OneByteSimpleHash(buckets));
    }

    /**
     * Reads the latest version of a single cell.
     *
     * @param row logical (undistributed) row key
     * @param column column qualifier
     * @return the cell value, or {@code null} if the read returned nothing
     * @throws DataSetException if the HBase read fails
     */
    @Override
    @Nullable
    public byte[] get(byte[] row, byte[] column) {
        try {
            Result result = this.hTable.get(this.tableUtil.buildGet(createDistributedRowKey(row)).addColumn(this.columnFamily, column).setMaxVersions(1).build());
            return result.isEmpty() ? null : result.getValue(this.columnFamily, column);
        } catch (IOException e) {
            throw new DataSetException("Get failed on table " + this.tableId, e);
        }
    }

    /**
     * Writes long values, encoded with {@link Bytes#toBytes(long)}, and flushes them.
     *
     * @param updates row key to (column to value) map
     * @throws DataSetException if the HBase write fails
     */
    @Override
    public void put(SortedMap<byte[], ? extends SortedMap<byte[], Long>> updates) {
        List<Put> puts = Lists.newArrayList();
        for (Map.Entry<byte[], ? extends SortedMap<byte[], Long>> rowEntry : updates.entrySet()) {
            PutBuilder builder = this.tableUtil.buildPut(createDistributedRowKey(rowEntry.getKey()));
            for (Map.Entry<byte[], Long> columnEntry : rowEntry.getValue().entrySet()) {
                builder.add(this.columnFamily, columnEntry.getKey(), Bytes.toBytes(columnEntry.getValue().longValue()));
            }
            puts.add(builder.build());
        }
        try {
            putAndFlush(puts);
        } catch (IOException e) {
            throw new DataSetException("Put failed on table " + this.tableId, e);
        }
    }

    /**
     * Writes raw byte values and flushes them.
     *
     * @param updates row key to (column to value) map
     * @throws DataSetException if the HBase write fails
     */
    @Override
    public void putBytes(SortedMap<byte[], ? extends SortedMap<byte[], byte[]>> updates) {
        List<Put> puts = Lists.newArrayList();
        for (Map.Entry<byte[], ? extends SortedMap<byte[], byte[]>> rowEntry : updates.entrySet()) {
            PutBuilder builder = this.tableUtil.buildPut(createDistributedRowKey(rowEntry.getKey()));
            for (Map.Entry<byte[], byte[]> columnEntry : rowEntry.getValue().entrySet()) {
                builder.add(this.columnFamily, columnEntry.getKey(), columnEntry.getValue());
            }
            puts.add(builder.build());
        }
        try {
            putAndFlush(puts);
        } catch (IOException e) {
            throw new DataSetException("Put failed on table " + this.tableId, e);
        }
    }

    /**
     * Compare-and-swap on a single cell, delegated to HBase {@code checkAndPut} /
     * {@code checkAndDelete}.
     *
     * @param row logical (undistributed) row key
     * @param column column qualifier
     * @param oldValue value the cell is expected to hold
     * @param newValue value to store; {@code null} deletes the column instead
     * @return whatever the underlying check-and-mutate call returned
     * @throws DataSetException if the HBase call fails
     */
    @Override
    public boolean swap(byte[] row, byte[] column, byte[] oldValue, byte[] newValue) {
        try {
            byte[] distributedRow = createDistributedRowKey(row);
            if (newValue == null) {
                return this.hTable.checkAndDelete(distributedRow, this.columnFamily, column, oldValue, this.tableUtil.buildDelete(distributedRow).deleteColumns(this.columnFamily, column).build());
            }
            return this.hTable.checkAndPut(distributedRow, this.columnFamily, column, oldValue, this.tableUtil.buildPut(distributedRow).add(this.columnFamily, column, newValue).build());
        } catch (IOException e) {
            throw new DataSetException("Swap failed on table " + this.tableId, e);
        }
    }

    /**
     * Increments several columns of one row by writing a delta put and flushing it.
     *
     * @param row logical (undistributed) row key
     * @param increments column to delta map
     * @throws NumberFormatException if HBase reports that an existing value is not 64 bits wide
     * @throws DataSetException on any other HBase failure
     */
    @Override
    public void increment(byte[] row, Map<byte[], Long> increments) {
        byte[] distributedRow = createDistributedRowKey(row);
        try {
            this.hTable.put(getIncrementalPut(distributedRow, increments));
            this.hTable.flushCommits();
        } catch (IOException e) {
            throw translateIncrementFailure(e, "Increment", "Attempted to increment a value that is not convertible to long, row: " + Bytes.toStringBinary(distributedRow));
        }
    }

    /** Maps a logical row key to the key stored in HBase; identity when distribution is off. */
    private byte[] createDistributedRowKey(byte[] row) {
        return this.rowKeyDistributor == null ? row : this.rowKeyDistributor.getDistributedKey(row);
    }

    /** Builds a delta put for {@code distributedRow} carrying one long-encoded cell per entry. */
    private Put getIncrementalPut(byte[] distributedRow, Map<byte[], Long> increments) {
        Put put = getIncrementalPut(distributedRow);
        for (Map.Entry<byte[], Long> columnEntry : increments.entrySet()) {
            put.add(this.columnFamily, columnEntry.getKey(), Bytes.toBytes(columnEntry.getValue().longValue()));
        }
        return put;
    }

    /** Builds an empty put tagged with the {@code HBaseTable.DELTA_WRITE} attribute. */
    private Put getIncrementalPut(byte[] distributedRow) {
        return this.tableUtil.buildPut(distributedRow).setAttribute(HBaseTable.DELTA_WRITE, Bytes.toBytes(true)).build();
    }

    /** Sends the given puts to the table and flushes the client-side write buffer. */
    private void putAndFlush(List<Put> puts) throws IOException {
        this.hTable.put(puts);
        this.hTable.flushCommits();
    }

    /**
     * Converts an {@link IOException} raised by an increment into the exception callers expect.
     *
     * @param e the failure reported by HBase
     * @param operation operation name used as the prefix of the {@link DataSetException} message
     * @param notLongMessage message for the {@link NumberFormatException} raised when HBase
     *     reports that the existing value is not 64 bits wide
     * @return the exception to throw; the original failure is always attached as its cause
     */
    private RuntimeException translateIncrementFailure(IOException e, String operation, String notLongMessage) {
        String message = e.getMessage();
        if (message != null && message.contains(NOT_64_BITS_MARKER)) {
            NumberFormatException notLong = new NumberFormatException(notLongMessage);
            // NumberFormatException has no cause-taking constructor; keep the HBase error attached.
            notLong.initCause(e);
            return notLong;
        }
        return new DataSetException(operation + " failed on table " + this.tableId, e);
    }

    /**
     * Increments columns across several rows with one batch of delta puts, then flushes.
     *
     * @param updates row key to (column to delta) map
     * @throws NumberFormatException if HBase reports that an existing value is not 64 bits wide
     * @throws DataSetException on any other HBase failure
     */
    @Override
    public void increment(NavigableMap<byte[], NavigableMap<byte[], Long>> updates) {
        List<Put> puts = Lists.newArrayList();
        for (Map.Entry<byte[], NavigableMap<byte[], Long>> rowEntry : updates.entrySet()) {
            puts.add(getIncrementalPut(createDistributedRowKey(rowEntry.getKey()), rowEntry.getValue()));
        }
        try {
            putAndFlush(puts);
        } catch (IOException e) {
            throw translateIncrementFailure(e, "Increment", "Attempted to increment a value that is not convertible to long.");
        }
    }

    /**
     * Increments one cell through an HBase {@link Increment} and returns the resulting value.
     *
     * @param row logical (undistributed) row key
     * @param column column qualifier
     * @param delta amount to add
     * @return the cell value returned by HBase, decoded as a long
     * @throws NumberFormatException if HBase reports that the existing value is not 64 bits wide
     * @throws DataSetException on any other HBase failure
     */
    @Override
    public long incrementAndGet(byte[] row, byte[] column, long delta) {
        byte[] distributedRow = createDistributedRowKey(row);
        Increment increment = new Increment(distributedRow);
        increment.addColumn(this.columnFamily, column, delta);
        try {
            return Bytes.toLong(this.hTable.increment(increment).getValue(this.columnFamily, column));
        } catch (IOException e) {
            throw translateIncrementFailure(e, "IncrementAndGet", "Attempted to increment a value that is not convertible to long, row: " + Bytes.toStringBinary(distributedRow) + " column: " + Bytes.toStringBinary(column));
        }
    }

    /**
     * Deletes the given columns of one row.
     *
     * @param row logical (undistributed) row key
     * @param columns column qualifiers to delete
     * @throws DataSetException if the HBase delete fails
     */
    @Override
    public void delete(byte[] row, byte[][] columns) {
        DeleteBuilder builder = this.tableUtil.buildDelete(createDistributedRowKey(row));
        for (byte[] column : columns) {
            builder.deleteColumns(this.columnFamily, column);
        }
        try {
            this.hTable.delete(builder.build());
        } catch (IOException e) {
            throw new DataSetException("Delete failed on table " + this.tableId, e);
        }
    }

    /**
     * Scans a row range, optionally restricted by a fuzzy row filter.
     *
     * @param startRow start row passed to the scan, or {@code null} to leave it unset
     * @param stopRow stop row passed to the scan, or {@code null} to leave it unset
     * @param filter fuzzy row filter to apply, or {@code null} for none
     * @return a scanner over the matching rows
     * @throws DataSetException if the scanner cannot be opened
     */
    @Override
    public Scanner scan(@Nullable byte[] startRow, @Nullable byte[] stopRow, @Nullable FuzzyRowFilter filter) {
        ScanBuilder scanBuilder = this.tableUtil.buildScan();
        configureRangeScan(scanBuilder, startRow, stopRow, filter);
        try {
            return new HBaseScanner(getScanner(scanBuilder), this.columnFamily, this.rowKeyDistributor);
        } catch (IOException e) {
            throw new DataSetException("Scan failed on table " + this.tableId, e);
        }
    }

    /** Opens a plain table scanner, or a {@link DistributedScanner} when row keys are distributed. */
    private ResultScanner getScanner(ScanBuilder scanBuilder) throws IOException {
        if (this.rowKeyDistributor == null) {
            return this.hTable.getScanner(scanBuilder.build());
        }
        return DistributedScanner.create(this.hTable, scanBuilder.build(), this.rowKeyDistributor, this.scanExecutor);
    }

    /**
     * Applies caching, row range, column family and the optional fuzzy filter to {@code scanBuilder}.
     *
     * @return the same builder, for chaining
     */
    private ScanBuilder configureRangeScan(ScanBuilder scanBuilder, @Nullable byte[] startRow, @Nullable byte[] stopRow, @Nullable FuzzyRowFilter filter) {
        scanBuilder.setCaching(SCAN_CACHING);
        if (startRow != null) {
            scanBuilder.setStartRow(startRow);
        }
        if (stopRow != null) {
            scanBuilder.setStopRow(stopRow);
        }
        scanBuilder.addFamily(this.columnFamily);
        if (filter != null) {
            List<Pair<byte[], byte[]>> fuzzyPairs = Lists.newArrayListWithExpectedSize(filter.getFuzzyKeysData().size());
            for (ImmutablePair<byte[], byte[]> fuzzyPair : filter.getFuzzyKeysData()) {
                if (this.rowKeyDistributor != null) {
                    fuzzyPairs.addAll(this.rowKeyDistributor.getDistributedFilterPairs(fuzzyPair));
                } else {
                    // Hand HBase copies of both arrays rather than the caller's own instances.
                    // NOTE(review): presumably because the HBase filter may modify them - confirm.
                    byte[] first = fuzzyPair.getFirst();
                    byte[] second = fuzzyPair.getSecond();
                    fuzzyPairs.add(Pair.newPair(Arrays.copyOf(first, first.length), Arrays.copyOf(second, second.length)));
                }
            }
            scanBuilder.setFilter(new org.apache.hadoop.hbase.filter.FuzzyRowFilter(fuzzyPairs));
        }
        return scanBuilder;
    }

    /**
     * Releases the resources held by this table: stops the scan thread pool, if one was created,
     * and closes the underlying {@link HTable}.
     *
     * @throws IOException if closing the HTable fails
     */
    public void close() throws IOException {
        if (this.scanExecutor != null) {
            // The pool is private to this instance; stop it so scan threads do not outlive the table.
            this.scanExecutor.shutdownNow();
        }
        this.hTable.close();
    }
}
