package co.cask.cdap.messaging.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.proto.id.TopicId;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.tephra.Transaction;

/* loaded from: input_file:co/cask/cdap/messaging/store/AbstractMessageTable.class */
public abstract class AbstractMessageTable implements MessageTable {
    private final StoreIterator storeIterator = new StoreIterator();

    /* loaded from: input_file:co/cask/cdap/messaging/store/AbstractMessageTable$FetchIterator.class */
    private static class FetchIterator extends AbstractCloseableIterator<MessageTable.Entry> {
        private final CloseableIterator<RawMessageTableEntry> scanner;
        private final Transaction transaction;
        private byte[] skipStartRow;
        private boolean closed = false;
        private int maxLimit;

        FetchIterator(CloseableIterator<RawMessageTableEntry> closeableIterator, int i, @Nullable byte[] bArr, @Nullable Transaction transaction) {
            this.scanner = closeableIterator;
            this.transaction = transaction;
            this.skipStartRow = bArr;
            this.maxLimit = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public MessageTable.Entry m19computeNext() {
            if (this.closed || this.maxLimit <= 0) {
                return (MessageTable.Entry) endOfData();
            }
            while (this.scanner.hasNext()) {
                RawMessageTableEntry rawMessageTableEntry = (RawMessageTableEntry) this.scanner.next();
                if (this.skipStartRow != null) {
                    byte[] bArr = this.skipStartRow;
                    this.skipStartRow = null;
                    if (Bytes.equals(bArr, rawMessageTableEntry.getKey())) {
                        continue;
                    }
                }
                Result isVisible = AbstractMessageTable.isVisible(rawMessageTableEntry.getTxPtr(), this.transaction);
                if (isVisible == Result.ACCEPT) {
                    this.maxLimit--;
                    return new ImmutableMessageTableEntry(rawMessageTableEntry.getKey(), rawMessageTableEntry.getPayload(), rawMessageTableEntry.getTxPtr());
                }
                if (isVisible == Result.HOLD) {
                    break;
                }
            }
            return (MessageTable.Entry) endOfData();
        }

        public void close() {
            try {
                this.scanner.close();
                endOfData();
                this.closed = true;
            } catch (Throwable th) {
                endOfData();
                this.closed = true;
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/messaging/store/AbstractMessageTable$Result.class */
    public enum Result {
        ACCEPT,
        SKIP,
        HOLD
    }

    /* loaded from: input_file:co/cask/cdap/messaging/store/AbstractMessageTable$StoreIterator.class */
    private static class StoreIterator implements Iterator<RawMessageTableEntry> {
        private final RawMessageTableEntry tableEntry;
        private Iterator<? extends MessageTable.Entry> entries;
        private TopicId topicId;
        private int generation;
        private byte[] topic;
        private byte[] rowKey;
        private MessageTable.Entry nextEntry;

        private StoreIterator() {
            this.tableEntry = new RawMessageTableEntry();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.nextEntry != null) {
                return true;
            }
            if (!this.entries.hasNext()) {
                return false;
            }
            this.nextEntry = this.entries.next();
            return true;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public RawMessageTableEntry next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            MessageTable.Entry entry = this.nextEntry;
            this.nextEntry = null;
            if (this.topicId == null || !this.topicId.equals(entry.getTopicId()) || this.generation != entry.getGeneration()) {
                this.topicId = entry.getTopicId();
                this.generation = entry.getGeneration();
                this.topic = MessagingUtils.toDataKeyPrefix(this.topicId, entry.getGeneration());
                this.rowKey = new byte[this.topic.length + 8 + 2];
            }
            Bytes.putBytes(this.rowKey, 0, this.topic, 0, this.topic.length);
            Bytes.putLong(this.rowKey, this.topic.length, entry.getPublishTimestamp());
            Bytes.putShort(this.rowKey, this.topic.length + 8, entry.getSequenceId());
            byte[] bArr = null;
            if (entry.isTransactional()) {
                bArr = Bytes.toBytes(entry.getTransactionWritePointer());
            }
            return this.tableEntry.set(this.rowKey, bArr, entry.getPayload());
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("Remove not supported");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StoreIterator reset(Iterator<? extends MessageTable.Entry> it) {
            this.entries = it;
            this.nextEntry = null;
            return this;
        }
    }

    protected abstract void persist(Iterator<RawMessageTableEntry> it) throws IOException;

    protected abstract void rollback(byte[] bArr, byte[] bArr2, byte[] bArr3) throws IOException;

    protected abstract CloseableIterator<RawMessageTableEntry> read(byte[] bArr, byte[] bArr2) throws IOException;

    @Override // co.cask.cdap.messaging.store.MessageTable
    public CloseableIterator<MessageTable.Entry> fetch(TopicMetadata topicMetadata, long j, int i, @Nullable Transaction transaction) throws IOException {
        byte[] dataKeyPrefix = MessagingUtils.toDataKeyPrefix(topicMetadata.getTopicId(), topicMetadata.getGeneration());
        byte[] bArr = new byte[dataKeyPrefix.length + 8];
        Bytes.putBytes(bArr, 0, dataKeyPrefix, 0, dataKeyPrefix.length);
        Bytes.putLong(bArr, dataKeyPrefix.length, j);
        return new FetchIterator(read(bArr, Bytes.stopKeyForPrefix(dataKeyPrefix)), i, null, transaction);
    }

    @Override // co.cask.cdap.messaging.store.MessageTable
    public CloseableIterator<MessageTable.Entry> fetch(TopicMetadata topicMetadata, MessageId messageId, boolean z, int i, @Nullable Transaction transaction) throws IOException {
        byte[] dataKeyPrefix = MessagingUtils.toDataKeyPrefix(topicMetadata.getTopicId(), topicMetadata.getGeneration());
        byte[] bArr = new byte[dataKeyPrefix.length + 8 + 2];
        Bytes.putBytes(bArr, 0, dataKeyPrefix, 0, dataKeyPrefix.length);
        Bytes.putLong(bArr, dataKeyPrefix.length, messageId.getPublishTimestamp());
        Bytes.putShort(bArr, dataKeyPrefix.length + 8, messageId.getSequenceId());
        return new FetchIterator(read(bArr, Bytes.stopKeyForPrefix(dataKeyPrefix)), i, z ? null : bArr, transaction);
    }

    @Override // co.cask.cdap.messaging.store.MessageTable
    public void store(Iterator<? extends MessageTable.Entry> it) throws IOException {
        persist(this.storeIterator.reset(it));
    }

    @Override // co.cask.cdap.messaging.store.MessageTable
    public void rollback(TopicMetadata topicMetadata, RollbackDetail rollbackDetail) throws IOException {
        byte[] dataKeyPrefix = MessagingUtils.toDataKeyPrefix(topicMetadata.getTopicId(), topicMetadata.getGeneration());
        byte[] bArr = new byte[dataKeyPrefix.length + 8 + 2];
        Bytes.putBytes(bArr, 0, dataKeyPrefix, 0, dataKeyPrefix.length);
        Bytes.putLong(bArr, dataKeyPrefix.length, rollbackDetail.getStartTimestamp());
        Bytes.putShort(bArr, dataKeyPrefix.length + 8, (short) rollbackDetail.getStartSequenceId());
        byte[] bArr2 = new byte[dataKeyPrefix.length + 8 + 2];
        Bytes.putBytes(bArr2, 0, dataKeyPrefix, 0, dataKeyPrefix.length);
        Bytes.putLong(bArr2, dataKeyPrefix.length, rollbackDetail.getEndTimestamp());
        Bytes.putShort(bArr2, dataKeyPrefix.length + 8, (short) rollbackDetail.getEndSequenceId());
        rollback(bArr, Bytes.stopKeyForPrefix(bArr2), Bytes.toBytes((-1) * rollbackDetail.getTransactionWritePointer()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Result isVisible(@Nullable byte[] bArr, @Nullable Transaction transaction) {
        if (transaction == null || bArr == null) {
            return Result.ACCEPT;
        }
        long j = Bytes.toLong(bArr);
        return j < 0 ? Result.SKIP : transaction.isVisible(j) ? Result.ACCEPT : Arrays.binarySearch(transaction.getInvalids(), j) >= 0 ? Result.SKIP : Result.HOLD;
    }
}
