package co.cask.cdap.messaging.store.hbase;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.hbase.wd.AbstractRowKeyDistributor;
import co.cask.cdap.hbase.wd.DistributedScanner;
import co.cask.cdap.messaging.store.AbstractPayloadTable;
import co.cask.cdap.messaging.store.RawPayloadTableEntry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/messaging/store/hbase/HBasePayloadTable.class */
public final class HBasePayloadTable extends AbstractPayloadTable {
    private static final byte[] COL = Bytes.toBytes(99);
    private final HBaseTableUtil tableUtil;
    private final byte[] columnFamily;
    private final HTable hTable;
    private final AbstractRowKeyDistributor rowKeyDistributor;
    private final ExecutorService scanExecutor;
    private final int scanCacheRows;
    private final HBaseExceptionHandler exceptionHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HBasePayloadTable(HBaseTableUtil hBaseTableUtil, HTable hTable, byte[] bArr, AbstractRowKeyDistributor abstractRowKeyDistributor, ExecutorService executorService, int i, HBaseExceptionHandler hBaseExceptionHandler) {
        this.tableUtil = hBaseTableUtil;
        this.hTable = hTable;
        this.columnFamily = Arrays.copyOf(bArr, bArr.length);
        this.rowKeyDistributor = abstractRowKeyDistributor;
        this.scanExecutor = executorService;
        this.scanCacheRows = i;
        this.exceptionHandler = hBaseExceptionHandler;
    }

    @Override // co.cask.cdap.messaging.store.AbstractPayloadTable
    public CloseableIterator<RawPayloadTableEntry> read(byte[] bArr, byte[] bArr2, final int i) throws IOException {
        final DistributedScanner create = DistributedScanner.create(this.hTable, this.tableUtil.buildScan().setStartRow(bArr).setStopRow(bArr2).setCaching(this.scanCacheRows).build(), this.rowKeyDistributor, this.scanExecutor);
        return new AbstractCloseableIterator<RawPayloadTableEntry>() { // from class: co.cask.cdap.messaging.store.hbase.HBasePayloadTable.1
            private final RawPayloadTableEntry tableEntry = new RawPayloadTableEntry();
            private boolean closed = false;
            private int maxLimit;

            {
                this.maxLimit = i;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public RawPayloadTableEntry m31computeNext() {
                if (this.closed || this.maxLimit <= 0) {
                    return (RawPayloadTableEntry) endOfData();
                }
                try {
                    Result next = create.next();
                    if (next == null) {
                        return (RawPayloadTableEntry) endOfData();
                    }
                    this.maxLimit--;
                    return this.tableEntry.set(HBasePayloadTable.this.rowKeyDistributor.getOriginalKey(next.getRow()), next.getValue(HBasePayloadTable.this.columnFamily, HBasePayloadTable.COL));
                } catch (IOException e) {
                    throw HBasePayloadTable.this.exceptionHandler.handleAndWrap(e);
                }
            }

            public void close() {
                try {
                    create.close();
                } finally {
                    endOfData();
                    this.closed = true;
                }
            }
        };
    }

    @Override // co.cask.cdap.messaging.store.AbstractPayloadTable
    public void persist(Iterator<RawPayloadTableEntry> it) throws IOException {
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            RawPayloadTableEntry next = it.next();
            arrayList.add(this.tableUtil.buildPut(this.rowKeyDistributor.getDistributedKey(next.getKey())).add(this.columnFamily, COL, next.getValue()).build());
        }
        try {
            if (!arrayList.isEmpty()) {
                this.hTable.put(arrayList);
                if (!this.hTable.isAutoFlush()) {
                    this.hTable.flushCommits();
                }
            }
        } catch (IOException e) {
            throw ((IOException) this.exceptionHandler.handle(e));
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.hTable.close();
    }
}
