package co.cask.cdap.data2.transaction.queue.leveldb;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.LevelDBOrderedTableCore;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.transaction.queue.AbstractQueueProducer;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueMetrics;
import co.cask.tephra.Transaction;
import com.google.common.collect.Maps;
import java.util.NavigableMap;
import java.util.TreeMap;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/leveldb/LevelDBQueueProducer.class */
/**
 * Queue producer that writes queue entries through a {@link LevelDBOrderedTableCore}.
 *
 * <p>Every row written by {@link #persist} is also recorded in an in-memory change set so
 * that {@link #doRollback} can hand the same rows back to the table core to be undone. The
 * change set is emptied each time {@link #startTx} is called.
 */
public final class LevelDBQueueProducer extends AbstractQueueProducer {
    /** Table core that receives the persist and undo calls. */
    private final LevelDBOrderedTableCore core;

    /** Row-key prefix obtained from {@link QueueEntryRow#getQueueRowPrefix} for this producer's queue. */
    private final byte[] queueRowPrefix;

    /** Rows written since the last {@link #startTx}: row key -> (column -> value). */
    private final NavigableMap<byte[], NavigableMap<byte[], byte[]>> changes;

    /**
     * Creates a producer for the given queue.
     *
     * @param tableCore    table core used to persist and undo queue rows
     * @param queueName    queue to produce to; also determines the row-key prefix
     * @param queueMetrics metrics object passed through to the superclass
     */
    public LevelDBQueueProducer(LevelDBOrderedTableCore tableCore, QueueName queueName, QueueMetrics queueMetrics) {
        super(queueMetrics, queueName);
        this.core = tableCore;
        this.changes = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        this.queueRowPrefix = QueueEntryRow.getQueueRowPrefix(queueName);
    }

    /**
     * Starts a transaction in the superclass and then discards the rows recorded so far.
     *
     * @param tx the transaction being started
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueProducer
    public void startTx(Transaction tx) {
        super.startTx(tx);
        this.changes.clear();
    }

    /**
     * Builds one row per queue entry, records it in the change set and persists the change set.
     *
     * <p>Row key layout: queue row prefix + transaction write pointer + entry counter, where
     * the counter starts at 0 on every call. Each row carries the entry data under
     * {@link QueueEntryRow#DATA_COLUMN} and the serialized hash keys under
     * {@link QueueEntryRow#META_COLUMN}.
     *
     * @param entries     entries to write
     * @param transaction transaction whose write pointer becomes part of every row key
     * @return the sum of the data lengths (in bytes) of the given entries
     * @throws Exception if the underlying table core fails to persist
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueProducer
    protected int persist(Iterable<QueueEntry> entries, Transaction transaction) throws Exception {
        byte[] rowKeyPrefix = Bytes.add(this.queueRowPrefix, Bytes.toBytes(transaction.getWritePointer()));
        int count = 0;
        int totalBytes = 0;
        for (QueueEntry entry : entries) {
            byte[] rowKey = Bytes.add(rowKeyPrefix, Bytes.toBytes(count++));
            byte[] data = entry.getData();

            // Typed map (not a raw TreeMap) so the put into 'changes' is checked by the compiler.
            NavigableMap<byte[], byte[]> row = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            row.put(QueueEntryRow.DATA_COLUMN, data);
            row.put(QueueEntryRow.META_COLUMN, QueueEntry.serializeHashKeys(entry.getHashKeys()));

            this.changes.put(rowKey, row);
            totalBytes += data.length;
        }
        // The whole accumulated change set is handed to the core, not only this call's rows.
        this.core.persist(this.changes, KeyValue.LATEST_TIMESTAMP);
        return totalBytes;
    }

    /**
     * Asks the table core to undo every row currently recorded in the change set.
     *
     * @throws Exception if the underlying table core fails to undo
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueProducer
    protected void doRollback() throws Exception {
        this.core.undo(this.changes, KeyValue.LATEST_TIMESTAMP);
    }
}
