package co.cask.cdap.messaging.store.leveldb;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.store.AbstractMessageTable;
import co.cask.cdap.messaging.store.ImmutableMessageTableEntry;
import co.cask.cdap.messaging.store.RawMessageTableEntry;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.WriteOptions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/messaging/store/leveldb/LevelDBMessageTable.class */
public final class LevelDBMessageTable extends AbstractMessageTable {
    private static final WriteOptions WRITE_OPTIONS = new WriteOptions().sync(true);
    private static final String PAYLOAD_COL = "p";
    private static final String TX_COL = "t";
    private final DB levelDB;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/messaging/store/leveldb/LevelDBMessageTable$EncodeType.class */
    public enum EncodeType {
        NON_TRANSACTIONAL(0),
        TRANSACTIONAL(1),
        PAYLOAD_REFERENCE(2);

        private final byte type;

        EncodeType(int i) {
            this.type = (byte) i;
        }

        byte getType() {
            return this.type;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LevelDBMessageTable(DB db) {
        this.levelDB = db;
    }

    @Override // co.cask.cdap.messaging.store.AbstractMessageTable
    protected CloseableIterator<RawMessageTableEntry> read(byte[] bArr, byte[] bArr2) throws IOException {
        final DBScanIterator dBScanIterator = new DBScanIterator(this.levelDB, bArr, bArr2);
        final RawMessageTableEntry rawMessageTableEntry = new RawMessageTableEntry();
        return new AbstractCloseableIterator<RawMessageTableEntry>() { // from class: co.cask.cdap.messaging.store.leveldb.LevelDBMessageTable.1
            private boolean closed = false;

            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public RawMessageTableEntry m36computeNext() {
                if (this.closed || !dBScanIterator.hasNext()) {
                    return (RawMessageTableEntry) endOfData();
                }
                Map.Entry entry = (Map.Entry) dBScanIterator.next();
                Map decodeValue = LevelDBMessageTable.this.decodeValue((byte[]) entry.getValue());
                return rawMessageTableEntry.set((byte[]) entry.getKey(), (byte[]) decodeValue.get(LevelDBMessageTable.TX_COL), (byte[]) decodeValue.get(LevelDBMessageTable.PAYLOAD_COL));
            }

            public void close() {
                try {
                    dBScanIterator.close();
                    endOfData();
                    this.closed = true;
                } catch (Throwable th) {
                    endOfData();
                    this.closed = true;
                    throw th;
                }
            }
        };
    }

    @Override // co.cask.cdap.messaging.store.AbstractMessageTable
    protected void persist(Iterator<RawMessageTableEntry> it) throws IOException {
        try {
            WriteBatch createWriteBatch = this.levelDB.createWriteBatch();
            Throwable th = null;
            while (it.hasNext()) {
                try {
                    try {
                        RawMessageTableEntry next = it.next();
                        byte[] key = next.getKey();
                        createWriteBatch.put(Arrays.copyOf(key, key.length), encodeValue(next.getTxPtr(), next.getPayload()));
                    } finally {
                    }
                } finally {
                }
            }
            this.levelDB.write(createWriteBatch, WRITE_OPTIONS);
            if (createWriteBatch != null) {
                if (0 != 0) {
                    try {
                        createWriteBatch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createWriteBatch.close();
                }
            }
        } catch (DBException e) {
            throw new IOException((Throwable) e);
        }
    }

    @Override // co.cask.cdap.messaging.store.AbstractMessageTable
    protected void rollback(byte[] bArr, byte[] bArr2, byte[] bArr3) throws IOException {
        WriteBatch createWriteBatch = this.levelDB.createWriteBatch();
        DBScanIterator dBScanIterator = new DBScanIterator(this.levelDB, bArr, bArr2);
        Throwable th = null;
        while (dBScanIterator.hasNext()) {
            try {
                try {
                    Map.Entry entry = (Map.Entry) dBScanIterator.next();
                    createWriteBatch.put((byte[]) entry.getKey(), encodeValue(bArr3, decodeValue((byte[]) entry.getValue()).get(PAYLOAD_COL)));
                } catch (Throwable th2) {
                    if (dBScanIterator != null) {
                        if (th != null) {
                            try {
                                dBScanIterator.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            dBScanIterator.close();
                        }
                    }
                    throw th2;
                }
            } finally {
            }
        }
        if (dBScanIterator != null) {
            if (0 != 0) {
                try {
                    dBScanIterator.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                dBScanIterator.close();
            }
        }
        try {
            this.levelDB.write(createWriteBatch, WRITE_OPTIONS);
        } catch (DBException e) {
            throw new IOException((Throwable) e);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    public void pruneMessages(TopicMetadata topicMetadata, long j) throws IOException {
        WriteBatch createWriteBatch = this.levelDB.createWriteBatch();
        long millis = TimeUnit.SECONDS.toMillis(topicMetadata.getTTL());
        byte[] dataKeyPrefix = MessagingUtils.toDataKeyPrefix(topicMetadata.getTopicId(), Integer.parseInt(MessagingUtils.Constants.DEFAULT_GENERATION));
        DBScanIterator dBScanIterator = new DBScanIterator(this.levelDB, dataKeyPrefix, Bytes.stopKeyForPrefix(dataKeyPrefix));
        Throwable th = null;
        while (dBScanIterator.hasNext()) {
            try {
                try {
                    Map.Entry entry = (Map.Entry) dBScanIterator.next();
                    ImmutableMessageTableEntry immutableMessageTableEntry = new ImmutableMessageTableEntry((byte[]) entry.getKey(), null, null);
                    int generation = immutableMessageTableEntry.getGeneration();
                    int generation2 = topicMetadata.getGeneration();
                    if (!MessagingUtils.isOlderGeneration(generation, generation2)) {
                        if (generation != Math.abs(generation2) || j - immutableMessageTableEntry.getPublishTimestamp() <= millis) {
                            break;
                        } else {
                            createWriteBatch.delete((byte[]) entry.getKey());
                        }
                    } else {
                        createWriteBatch.delete((byte[]) entry.getKey());
                    }
                } finally {
                }
            } catch (Throwable th2) {
                if (dBScanIterator != null) {
                    if (th != null) {
                        try {
                            dBScanIterator.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        dBScanIterator.close();
                    }
                }
                throw th2;
            }
        }
        if (dBScanIterator != null) {
            if (0 != 0) {
                try {
                    dBScanIterator.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                dBScanIterator.close();
            }
        }
        try {
            this.levelDB.write(createWriteBatch, WRITE_OPTIONS);
        } catch (DBException e) {
            throw new IOException((Throwable) e);
        }
    }

    private byte[] encodeValue(@Nullable byte[] bArr, @Nullable byte[] bArr2) {
        if (bArr == null) {
            Preconditions.checkArgument(bArr2 != null, "Payload cannot be null for non-transactional message");
            byte[] bArr3 = new byte[1 + bArr2.length];
            bArr3[0] = EncodeType.NON_TRANSACTIONAL.getType();
            Bytes.putBytes(bArr3, 1, bArr2, 0, bArr2.length);
            return bArr3;
        }
        if (bArr2 == null) {
            byte[] bArr4 = new byte[9];
            bArr4[0] = EncodeType.PAYLOAD_REFERENCE.getType();
            Bytes.putBytes(bArr4, 1, bArr, 0, bArr.length);
            return bArr4;
        }
        byte[] bArr5 = new byte[9 + bArr2.length];
        bArr5[0] = EncodeType.TRANSACTIONAL.getType();
        Bytes.putBytes(bArr5, 1, bArr, 0, bArr.length);
        Bytes.putBytes(bArr5, 9, bArr2, 0, bArr2.length);
        return bArr5;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, byte[]> decodeValue(byte[] bArr) {
        HashMap hashMap = new HashMap();
        if (bArr[0] == EncodeType.NON_TRANSACTIONAL.getType()) {
            hashMap.put(PAYLOAD_COL, Arrays.copyOfRange(bArr, 1, bArr.length));
        } else {
            hashMap.put(TX_COL, Arrays.copyOfRange(bArr, 1, 9));
            if (bArr[0] == EncodeType.TRANSACTIONAL.getType()) {
                hashMap.put(PAYLOAD_COL, Arrays.copyOfRange(bArr, 9, bArr.length));
            }
        }
        return hashMap;
    }
}
