package co.cask.cdap.data2.transaction.stream.hbase;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.stream.StreamEventOffset;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.data2.transaction.stream.StreamConsumerState;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStore;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.hbase.wd.AbstractRowKeyDistributor;
import co.cask.cdap.hbase.wd.DistributedScanner;
import co.cask.cdap.proto.id.StreamId;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Threads;

/**
 * An {@link AbstractStreamFileConsumer} that keeps per-entry consumer state in an HBase table.
 *
 * <p>Every row key is passed through the configured {@link AbstractRowKeyDistributor} before it is
 * written to HBase, and mapped back to the original key when rows are read during a state scan.
 * State values are stored in the {@link QueueEntryRow#COLUMN_FAMILY} family under the
 * {@code stateColumnName} qualifier inherited from the parent class.
 *
 * <p>Instances are not thread safe.
 */
@NotThreadSafe
public final class HBaseStreamFileConsumer extends AbstractStreamFileConsumer {
    private final HBaseTableUtil tableUtil;
    private final HTable hTable;
    private final AbstractRowKeyDistributor keyDistributor;
    private final ExecutorService scanExecutor;

    /**
     * Creates a consumer backed by the given HBase table.
     *
     * @param cConfiguration forwarded to the parent constructor
     * @param streamConfig forwarded to the parent constructor; its stream id is also used to name
     *     the scanner threads
     * @param consumerConfig forwarded to the parent constructor
     * @param hBaseTableUtil used to build the {@code Scan} in {@link #scanStates(byte[], byte[])}
     * @param hTable table holding the consumer state; closed by {@link #doClose()}
     * @param fileReader forwarded to the parent constructor
     * @param streamConsumerStateStore forwarded to the parent constructor
     * @param streamConsumerState forwarded to the parent constructor
     * @param readFilter forwarded to the parent constructor; may be {@code null}
     * @param abstractRowKeyDistributor translates between original and distributed row keys
     */
    public HBaseStreamFileConsumer(CConfiguration cConfiguration, StreamConfig streamConfig, ConsumerConfig consumerConfig, HBaseTableUtil hBaseTableUtil, HTable hTable, FileReader<StreamEventOffset, Iterable<StreamFileOffset>> fileReader, StreamConsumerStateStore streamConsumerStateStore, StreamConsumerState streamConsumerState, @Nullable ReadFilter readFilter, AbstractRowKeyDistributor abstractRowKeyDistributor) {
        super(cConfiguration, streamConfig, consumerConfig, fileReader, streamConsumerStateStore, streamConsumerState, readFilter);
        this.tableUtil = hBaseTableUtil;
        this.hTable = hTable;
        this.keyDistributor = abstractRowKeyDistributor;
        this.scanExecutor = createScanExecutor(streamConfig.getStreamId());
    }

    /**
     * Stops the scan executor and closes the HBase table.
     *
     * @throws IOException if closing the table fails
     */
    @Override
    protected void doClose() throws IOException {
        try {
            scanExecutor.shutdownNow();
        } finally {
            // Release the table even if shutting down the executor fails unexpectedly.
            hTable.close();
        }
    }

    /**
     * Attempts to claim an entry by conditionally writing its state column.
     *
     * @param bArr original (non-distributed) row key of the entry
     * @param bArr2 state value to write
     * @param bArr3 value handed to {@code checkAndPut} as the expected current content of the
     *     state column
     * @return the result of {@code HTable.checkAndPut}, i.e. whether the put was applied
     * @throws IOException if the HBase call fails
     */
    @Override
    protected boolean claimFifoEntry(byte[] bArr, byte[] bArr2, byte[] bArr3) throws IOException {
        Put claim = new Put(keyDistributor.getDistributedKey(bArr));
        claim.add(QueueEntryRow.COLUMN_FAMILY, stateColumnName, bArr2);
        return hTable.checkAndPut(claim.getRow(), QueueEntryRow.COLUMN_FAMILY, stateColumnName, bArr3, claim);
    }

    /**
     * Writes the same state value to the state column of every given row and flushes the writes.
     *
     * @param iterable original (non-distributed) row keys to update
     * @param i expected number of rows; only used to pre-size the batch list
     * @param bArr state value to store
     * @throws IOException if the HBase call fails
     */
    @Override
    protected void updateState(Iterable<byte[]> iterable, int i, byte[] bArr) throws IOException {
        ArrayList<Put> puts = Lists.newArrayListWithCapacity(i);
        for (byte[] row : iterable) {
            Put put = new Put(keyDistributor.getDistributedKey(row));
            put.add(QueueEntryRow.COLUMN_FAMILY, stateColumnName, bArr);
            puts.add(put);
        }
        hTable.put(puts);
        hTable.flushCommits();
    }

    /**
     * Removes the state column from every given row and flushes the deletes.
     *
     * @param iterable original (non-distributed) row keys whose state should be removed
     * @param i expected number of rows; only used to pre-size the batch list
     * @throws IOException if the HBase call fails
     */
    @Override
    protected void undoState(Iterable<byte[]> iterable, int i) throws IOException {
        ArrayList<Delete> deletes = Lists.newArrayListWithCapacity(i);
        for (byte[] row : iterable) {
            Delete delete = new Delete(keyDistributor.getDistributedKey(row));
            delete.deleteColumns(QueueEntryRow.COLUMN_FAMILY, stateColumnName);
            deletes.add(delete);
        }
        hTable.delete(deletes);
        hTable.flushCommits();
    }

    /**
     * Opens a scanner over the state column for the given row range.
     *
     * <p>The scan requests a single version per cell and a scanner caching of 1000, and is run
     * through a {@link DistributedScanner} using this consumer's key distributor and scan executor.
     *
     * @param bArr start row passed to the scan
     * @param bArr2 stop row passed to the scan
     * @return a scanner exposing original row keys and state values; the caller should close it
     * @throws IOException if the scanner cannot be created
     */
    @Override
    protected AbstractStreamFileConsumer.StateScanner scanStates(byte[] bArr, byte[] bArr2) throws IOException {
        final DistributedScanner scanner = DistributedScanner.create(
            hTable,
            tableUtil.buildScan()
                .setStartRow(bArr)
                .setStopRow(bArr2)
                .setMaxVersions(1)
                .addColumn(QueueEntryRow.COLUMN_FAMILY, stateColumnName)
                .setCaching(1000)
                .build(),
            keyDistributor,
            scanExecutor);

        return new AbstractStreamFileConsumer.StateScanner() {
            // Row most recently returned by the scanner; null before the first call to
            // nextStateRow() and after the scan is exhausted.
            private Result result;

            @Override
            public boolean nextStateRow() throws IOException {
                result = scanner.next();
                return result != null;
            }

            @Override
            public byte[] getRow() {
                // Rows are stored under distributed keys; hand back the original key.
                return keyDistributor.getOriginalKey(result.getRow());
            }

            @Override
            public byte[] getState() {
                return result.value();
            }

            @Override
            public void close() throws IOException {
                scanner.close();
            }
        };
    }

    /**
     * Builds the executor handed to {@link DistributedScanner} for state scans.
     *
     * <p>The pool is configured with 1 core thread, at most 20 threads, a 60 second keep-alive
     * (also applied to core threads) and a {@link SynchronousQueue} as its work queue. Threads are
     * daemon threads named after the stream's namespace and entity name.
     *
     * @param streamId stream whose namespace and name are embedded in the thread name prefix
     * @return the configured executor
     */
    private ExecutorService createScanExecutor(StreamId streamId) {
        String threadNamePrefix = String.format("stream-%s-%s-consumer-scanner-", streamId.getNamespace(), streamId.getEntityName());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 20, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            Threads.newDaemonThreadFactory(threadNamePrefix));
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
