package co.cask.cdap.data2.transaction.queue.hbase;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.transaction.queue.AbstractQueueProducer;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.tephra.Transaction;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/hbase/HBaseQueueProducer.class */
/**
 * Queue producer that persists queue entries as rows in an HBase table.
 *
 * <p>Row keys for each entry are obtained from the configured {@link HBaseQueueStrategy}. Every
 * row key produced during a transaction is remembered so that {@link #doRollback()} can delete
 * the corresponding rows again.
 */
public class HBaseQueueProducer extends AbstractQueueProducer implements Closeable {
    private final HBaseQueueStrategy queueStrategy;
    private final List<ConsumerGroupConfig> consumerGroupConfigs;
    private final byte[] queueRowPrefix;
    private final HTable hTable;
    /** Row keys handed to HBase since the last {@link #startTx(Transaction)}. */
    private final List<byte[]> rollbackKeys;

    /**
     * Creates a producer writing to the given table.
     *
     * @param hTable table that receives the puts and deletes; closed by {@link #close()}
     * @param queueName queue being produced to; also used to derive the row key prefix
     * @param queueMetrics metrics object handed to the superclass
     * @param hBaseQueueStrategy strategy that generates the row keys for each entry
     * @param iterable consumer group configurations; only the first configuration seen for each
     *     group id is kept
     */
    public HBaseQueueProducer(HTable hTable, QueueName queueName, QueueMetrics queueMetrics, HBaseQueueStrategy hBaseQueueStrategy, Iterable<? extends ConsumerGroupConfig> iterable) {
        super(queueMetrics, queueName);
        this.queueStrategy = hBaseQueueStrategy;
        // The predicate is stateful: it accepts a config only the first time its group id shows up,
        // so the resulting immutable list holds at most one config per group id.
        this.consumerGroupConfigs = ImmutableList.copyOf(Iterables.filter(iterable, new Predicate<ConsumerGroupConfig>() {
            private final Set<Long> seenGroups = Sets.newHashSet();

            @Override
            public boolean apply(ConsumerGroupConfig consumerGroupConfig) {
                return this.seenGroups.add(consumerGroupConfig.getGroupId());
            }
        }));
        this.queueRowPrefix = QueueEntryRow.getQueueRowPrefix(queueName);
        this.rollbackKeys = Lists.newArrayList();
        this.hTable = hTable;
    }

    /**
     * Starts a new transaction and forgets the row keys recorded for the previous one.
     *
     * @param transaction the transaction being started
     */
    @Override
    public void startTx(Transaction transaction) {
        super.startTx(transaction);
        this.rollbackKeys.clear();
    }

    /**
     * Closes the underlying HBase table.
     *
     * @throws IOException if closing the table fails
     */
    @Override
    public void close() throws IOException {
        this.hTable.close();
    }

    /**
     * Writes the given entries to the HBase table and flushes them.
     *
     * <p>For each entry the strategy may yield several row keys; one {@link Put} carrying the
     * entry data and its serialized hash keys is issued per row key.
     *
     * @param iterable entries to persist
     * @param transaction transaction whose write pointer is passed to the row key strategy
     * @return total number of data bytes written, counting an entry's data once per row key
     * @throws IOException if the put or the flush fails
     */
    @Override
    protected int persist(Iterable<QueueEntry> iterable, Transaction transaction) throws IOException {
        int entryIndex = 0;
        int bytesWritten = 0;
        List<Put> puts = Lists.newArrayList();
        // Scratch list reused across entries to receive the row keys of the current entry.
        List<byte[]> rowKeys = Lists.newArrayList();
        long writePointer = transaction.getWritePointer();

        for (QueueEntry queueEntry : iterable) {
            rowKeys.clear();
            this.queueStrategy.getRowKeys(this.consumerGroupConfigs, queueEntry, this.queueRowPrefix, writePointer, entryIndex, rowKeys);
            // Keys are recorded before the write is issued, so they are available to rollback
            // even if the put below fails.
            this.rollbackKeys.addAll(rowKeys);

            byte[] serializedHashKeys = QueueEntry.serializeHashKeys(queueEntry.getHashKeys());
            for (byte[] rowKey : rowKeys) {
                Put put = new Put(rowKey);
                put.add(QueueEntryRow.COLUMN_FAMILY, QueueEntryRow.DATA_COLUMN, queueEntry.getData());
                put.add(QueueEntryRow.COLUMN_FAMILY, QueueEntryRow.META_COLUMN, serializedHashKeys);
                puts.add(put);
                bytesWritten += queueEntry.getData().length;
            }
            entryIndex++;
        }

        this.hTable.put(puts);
        this.hTable.flushCommits();
        return bytesWritten;
    }

    /**
     * Deletes every row whose key was recorded since the transaction started. Does nothing when
     * no keys were recorded.
     *
     * @throws Exception if the delete or the flush fails
     */
    @Override
    protected void doRollback() throws Exception {
        if (this.rollbackKeys.isEmpty()) {
            return;
        }
        List<Delete> deletes = Lists.newArrayList();
        for (byte[] rowKey : this.rollbackKeys) {
            deletes.add(new Delete(rowKey));
        }
        this.hTable.delete(deletes);
        this.hTable.flushCommits();
    }
}
