package co.cask.cdap.messaging.store;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.store.PayloadTable;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.collect.AbstractIterator;
import java.io.IOException;
import java.util.Iterator;

/* loaded from: input_file:co/cask/cdap/messaging/store/AbstractPayloadTable.class */
public abstract class AbstractPayloadTable implements PayloadTable {

    /* loaded from: input_file:co/cask/cdap/messaging/store/AbstractPayloadTable$StoreIterator.class */
    private static class StoreIterator extends AbstractIterator<RawPayloadTableEntry> {
        private final Iterator<? extends PayloadTable.Entry> entries;
        private final RawPayloadTableEntry tableEntry;
        private TopicId topicId;
        private int generation;
        private byte[] topic;
        private byte[] rowKey;
        private PayloadTable.Entry nextEntry;

        private StoreIterator(Iterator<? extends PayloadTable.Entry> it) {
            this.entries = it;
            this.tableEntry = new RawPayloadTableEntry();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public RawPayloadTableEntry m22computeNext() {
            if (!this.entries.hasNext()) {
                return (RawPayloadTableEntry) endOfData();
            }
            PayloadTable.Entry next = this.entries.next();
            if (this.topicId == null || !this.topicId.equals(next.getTopicId()) || this.generation != next.getGeneration()) {
                this.topicId = next.getTopicId();
                this.generation = next.getGeneration();
                this.topic = MessagingUtils.toDataKeyPrefix(this.topicId, next.getGeneration());
                this.rowKey = new byte[this.topic.length + 16 + 2];
            }
            Bytes.putBytes(this.rowKey, 0, this.topic, 0, this.topic.length);
            Bytes.putLong(this.rowKey, this.topic.length, next.getTransactionWritePointer());
            Bytes.putLong(this.rowKey, this.topic.length + 8, next.getPayloadWriteTimestamp());
            Bytes.putShort(this.rowKey, this.topic.length + 16, next.getPayloadSequenceId());
            return this.tableEntry.set(this.rowKey, next.getPayload());
        }
    }

    protected abstract void persist(Iterator<RawPayloadTableEntry> it) throws IOException;

    protected abstract CloseableIterator<RawPayloadTableEntry> read(byte[] bArr, byte[] bArr2, int i) throws IOException;

    @Override // co.cask.cdap.messaging.store.PayloadTable
    public void store(Iterator<? extends PayloadTable.Entry> it) throws IOException {
        persist(new StoreIterator(it));
    }

    @Override // co.cask.cdap.messaging.store.PayloadTable
    public CloseableIterator<PayloadTable.Entry> fetch(TopicMetadata topicMetadata, long j, MessageId messageId, final boolean z, int i) throws IOException {
        byte[] dataKeyPrefix = MessagingUtils.toDataKeyPrefix(topicMetadata.getTopicId(), topicMetadata.getGeneration());
        byte[] bArr = new byte[dataKeyPrefix.length + 16 + 2];
        byte[] bArr2 = new byte[dataKeyPrefix.length + 8];
        Bytes.putBytes(bArr, 0, dataKeyPrefix, 0, dataKeyPrefix.length);
        Bytes.putBytes(bArr2, 0, dataKeyPrefix, 0, dataKeyPrefix.length);
        Bytes.putLong(bArr, dataKeyPrefix.length, j);
        Bytes.putLong(bArr2, dataKeyPrefix.length, j);
        Bytes.putLong(bArr, dataKeyPrefix.length + 8, messageId.getPayloadWriteTimestamp());
        Bytes.putShort(bArr, dataKeyPrefix.length + 16, messageId.getPayloadSequenceId());
        final CloseableIterator<RawPayloadTableEntry> read = read(bArr, Bytes.stopKeyForPrefix(bArr2), i);
        return new AbstractCloseableIterator<PayloadTable.Entry>() { // from class: co.cask.cdap.messaging.store.AbstractPayloadTable.1
            private boolean closed = false;
            private boolean skipFirstRow;

            {
                this.skipFirstRow = !z;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public PayloadTable.Entry m21computeNext() {
                if (this.closed || !read.hasNext()) {
                    return (PayloadTable.Entry) endOfData();
                }
                RawPayloadTableEntry rawPayloadTableEntry = (RawPayloadTableEntry) read.next();
                if (this.skipFirstRow) {
                    this.skipFirstRow = false;
                    if (!read.hasNext()) {
                        return (PayloadTable.Entry) endOfData();
                    }
                    rawPayloadTableEntry = (RawPayloadTableEntry) read.next();
                }
                return new ImmutablePayloadTableEntry(rawPayloadTableEntry.getKey(), rawPayloadTableEntry.getValue());
            }

            public void close() {
                try {
                    read.close();
                } finally {
                    endOfData();
                    this.closed = true;
                }
            }
        };
    }
}
