package co.cask.cdap.data2.transaction.queue;

import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.queue.QueueEntry;
import co.cask.cdap.data2.queue.QueueProducer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionAware;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/AbstractQueueProducer.class */
public abstract class AbstractQueueProducer implements QueueProducer, TransactionAware {
    private final QueueMetrics queueMetrics;
    private final BlockingQueue<QueueEntry> queue = new LinkedBlockingQueue();
    private final QueueName queueName;
    private Transaction transaction;
    private int lastEnqueueCount;
    private int lastEnqueueBytes;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractQueueProducer(QueueMetrics queueMetrics, QueueName queueName) {
        this.queueMetrics = queueMetrics;
        this.queueName = queueName;
    }

    public String getTransactionAwareName() {
        return getClass().getSimpleName() + "(queue = " + this.queueName + ")";
    }

    @Override // co.cask.cdap.data2.queue.QueueProducer
    public void enqueue(QueueEntry queueEntry) throws IOException {
        Preconditions.checkState(this.transaction != null, "Enqueue called outside of transaction.");
        this.queue.add(queueEntry);
    }

    @Override // co.cask.cdap.data2.queue.QueueProducer
    public void enqueue(Iterable<QueueEntry> iterable) throws IOException {
        Preconditions.checkState(this.transaction != null, "Enqueue called outside of transaction.");
        Iterables.addAll(this.queue, iterable);
    }

    public void startTx(Transaction transaction) {
        this.queue.clear();
        this.transaction = transaction;
        this.lastEnqueueCount = 0;
        this.lastEnqueueBytes = 0;
    }

    public void updateTx(Transaction transaction) {
        this.transaction = transaction;
    }

    public Collection<byte[]> getTxChanges() {
        return ImmutableList.of();
    }

    public boolean commitTx() throws Exception {
        Preconditions.checkState(this.transaction != null, "Commit without starting transaction.");
        Transaction transaction = this.transaction;
        this.transaction = null;
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(this.queue.size());
        this.queue.drainTo(newArrayListWithCapacity);
        this.lastEnqueueCount = newArrayListWithCapacity.size();
        this.lastEnqueueBytes = persist(newArrayListWithCapacity, transaction);
        return true;
    }

    public void postTxCommit() {
        if (this.lastEnqueueCount > 0) {
            this.queueMetrics.emitEnqueue(this.lastEnqueueCount);
            this.queueMetrics.emitEnqueueBytes(this.lastEnqueueBytes);
        }
    }

    public boolean rollbackTx() throws Exception {
        Transaction transaction = this.transaction;
        this.transaction = null;
        doRollback();
        return true;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    protected abstract int persist(Iterable<QueueEntry> iterable, Transaction transaction) throws Exception;

    protected abstract void doRollback() throws Exception;
}
