package co.cask.cdap.data2.transaction.queue.leveldb;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.transaction.queue.AbstractQueueConsumer;
import co.cask.cdap.data2.transaction.queue.QueueEvictor;
import co.cask.cdap.data2.transaction.queue.QueueScanner;
import co.cask.tephra.Transaction;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data2/transaction/queue/leveldb/LevelDBQueueConsumer.class */
/**
 * Queue consumer that keeps per-entry consumer state in a {@link LevelDBTableCore}.
 *
 * <p>Entries are claimed by writing a value into this consumer's state column (the inherited
 * {@code stateColumnName}) of the entry's row. The check-then-write in
 * {@link #claimEntry(byte[], byte[])} runs while holding the lock object supplied at construction.
 *
 * <p>Eviction is requested through the supplied {@link QueueEvictor}: after a commit once the
 * inherited commit counter exceeds {@link #EVICTION_LIMIT}, and once more when the consumer is closed.
 */
public final class LevelDBQueueConsumer extends AbstractQueueConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(LevelDBQueueConsumer.class);

    /** Commit count that must be exceeded before {@link #postTxCommit()} requests an eviction. */
    private static final int EVICTION_LIMIT = 1000;

    /** Longest time {@link #close()} waits for the final eviction to finish. */
    private static final long EVICTION_TIMEOUT_SECONDS = 10;

    /** Placeholder value written into the maps handed to {@code core.undo(...)} by {@link #undoState}. */
    private static final byte[] DUMMY_STATE_CONTENT = new byte[0];

    private final QueueEvictor queueEvictor;
    private final LevelDBTableCore core;

    /** Monitor held around the read-then-write sequence in {@link #claimEntry(byte[], byte[])}. */
    private final Object lock;

    // Scratch maps reused by every claimEntry call instead of allocating new ones per claim.
    // They are only touched inside the synchronized block of claimEntry.
    private final NavigableMap<byte[], NavigableMap<byte[], byte[]>> rowMapForClaim =
            Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    private final NavigableMap<byte[], byte[]> colMapForClaim = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);

    /**
     * Creates a consumer backed by the given table core.
     *
     * @param cConfiguration configuration forwarded to the superclass
     * @param levelDBTableCore table used to read, persist, undo and scan entry state
     * @param queueLock object synchronized on while claiming an entry
     * @param consumerConfig consumer configuration forwarded to the superclass
     * @param queueName queue name forwarded to the superclass
     * @param queueEvictor evictor invoked from {@link #postTxCommit()} and {@link #close()}
     */
    public LevelDBQueueConsumer(CConfiguration cConfiguration, LevelDBTableCore levelDBTableCore, Object queueLock,
                                ConsumerConfig consumerConfig, QueueName queueName, QueueEvictor queueEvictor) {
        super(cConfiguration, consumerConfig, queueName);
        this.queueEvictor = queueEvictor;
        this.core = levelDBTableCore;
        this.lock = queueLock;
    }

    /**
     * Requests an eviction once more than {@link #EVICTION_LIMIT} commits have been counted and a
     * transaction is available, then resets the commit counter. The result of the eviction call is
     * not inspected here.
     */
    public void postTxCommit() {
        if (this.commitCount <= EVICTION_LIMIT || this.transaction == null) {
            return;
        }
        // The value returned by evict() is deliberately ignored; this method does not wait for it.
        this.queueEvictor.evict(this.transaction);
        this.commitCount = 0;
    }

    /**
     * Runs a final eviction with the current transaction, if there is one, waiting up to
     * {@link #EVICTION_TIMEOUT_SECONDS} seconds for it to complete. Eviction failures and timeouts
     * are logged as warnings and are not propagated to the caller.
     *
     * @throws IOException declared by the overridden method; not thrown by the code in this method
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.transaction != null) {
                Uninterruptibles.getUninterruptibly(
                        this.queueEvictor.evict(this.transaction), EVICTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        } catch (ExecutionException e) {
            LOG.warn("Failed to perform queue eviction.", e.getCause());
        } catch (TimeoutException e) {
            LOG.warn("Timeout when performing queue eviction.", e);
        }
    }

    /**
     * Tries to claim a row for this consumer by writing {@code stateContent} into the state column,
     * unless that column already holds a value.
     *
     * @param rowKey key of the row to claim
     * @param stateContent value to store in the state column when the claim succeeds
     * @return {@code true} if the state column was empty and has now been written; {@code false}
     *         if it already had a value, in which case nothing is written
     * @throws IOException if reading or persisting through the table core fails
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueConsumer
    protected boolean claimEntry(byte[] rowKey, byte[] stateContent) throws IOException {
        synchronized (this.lock) {
            // Only the state column is requested; a non-null value means the row is already claimed.
            byte[][] stateColumnOnly = new byte[][]{this.stateColumnName};
            if (this.core.getRow(rowKey, stateColumnOnly, (byte[]) null, (byte[]) null, -1,
                                 Transaction.ALL_VISIBLE_LATEST).get(this.stateColumnName) != null) {
                return false;
            }
            this.rowMapForClaim.clear();
            this.colMapForClaim.clear();
            this.colMapForClaim.put(this.stateColumnName, stateContent);
            this.rowMapForClaim.put(rowKey, this.colMapForClaim);
            this.core.persist(this.rowMapForClaim, KeyValue.LATEST_TIMESTAMP);
            return true;
        }
    }

    /**
     * Writes {@code stateContent} into column {@code stateColumn} of every given row in a single
     * persist call. Does nothing when {@code rowKeys} is empty.
     *
     * @param rowKeys keys of the rows to update
     * @param stateColumn column to write in each row
     * @param stateContent value to write
     * @throws IOException if persisting through the table core fails
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueConsumer
    protected void updateState(Set<byte[]> rowKeys, byte[] stateColumn, byte[] stateContent) throws IOException {
        if (rowKeys.isEmpty()) {
            return;
        }
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> changes = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (byte[] rowKey : rowKeys) {
            NavigableMap<byte[], byte[]> columns = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            columns.put(stateColumn, stateContent);
            changes.put(rowKey, columns);
        }
        this.core.persist(changes, KeyValue.LATEST_TIMESTAMP);
    }

    /**
     * Asks the table core to undo column {@code stateColumn} for every given row in a single call.
     * Does nothing when {@code rowKeys} is empty.
     *
     * @param rowKeys keys of the rows whose state should be undone
     * @param stateColumn column to undo in each row
     * @throws IOException if the undo through the table core fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueConsumer
    protected void undoState(Set<byte[]> rowKeys, byte[] stateColumn) throws IOException, InterruptedException {
        if (rowKeys.isEmpty()) {
            return;
        }
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> changes = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (byte[] rowKey : rowKeys) {
            NavigableMap<byte[], byte[]> columns = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            // NOTE(review): presumably undo only looks at the row/column keys, so an empty
            // placeholder value is supplied - confirm against LevelDBTableCore.undo.
            columns.put(stateColumn, DUMMY_STATE_CONTENT);
            changes.put(rowKey, columns);
        }
        this.core.undo(changes, KeyValue.LATEST_TIMESTAMP);
    }

    /**
     * Opens a scan on the table core and adapts it to the {@link QueueScanner} interface.
     *
     * @param startRow first argument passed to {@code core.scan} (presumably the start of the range - confirm)
     * @param stopRow second argument passed to {@code core.scan} (presumably the end of the range - confirm)
     * @param numRows not used by this implementation
     * @return a scanner whose {@code next()} yields row key / column map pairs, or {@code null}
     *         once the underlying scanner returns {@code null}
     * @throws IOException if opening the scan fails
     */
    @Override // co.cask.cdap.data2.transaction.queue.AbstractQueueConsumer
    protected QueueScanner getScanner(byte[] startRow, byte[] stopRow, int numRows) throws IOException {
        final Scanner scan = this.core.scan(startRow, stopRow, null, (byte[][]) null, Transaction.ALL_VISIBLE_LATEST);
        return new QueueScanner() {
            @Override // co.cask.cdap.data2.transaction.queue.QueueScanner
            public ImmutablePair<byte[], Map<byte[], byte[]>> next() throws IOException {
                Row row = scan.next();
                if (row == null) {
                    return null;
                }
                return new ImmutablePair<>(row.getRow(), row.getColumns());
            }

            @Override // co.cask.cdap.data2.transaction.queue.QueueScanner
            public void close() throws IOException {
                scan.close();
            }
        };
    }
}
