package co.cask.cdap.messaging.store.cache;

import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.TimeProvider;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.cache.MessageCache;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.messaging.store.MessageFilter;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.messaging.store.TransactionMessageFilter;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.tephra.Transaction;

/* loaded from: input_file:co/cask/cdap/messaging/store/cache/CachingMessageTable.class */
final class CachingMessageTable implements MessageTable {

    /**
     * Configuration key read by the constructor; half of the configured value is stored in {@code gracePeriod}.
     */
    @VisibleForTesting
    static final String PRUNE_GRACE_PERIOD = "data.tx.grace.period";
    // Half of the value configured under PRUNE_GRACE_PERIOD. Subtracted from the current time to obtain the
    // earliest timestamp used for cache lookups made with a transaction.
    // NOTE(review): presumably milliseconds, since it is combined with currentTimeMillis() - confirm.
    private final long gracePeriod;
    // The wrapped table that this class reads from and writes to.
    private final MessageTable messageTable;
    // Source of the per-topic message cache; callers null-check what it returns.
    private final MessageTableCacheProvider cacheProvider;
    // Clock used when computing the adjusted lookup timestamp.
    private final TimeProvider timeProvider;

    /**
     * {@link MessageTable.Entry} implementation used for entries kept in, and looked up from, the message cache.
     *
     * <p>An instance is created in one of two ways:
     * <ul>
     *   <li>as a copy of another entry, in which case every accessor is usable;</li>
     *   <li>as a "lookup only" key carrying just topic, generation, publish timestamp and sequence id, in
     *       which case {@link #isTransactional()}, {@link #getTransactionWritePointer()} and
     *       {@link #getPayload()} (and therefore {@link #isPayloadReference()} and {@code rollback()}) throw
     *       {@link UnsupportedOperationException}.</li>
     * </ul>
     */
    @VisibleForTesting
    public static final class CacheMessageTableEntry implements MessageTable.Entry {
        private final boolean lookupOnly;
        private final TopicId topicId;
        private final int generation;
        private final boolean transactional;
        private final byte[] payload;
        private final long publishTimestamp;
        private final short sequenceId;
        private long transactionWritePointer;
        private boolean rollback;

        /**
         * Builds a lookup-only key.
         *
         * @param topicMetadata supplies the topic id and generation
         * @param j publish timestamp of the key
         * @param s sequence id of the key
         */
        CacheMessageTableEntry(TopicMetadata topicMetadata, long j, short s) {
            this.topicId = topicMetadata.getTopicId();
            this.generation = topicMetadata.getGeneration();
            this.publishTimestamp = j;
            this.sequenceId = s;
            // A lookup key has no transaction information and no payload.
            this.lookupOnly = true;
            this.transactional = false;
            this.payload = null;
        }

        /**
         * Builds a full entry by copying every value exposed by the given entry.
         * The payload array reference is stored as returned by {@code entry.getPayload()}.
         *
         * @param entry the entry to copy from
         */
        CacheMessageTableEntry(MessageTable.Entry entry) {
            this.lookupOnly = false;
            this.topicId = entry.getTopicId();
            this.generation = entry.getGeneration();
            this.transactional = entry.isTransactional();
            this.transactionWritePointer = entry.getTransactionWritePointer();
            this.payload = entry.getPayload();
            this.publishTimestamp = entry.getPublishTimestamp();
            this.sequenceId = entry.getSequenceId();
        }

        /**
         * Flags this entry as rolled back. Has no effect on a non-transactional entry.
         */
        void rollback() {
            if (!isTransactional()) {
                return;
            }
            rollback = true;
        }

        /**
         * @return {@code true} once {@code rollback()} has been applied to a transactional entry
         */
        public boolean isRollback() {
            return rollback;
        }

        @Override
        public TopicId getTopicId() {
            return topicId;
        }

        @Override
        public int getGeneration() {
            return generation;
        }

        /**
         * @return {@code true} when this entry holds no payload
         */
        @Override
        public boolean isPayloadReference() {
            return getPayload() == null;
        }

        @Override
        public boolean isTransactional() {
            requireFullEntry();
            return transactional;
        }

        @Override
        public long getTransactionWritePointer() {
            requireFullEntry();
            return transactionWritePointer;
        }

        @Override
        @Nullable
        public byte[] getPayload() {
            requireFullEntry();
            return payload;
        }

        @Override
        public long getPublishTimestamp() {
            return publishTimestamp;
        }

        @Override
        public short getSequenceId() {
            return sequenceId;
        }

        /**
         * Rejects access to values that a lookup-only key does not carry.
         *
         * @throws UnsupportedOperationException if this instance is a lookup-only key
         */
        private void requireFullEntry() {
            if (lookupOnly) {
                throw new UnsupportedOperationException();
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/store/cache/CachingMessageTable$CombineMessageEntryIterator.class */
    private static final class CombineMessageEntryIterator extends AbstractCloseableIterator<MessageTable.Entry> {
        private final CloseableIterator<MessageTable.Entry> tableIterator;
        private final MessageCache.Scanner<MessageTable.Entry> scanner;
        private final Comparator<MessageTable.Entry> comparator;
        private boolean iterateCache;
        private MessageTable.Entry firstCachedEntry;
        private int count;

        private CombineMessageEntryIterator(CloseableIterator<MessageTable.Entry> closeableIterator, MessageCache.Scanner<MessageTable.Entry> scanner, Comparator<MessageTable.Entry> comparator, int i) {
            this.tableIterator = closeableIterator;
            this.scanner = scanner;
            this.comparator = comparator;
            this.firstCachedEntry = scanner.hasNext() ? (MessageTable.Entry) scanner.next() : null;
            this.count = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public MessageTable.Entry m26computeNext() {
            if (this.count <= 0) {
                return (MessageTable.Entry) endOfData();
            }
            this.count--;
            if (this.iterateCache) {
                return this.scanner.hasNext() ? (MessageTable.Entry) this.scanner.next() : (MessageTable.Entry) endOfData();
            }
            if (!this.tableIterator.hasNext()) {
                return (MessageTable.Entry) endOfData();
            }
            MessageTable.Entry entry = (MessageTable.Entry) this.tableIterator.next();
            if (this.firstCachedEntry != null && this.comparator.compare(entry, this.firstCachedEntry) == 0) {
                entry = this.firstCachedEntry;
                this.firstCachedEntry = null;
                this.iterateCache = true;
            }
            return entry;
        }

        public void close() {
            try {
                this.tableIterator.close();
            } finally {
                this.scanner.close();
            }
        }
    }

    /* loaded from: input_file:co/cask/cdap/messaging/store/cache/CachingMessageTable$CopyingIterator.class */
    private static final class CopyingIterator extends AbstractIterator<MessageTable.Entry> {
        private final Iterator<? extends MessageTable.Entry> iterator;
        private final Multimap<TopicId, MessageTable.Entry> entries;

        private CopyingIterator(Iterator<? extends MessageTable.Entry> it) {
            this.iterator = it;
            this.entries = LinkedListMultimap.create();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
        public MessageTable.Entry m27computeNext() {
            if (!this.iterator.hasNext()) {
                return (MessageTable.Entry) endOfData();
            }
            MessageTable.Entry next = this.iterator.next();
            this.entries.put(next.getTopicId(), copyEntry(next));
            return next;
        }

        Multimap<TopicId, MessageTable.Entry> getEntries() {
            return this.entries;
        }

        private MessageTable.Entry copyEntry(MessageTable.Entry entry) {
            return new CacheMessageTableEntry(entry);
        }
    }

    /**
     * Creates a caching table that uses {@link TimeProvider#SYSTEM_TIME} as its time source.
     *
     * @param cConfiguration configuration from which the grace period is read
     * @param messageTable the table to wrap
     * @param messageTableCacheProvider provider of the per-topic message cache
     */
    public CachingMessageTable(
            CConfiguration cConfiguration,
            MessageTable messageTable,
            MessageTableCacheProvider messageTableCacheProvider) {
        this(cConfiguration, messageTable, messageTableCacheProvider, TimeProvider.SYSTEM_TIME);
    }

    @VisibleForTesting
    CachingMessageTable(CConfiguration cConfiguration, MessageTable messageTable, MessageTableCacheProvider messageTableCacheProvider, TimeProvider timeProvider) {
        this.gracePeriod = cConfiguration.getLong(PRUNE_GRACE_PERIOD) / 2;
        this.messageTable = messageTable;
        this.cacheProvider = messageTableCacheProvider;
        this.timeProvider = timeProvider;
    }

    /**
     * Fetches entries of a topic starting from a publish timestamp, using the topic's cache when one exists.
     *
     * <p>Without a cache for the topic, the call goes straight to the wrapped table. Otherwise the cache is
     * scanned from a lookup key built from {@code j} and sequence id 0. When a transaction is supplied, the
     * key is first passed through {@code adjustLookupEntry}. The cache scanner alone is returned only if the
     * key was not adjusted and the cache reaches back to the key; in every other case the result combines a
     * fetch from the wrapped table with the cache scanner.
     *
     * <p>If anything fails after the scanner has been opened, the scanner is closed before the exception
     * propagates, so it does not leak.
     *
     * @param topicMetadata metadata of the topic to read
     * @param j publish timestamp to start from
     * @param i maximum number of entries
     * @param transaction transaction used for filtering, or {@code null} for a non-transactional read
     * @return an iterator over the matching entries; the caller is responsible for closing it
     * @throws IOException if the wrapped table fails to fetch
     */
    @Override
    public CloseableIterator<MessageTable.Entry> fetch(TopicMetadata topicMetadata, long j, int i, @Nullable Transaction transaction) throws IOException {
        MessageCache<MessageTable.Entry> messageCache = this.cacheProvider.getMessageCache(topicMetadata.getTopicId());
        if (messageCache == null) {
            return this.messageTable.fetch(topicMetadata, j, i, transaction);
        }
        CacheMessageTableEntry lookupEntry = new CacheMessageTableEntry(topicMetadata, j, (short) 0);
        MessageTable.Entry adjustedLookupEntry = transaction == null ? lookupEntry : adjustLookupEntry(topicMetadata, lookupEntry);
        MessageCache.Scanner<MessageTable.Entry> scanner = messageCache.scan(adjustedLookupEntry, true, i, createFilter(topicMetadata, transaction));

        // Tracks whether ownership of the scanner has passed to the returned iterator.
        boolean handedOff = false;
        try {
            CloseableIterator<MessageTable.Entry> result;
            if (lookupEntry == adjustedLookupEntry && cacheHasAllEntries(lookupEntry, scanner, messageCache.getComparator())) {
                result = scanner;
            } else {
                result = new CombineMessageEntryIterator(this.messageTable.fetch(topicMetadata, j, i, transaction), scanner, messageCache.getComparator(), i);
            }
            handedOff = true;
            return result;
        } finally {
            if (!handedOff) {
                scanner.close();
            }
        }
    }

    /**
     * Fetches entries of a topic starting from a message id, using the topic's cache when one exists.
     *
     * <p>Without a cache for the topic, the call goes straight to the wrapped table. Otherwise the cache is
     * scanned from a lookup key built from the message id's publish timestamp and sequence id. When a
     * transaction is supplied, the key is first passed through {@code adjustLookupEntry}. The cache scanner
     * alone is returned only if the key was not adjusted and the cache reaches back to the key; in every
     * other case the result combines a fetch from the wrapped table with the cache scanner.
     *
     * <p>If anything fails after the scanner has been opened, the scanner is closed before the exception
     * propagates, so it does not leak.
     *
     * @param topicMetadata metadata of the topic to read
     * @param messageId id of the message to start from
     * @param z forwarded unchanged to the table fetch and the cache scan
     *          (NOTE(review): presumably "inclusive" - confirm against {@code MessageTable})
     * @param i maximum number of entries
     * @param transaction transaction used for filtering, or {@code null} for a non-transactional read
     * @return an iterator over the matching entries; the caller is responsible for closing it
     * @throws IOException if the wrapped table fails to fetch
     */
    @Override
    public CloseableIterator<MessageTable.Entry> fetch(TopicMetadata topicMetadata, MessageId messageId, boolean z, int i, @Nullable Transaction transaction) throws IOException {
        MessageCache<MessageTable.Entry> messageCache = this.cacheProvider.getMessageCache(topicMetadata.getTopicId());
        if (messageCache == null) {
            return this.messageTable.fetch(topicMetadata, messageId, z, i, transaction);
        }
        CacheMessageTableEntry lookupEntry = new CacheMessageTableEntry(topicMetadata, messageId.getPublishTimestamp(), messageId.getSequenceId());
        MessageTable.Entry adjustedLookupEntry = transaction == null ? lookupEntry : adjustLookupEntry(topicMetadata, lookupEntry);
        MessageCache.Scanner<MessageTable.Entry> scanner = messageCache.scan(adjustedLookupEntry, z, i, createFilter(topicMetadata, transaction));

        // Tracks whether ownership of the scanner has passed to the returned iterator.
        boolean handedOff = false;
        try {
            CloseableIterator<MessageTable.Entry> result;
            if (lookupEntry == adjustedLookupEntry && cacheHasAllEntries(lookupEntry, scanner, messageCache.getComparator())) {
                result = scanner;
            } else {
                result = new CombineMessageEntryIterator(this.messageTable.fetch(topicMetadata, messageId, z, i, transaction), scanner, messageCache.getComparator(), i);
            }
            handedOff = true;
            return result;
        } finally {
            if (!handedOff) {
                scanner.close();
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0, types: [co.cask.cdap.messaging.store.cache.CachingMessageTable$CopyingIterator, java.util.Iterator] */
    @Override // co.cask.cdap.messaging.store.MessageTable
    public void store(Iterator<? extends MessageTable.Entry> it) throws IOException {
        ?? copyingIterator = new CopyingIterator(it);
        this.messageTable.store(copyingIterator);
        for (Map.Entry entry : copyingIterator.getEntries().asMap().entrySet()) {
            MessageCache<MessageTable.Entry> messageCache = this.cacheProvider.getMessageCache((TopicId) entry.getKey());
            if (messageCache != null) {
                messageCache.addAll(((Collection) entry.getValue()).iterator());
            }
        }
    }

    /**
     * Marks the cached entries covered by the rollback detail as rolled back (when the topic has a cache),
     * then rolls back the same range in the wrapped table.
     *
     * @param topicMetadata metadata of the topic being rolled back
     * @param rollbackDetail supplies the start and end timestamp / sequence id of the range
     * @throws IOException if the wrapped table fails to roll back
     * @throws IllegalStateException if the cache hands back an entry that is not a {@link CacheMessageTableEntry}
     */
    @Override
    public void rollback(TopicMetadata topicMetadata, RollbackDetail rollbackDetail) throws IOException {
        MessageCache<MessageTable.Entry> cache = cacheProvider.getMessageCache(topicMetadata.getTopicId());
        if (cache != null) {
            CacheMessageTableEntry rangeStart = new CacheMessageTableEntry(
                    topicMetadata, rollbackDetail.getStartTimestamp(), (short) rollbackDetail.getStartSequenceId());
            CacheMessageTableEntry rangeEnd = new CacheMessageTableEntry(
                    topicMetadata, rollbackDetail.getEndTimestamp(), (short) rollbackDetail.getEndSequenceId());
            MessageCache.EntryUpdater<MessageTable.Entry> markRolledBack = new MessageCache.EntryUpdater<MessageTable.Entry>() {
                @Override
                public void updateEntry(MessageTable.Entry entry) {
                    if (entry instanceof CacheMessageTableEntry) {
                        ((CacheMessageTableEntry) entry).rollback();
                        return;
                    }
                    throw new IllegalStateException("Entries in MessageCache must be of type " + CacheMessageTableEntry.class.getName() + ", but got type " + entry.getClass().getName() + " instead.");
                }
            };
            cache.updateEntries(rangeStart, rangeEnd, markRolledBack);
        }
        messageTable.rollback(topicMetadata, rollbackDetail);
    }

    /**
     * Closes the wrapped message table.
     *
     * @throws IOException if the wrapped table fails to close
     */
    @Override
    public void close() throws IOException {
        messageTable.close();
    }

    /**
     * Moves a lookup key forward if it is older than "now minus the grace period".
     *
     * @param topicMetadata metadata used to build a replacement key
     * @param entry the requested lookup key
     * @return {@code entry} itself when its publish timestamp is at or after the cutoff; otherwise a new
     *         lookup-only key positioned at the cutoff with sequence id 0
     */
    private MessageTable.Entry adjustLookupEntry(TopicMetadata topicMetadata, MessageTable.Entry entry) {
        long cutoff = timeProvider.currentTimeMillis() - gracePeriod;
        if (entry.getPublishTimestamp() < cutoff) {
            return new CacheMessageTableEntry(topicMetadata, cutoff, (short) 0);
        }
        return entry;
    }

    /**
     * Tells whether the cache reaches back far enough to serve a lookup on its own.
     *
     * @param entry the lookup key
     * @param scanner scanner whose {@code getFirstInCache()} is inspected
     * @param comparator ordering used to compare the first cached entry with the lookup key
     * @return {@code true} if the scanner reports a first cached entry that does not sort after {@code entry}
     */
    private boolean cacheHasAllEntries(MessageTable.Entry entry, MessageCache.Scanner<MessageTable.Entry> scanner, Comparator<MessageTable.Entry> comparator) {
        MessageTable.Entry oldestCached = scanner.getFirstInCache();
        if (oldestCached == null) {
            return false;
        }
        return comparator.compare(oldestCached, entry) <= 0;
    }

    /**
     * Builds the filter applied to cached entries during a scan.
     *
     * <p>Both variants skip entries whose generation differs from the topic's current generation. Without
     * a transaction every remaining entry is accepted. With a transaction, entries flagged as rolled back
     * are skipped as well and the rest is decided by {@link TransactionMessageFilter}.
     *
     * @param topicMetadata supplies the generation to match
     * @param transaction the transaction to filter by, or {@code null} for a non-transactional filter
     * @return the filter to pass to the cache scan
     */
    private MessageFilter<MessageTable.Entry> createFilter(TopicMetadata topicMetadata, @Nullable Transaction transaction) {
        final int generation = topicMetadata.getGeneration();

        if (transaction == null) {
            return new MessageFilter<MessageTable.Entry>() {
                // Method name kept as-is so that it matches the method being overridden.
                @Override
                public MessageFilter.Result mo24apply(MessageTable.Entry entry) {
                    if (generation == entry.getGeneration()) {
                        return MessageFilter.Result.ACCEPT;
                    }
                    return MessageFilter.Result.SKIP;
                }
            };
        }

        return new TransactionMessageFilter(transaction) {
            // Method name kept as-is so that it matches the method being overridden.
            @Override
            public MessageFilter.Result mo24apply(MessageTable.Entry entry) {
                if (generation != entry.getGeneration()) {
                    return MessageFilter.Result.SKIP;
                }
                boolean rolledBack = (entry instanceof CacheMessageTableEntry) && ((CacheMessageTableEntry) entry).isRollback();
                if (rolledBack) {
                    return MessageFilter.Result.SKIP;
                }
                return super.mo24apply(entry);
            }
        };
    }
}
