package co.cask.cdap.messaging.store.hbase;

import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.PutBuilder;
import co.cask.cdap.hbase.wd.AbstractRowKeyDistributor;
import co.cask.cdap.hbase.wd.DistributedScanner;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.store.AbstractMessageTable;
import co.cask.cdap.messaging.store.RawMessageTableEntry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/messaging/store/hbase/HBaseMessageTable.class */
public final class HBaseMessageTable extends AbstractMessageTable {
    private static final byte[] PAYLOAD_COL = MessagingUtils.Constants.PAYLOAD_COL;
    private static final byte[] TX_COL = MessagingUtils.Constants.TX_COL;
    private final HBaseTableUtil tableUtil;
    private final byte[] columnFamily;
    private final HTable hTable;
    private final AbstractRowKeyDistributor rowKeyDistributor;
    private final ExecutorService scanExecutor;
    private final int scanCacheRows;
    private final HBaseExceptionHandler exceptionHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HBaseMessageTable(HBaseTableUtil hBaseTableUtil, HTable hTable, byte[] bArr, AbstractRowKeyDistributor abstractRowKeyDistributor, ExecutorService executorService, int i, HBaseExceptionHandler hBaseExceptionHandler) {
        this.tableUtil = hBaseTableUtil;
        this.hTable = hTable;
        this.columnFamily = Arrays.copyOf(bArr, bArr.length);
        this.rowKeyDistributor = abstractRowKeyDistributor;
        this.scanExecutor = executorService;
        this.scanCacheRows = i;
        this.exceptionHandler = hBaseExceptionHandler;
    }

    @Override // co.cask.cdap.messaging.store.AbstractMessageTable
    protected CloseableIterator<RawMessageTableEntry> read(byte[] bArr, byte[] bArr2) throws IOException {
        try {
            final DistributedScanner create = DistributedScanner.create(this.hTable, this.tableUtil.buildScan().setStartRow(bArr).setStopRow(bArr2).setCaching(this.scanCacheRows).build(), this.rowKeyDistributor, this.scanExecutor);
            final RawMessageTableEntry rawMessageTableEntry = new RawMessageTableEntry();
            return new AbstractCloseableIterator<RawMessageTableEntry>() { // from class: co.cask.cdap.messaging.store.hbase.HBaseMessageTable.1
                private boolean closed = false;

                /* JADX INFO: Access modifiers changed from: protected */
                /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
                public RawMessageTableEntry m22computeNext() {
                    if (this.closed) {
                        return (RawMessageTableEntry) endOfData();
                    }
                    try {
                        Result next = create.next();
                        return next == null ? (RawMessageTableEntry) endOfData() : rawMessageTableEntry.set(HBaseMessageTable.this.rowKeyDistributor.getOriginalKey(next.getRow()), next.getValue(HBaseMessageTable.this.columnFamily, HBaseMessageTable.TX_COL), next.getValue(HBaseMessageTable.this.columnFamily, HBaseMessageTable.PAYLOAD_COL));
                    } catch (IOException e) {
                        throw HBaseMessageTable.this.exceptionHandler.handleAndWrap(e);
                    }
                }

                public void close() {
                    try {
                        create.close();
                        endOfData();
                        this.closed = true;
                    } catch (Throwable th) {
                        endOfData();
                        this.closed = true;
                        throw th;
                    }
                }
            };
        } catch (IOException e) {
            throw ((IOException) this.exceptionHandler.handle(e));
        }
    }

    @Override // co.cask.cdap.messaging.store.AbstractMessageTable
    protected void persist(Iterator<RawMessageTableEntry> it) throws IOException {
        ArrayList arrayList = new ArrayList();
        while (it.hasNext()) {
            RawMessageTableEntry next = it.next();
            PutBuilder buildPut = this.tableUtil.buildPut(this.rowKeyDistributor.getDistributedKey(next.getKey()));
            if (next.getTxPtr() != null) {
                buildPut.add(this.columnFamily, TX_COL, next.getTxPtr());
            }
            if (next.getPayload() != null) {
                buildPut.add(this.columnFamily, PAYLOAD_COL, next.getPayload());
            }
            arrayList.add(buildPut.build());
        }
        try {
            if (!arrayList.isEmpty()) {
                this.hTable.put(arrayList);
                if (!this.hTable.isAutoFlush()) {
                    this.hTable.flushCommits();
                }
            }
        } catch (IOException e) {
            throw ((IOException) this.exceptionHandler.handle(e));
        }
    }

    @Override // co.cask.cdap.messaging.store.AbstractMessageTable
    public void rollback(byte[] bArr, byte[] bArr2, byte[] bArr3) throws IOException {
        Scan build = this.tableUtil.buildScan().setStartRow(bArr).setStopRow(bArr2).setCaching(this.scanCacheRows).build();
        ArrayList arrayList = new ArrayList();
        DistributedScanner create = DistributedScanner.create(this.hTable, build, this.rowKeyDistributor, this.scanExecutor);
        Throwable th = null;
        try {
            try {
                Iterator it = create.iterator();
                while (it.hasNext()) {
                    PutBuilder buildPut = this.tableUtil.buildPut(((Result) it.next()).getRow());
                    buildPut.add(this.columnFamily, TX_COL, bArr3);
                    arrayList.add(buildPut.build());
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                try {
                    if (!arrayList.isEmpty()) {
                        this.hTable.put(arrayList);
                        if (!this.hTable.isAutoFlush()) {
                            this.hTable.flushCommits();
                        }
                    }
                } catch (IOException e) {
                    throw ((IOException) this.exceptionHandler.handle(e));
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.hTable.close();
    }
}
