package co.cask.cdap.data2.transaction.queue.leveldb;

import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.LevelDBOrderedTableCore;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueEvictor;
import co.cask.tephra.Transaction;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/leveldb/LevelDBQueueEvictor.class */
public class LevelDBQueueEvictor implements QueueEvictor {
    private static final Logger LOG = LoggerFactory.getLogger(LevelDBQueueEvictor.class);
    private final LevelDBOrderedTableCore core;
    private final byte[] queueRowPrefix;
    private final Executor executor;
    private final int numGroups;
    private final QueueName name;

    public LevelDBQueueEvictor(LevelDBOrderedTableCore levelDBOrderedTableCore, QueueName queueName, int i, Executor executor) {
        this.core = levelDBOrderedTableCore;
        this.executor = executor;
        this.queueRowPrefix = QueueEntryRow.getQueueRowPrefix(queueName);
        this.numGroups = i;
        this.name = queueName;
    }

    @Override // co.cask.cdap.data2.transaction.queue.QueueEvictor
    public ListenableFuture<Integer> evict(final Transaction transaction) {
        final SettableFuture create = SettableFuture.create();
        this.executor.execute(new Runnable() { // from class: co.cask.cdap.data2.transaction.queue.leveldb.LevelDBQueueEvictor.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    create.set(Integer.valueOf(LevelDBQueueEvictor.this.doEvict(transaction)));
                } catch (Throwable th) {
                    create.setException(th);
                }
            }
        });
        return create;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized int doEvict(Transaction transaction) throws IOException {
        byte[] stopRowForTransaction = QueueEntryRow.getStopRowForTransaction(this.queueRowPrefix, transaction);
        ArrayList newArrayList = Lists.newArrayList();
        Scanner scan = this.core.scan(this.queueRowPrefix, stopRowForTransaction, null, (byte[][]) null, Transaction.ALL_VISIBLE_LATEST);
        while (true) {
            try {
                Row next = scan.next();
                if (next == null) {
                    break;
                }
                int i = 0;
                for (Map.Entry entry : next.getColumns().entrySet()) {
                    if (QueueEntryRow.isStateColumn((byte[]) entry.getKey())) {
                        if (QueueEntryRow.isCommittedProcessed((byte[]) entry.getValue(), transaction)) {
                            i++;
                        }
                    }
                }
                if (i >= this.numGroups) {
                    newArrayList.add(next.getRow());
                }
            } finally {
                scan.close();
            }
        }
        if (newArrayList.isEmpty()) {
            LOG.trace("Nothing to evict from queue {}", this.name);
        } else {
            this.core.deleteRows(newArrayList);
            LOG.trace("Evicted {} entries from queue {}", Integer.valueOf(newArrayList.size()), this.name);
        }
        return newArrayList.size();
    }
}
