package co.cask.cdap.data2.transaction.stream.leveldb;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.table.Row;
import co.cask.cdap.api.dataset.table.Scanner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data.stream.StreamEventOffset;
import co.cask.cdap.data.stream.StreamFileOffset;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.KeyValue;
import co.cask.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableCore;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import co.cask.cdap.data2.transaction.stream.StreamConsumerState;
import co.cask.cdap.data2.transaction.stream.StreamConsumerStateStore;
import co.cask.tephra.Transaction;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.TreeMap;
import javax.annotation.Nullable;

/* loaded from: input_file:co/cask/cdap/data2/transaction/stream/leveldb/LevelDBStreamFileConsumer.class */
/**
 * An {@link AbstractStreamFileConsumer} that keeps its per-entry consumer state in a
 * {@link LevelDBTableCore}.
 *
 * <p>All state values are read from and written to the single column named by the inherited
 * {@code stateColumnName}. Writes always use {@link KeyValue#LATEST_TIMESTAMP} as the version and
 * reads use {@link Transaction#ALL_VISIBLE_LATEST}.
 */
public final class LevelDBStreamFileConsumer extends AbstractStreamFileConsumer {
    /** Placeholder column value used when building the change set handed to {@code undo}. */
    private static final byte[] DUMMY_STATE_CONTENT = new byte[0];

    private final LevelDBTableCore tableCore;

    /** Monitor held by {@link #claimFifoEntry} for the whole read-compare-write sequence. */
    private final Object dbLock;

    // Scratch maps reused by claimFifoEntry on every call (cleared first) instead of allocating
    // new ones; they are only touched while dbLock is held.
    private final NavigableMap<byte[], NavigableMap<byte[], byte[]>> rowMapForClaim;
    private final NavigableMap<byte[], byte[]> colMapForClaim;

    /**
     * Creates a consumer backed by the given LevelDB table.
     *
     * <p>The first seven arguments are passed unchanged to the superclass constructor.
     *
     * @param cConf configuration forwarded to the superclass
     * @param streamConfig stream configuration forwarded to the superclass
     * @param consumerConfig consumer configuration forwarded to the superclass
     * @param reader stream file reader forwarded to the superclass
     * @param stateStore consumer state store forwarded to the superclass
     * @param beginConsumerState initial consumer state forwarded to the superclass
     * @param extraFilter optional read filter forwarded to the superclass; may be {@code null}
     * @param tableCore table used to read and write entry state
     * @param dbLock object synchronized on while claiming an entry
     */
    public LevelDBStreamFileConsumer(CConfiguration cConf, StreamConfig streamConfig, ConsumerConfig consumerConfig, FileReader<StreamEventOffset, Iterable<StreamFileOffset>> reader, StreamConsumerStateStore stateStore, StreamConsumerState beginConsumerState, @Nullable ReadFilter extraFilter, LevelDBTableCore tableCore, Object dbLock) {
        super(cConf, streamConfig, consumerConfig, reader, stateStore, beginConsumerState, extraFilter);
        this.tableCore = tableCore;
        this.dbLock = dbLock;
        this.rowMapForClaim = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        this.colMapForClaim = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    }

    /**
     * Writes {@code value} into the state column of {@code row}, but only if the state currently
     * stored there equals {@code oldValue}.
     *
     * <p>The comparison uses {@link Arrays#equals(byte[], byte[])}, so a {@code null}
     * {@code oldValue} matches a row whose state column is absent. The read, the comparison and
     * the write all happen while holding {@code dbLock}.
     *
     * @param row key of the row to claim
     * @param value state to store when the claim succeeds
     * @param oldValue state the row is expected to hold right now; may be {@code null}
     * @return {@code true} if the stored state matched and {@code value} was persisted,
     *     {@code false} if it did not match (nothing is written in that case)
     * @throws IOException if reading from or writing to the table fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer
    protected boolean claimFifoEntry(byte[] row, byte[] value, byte[] oldValue) throws IOException {
        synchronized (dbLock) {
            // Fetch only the state column. The column list is an array of column names
            // (byte[][]) containing a single element.
            byte[] currentState = tableCore.getRow(row, new byte[][] {stateColumnName}, (byte[]) null, (byte[]) null, -1, Transaction.ALL_VISIBLE_LATEST).get(stateColumnName);
            if (!Arrays.equals(currentState, oldValue)) {
                return false;
            }
            rowMapForClaim.clear();
            colMapForClaim.clear();
            colMapForClaim.put(stateColumnName, value);
            rowMapForClaim.put(row, colMapForClaim);
            tableCore.persist(rowMapForClaim, KeyValue.LATEST_TIMESTAMP);
            return true;
        }
    }

    /**
     * Stores {@code value} in the state column of every given row with a single
     * {@code persist} call.
     *
     * @param rows keys of the rows to update
     * @param size not used by this implementation
     * @param value state to store for each row
     * @throws IOException if writing to the table fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer
    protected void updateState(Iterable<byte[]> rows, int size, byte[] value) throws IOException {
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> changes = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (byte[] row : rows) {
            NavigableMap<byte[], byte[]> columns = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            columns.put(stateColumnName, value);
            changes.put(row, columns);
        }
        tableCore.persist(changes, KeyValue.LATEST_TIMESTAMP);
    }

    /**
     * Asks the table to undo the state column of every given row with a single {@code undo} call.
     *
     * <p>Each row is mapped to the state column with an empty placeholder value.
     *
     * @param rows keys of the rows whose state should be undone
     * @param size not used by this implementation
     * @throws IOException if the table operation fails
     */
    @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer
    protected void undoState(Iterable<byte[]> rows, int size) throws IOException {
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> changes = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
        for (byte[] row : rows) {
            NavigableMap<byte[], byte[]> columns = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            columns.put(stateColumnName, DUMMY_STATE_CONTENT);
            changes.put(row, columns);
        }
        tableCore.undo(changes, KeyValue.LATEST_TIMESTAMP);
    }

    /**
     * Opens a table scan and exposes it as a {@link AbstractStreamFileConsumer.StateScanner}.
     *
     * <p>The two arguments are passed as the first two arguments of
     * {@code LevelDBTableCore.scan}; presumably they are the start and stop row keys of the
     * scan (TODO confirm bound inclusiveness against {@code LevelDBTableCore}).
     *
     * <p>On the returned scanner, {@code getRow()} and {@code getState()} read the row fetched by
     * the most recent {@code nextStateRow()} call, so they must only be called after that method
     * has returned {@code true}.
     *
     * @param startRow first argument handed to the table scan
     * @param stopRow second argument handed to the table scan
     * @return a scanner over the matching rows; closing it closes the underlying table scanner
     * @throws IOException if the scan cannot be opened
     */
    @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer
    protected AbstractStreamFileConsumer.StateScanner scanStates(byte[] startRow, byte[] stopRow) throws IOException {
        final Scanner scanner = tableCore.scan(startRow, stopRow, null, (byte[][]) null, Transaction.ALL_VISIBLE_LATEST);
        return new AbstractStreamFileConsumer.StateScanner() {
            /** Row returned by the latest {@code scanner.next()}; {@code null} once exhausted. */
            private Row current;

            @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.StateScanner
            public boolean nextStateRow() throws IOException {
                current = scanner.next();
                return current != null;
            }

            @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.StateScanner
            public byte[] getRow() {
                return current.getRow();
            }

            @Override // co.cask.cdap.data2.transaction.stream.AbstractStreamFileConsumer.StateScanner
            public byte[] getState() {
                return current.getColumns().get(LevelDBStreamFileConsumer.this.stateColumnName);
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                scanner.close();
            }
        };
    }
}
