package org.apache.activemq.artemis.core.paging.cursor.impl;

import io.netty.util.collection.LongObjectHashMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.BulkPageCache;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.jboss.logging.Logger;

/* loaded from: input_file:WEB-INF/lib/artemis-server-2.17.0.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.class */
public class PageCursorProviderImpl implements PageCursorProvider {
    /** Number of cleanup tasks submitted by {@link #scheduleCleanup()} that have not finished running yet. */
    protected final AtomicInteger scheduledCleanup;
    /** When false, {@link #scheduleCleanup()} submits nothing and an already queued task skips {@link #cleanup()}. */
    protected volatile boolean cleanupEnabled;
    /** Paging store whose pages this provider caches, depages and deletes. */
    protected final PagingStore pagingStore;
    /** Storage manager used for the read lock, page-read bracketing and transactions in this class. */
    protected final StorageManager storageManager;
    /** Executor that runs scheduled cleanup tasks and is flushed by {@link #flushExecutors()}. */
    private final ArtemisExecutor executor;
    /** Page caches keyed by page number; most accesses in this class hold this object's monitor. */
    private final SoftValueLongObjectHashMap<BulkPageCache> softCache;
    /**
     * Message count per page number, or null when the constructor's boolean flag is true.
     * Read and removed under the {@code softCache} monitor; where entries are added is not
     * visible in this file (presumably readPage, whose body was not decompiled -- TODO confirm).
     */
    private LongObjectHashMap<Integer> numberOfMessages;
    /** Futures of page reads registered by {@link #getPageCache(long)}, keyed by page number. */
    private final LongObjectHashMap<CompletableFuture<BulkPageCache>> inProgressReadPages;
    /** Subscriptions created through {@link #createSubscription}, keyed by cursor id. */
    private final ConcurrentLongHashMap<PageSubscription> activeCursors;
    private static final Logger logger = Logger.getLogger((Class<?>) PageCursorProviderImpl.class);
    // 30 seconds, in nanoseconds. Not referenced by the code visible in this file;
    // presumably used by readPage, whose body was not decompiled -- TODO confirm.
    private static final long PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
    // 10 seconds, in nanoseconds: length of each timed wait in getPageCache on a page read
    // that another thread registered; a warning is logged after every expiry.
    private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10);
    // 10 seconds, in nanoseconds. Not referenced by the code visible in this file;
    // presumably used by readPage, whose body was not decompiled -- TODO confirm.
    private static final long PAGE_READ_PERMISSION_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10);

    /**
     * Convenience constructor: delegates to the five-argument constructor with
     * {@code false} as the last argument, so per-page message counts are tracked.
     *
     * @param pagingStore     paging store served by this provider
     * @param storageManager  storage manager used for locking, page reads and transactions
     * @param artemisExecutor executor on which cleanup tasks are run
     * @param i               maximum number of elements handed to the soft page cache
     */
    public PageCursorProviderImpl(PagingStore pagingStore, StorageManager storageManager, ArtemisExecutor artemisExecutor, int i) {
        this(pagingStore, storageManager, artemisExecutor, i, false);
    }

    /**
     * Builds a provider with cleanup enabled, no scheduled cleanups and no cursors.
     *
     * @param pagingStore     paging store served by this provider
     * @param storageManager  storage manager used for locking, page reads and transactions
     * @param artemisExecutor executor on which cleanup tasks are run
     * @param i               maximum number of elements handed to the soft page cache
     * @param z               when true the per-page message-count map is left null;
     *                        when false an empty map is created
     */
    public PageCursorProviderImpl(PagingStore pagingStore, StorageManager storageManager, ArtemisExecutor artemisExecutor, int i, boolean z) {
        // Collaborators supplied by the caller.
        this.pagingStore = pagingStore;
        this.storageManager = storageManager;
        this.executor = artemisExecutor;

        // Cleanup bookkeeping.
        this.cleanupEnabled = true;
        this.scheduledCleanup = new AtomicInteger(0);

        // Cursor registry and page caches.
        this.activeCursors = new ConcurrentLongHashMap<>();
        this.softCache = new SoftValueLongObjectHashMap<>(i);
        this.inProgressReadPages = new LongObjectHashMap<>();
        if (z) {
            this.numberOfMessages = null;
        } else {
            this.numberOfMessages = new LongObjectHashMap<>();
        }
    }

    /**
     * Creates and registers a new subscription for the given cursor id.
     *
     * @param j      cursor id; must not already be registered
     * @param filter filter passed through to the new subscription
     * @param z      flag passed through to the {@link PageSubscriptionImpl} constructor
     * @return the newly registered subscription
     * @throws IllegalStateException if a cursor with id {@code j} already exists
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public synchronized PageSubscription createSubscription(long j, Filter filter, boolean z) {
        if (logger.isTraceEnabled()) {
            // The throwable only carries the caller's stack trace into the trace log.
            String traceMessage = this.pagingStore.getAddress() + " creating subscription " + j + " with filter " + filter;
            logger.trace(traceMessage, new Exception("trace"));
        }
        if (!this.activeCursors.containsKey(j)) {
            PageSubscription created = new PageSubscriptionImpl(this, this.pagingStore, this.storageManager, this.executor, filter, j, z);
            this.activeCursors.put(j, created);
            return created;
        }
        throw new IllegalStateException("Cursor " + j + " had already been created");
    }

    /**
     * Looks up a registered subscription.
     *
     * @param j cursor id
     * @return whatever the cursor map holds for {@code j}
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public synchronized PageSubscription getSubscription(long j) {
        final PageSubscription registered = this.activeCursors.get(j);
        return registered;
    }

    /**
     * Resolves a page position to the paged message stored there.
     *
     * @param pagePosition page number and message number to look up
     * @return the message found at that position
     * @throws NonExistentPage if no cache is available for the page, or the message number
     *                         is not below the number of messages in the page
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public PagedMessage getMessage(PagePosition pagePosition) {
        final PageCache cache = getPageCache(pagePosition.getPageNr());
        if (cache != null && pagePosition.getMessageNr() < cache.getNumberOfMessages()) {
            return cache.getMessage(pagePosition);
        }
        throw new NonExistentPage("Invalid messageNumber passed = " + pagePosition + " on " + cache);
    }

    /**
     * Wraps a position, its message and the owning subscription in a new paged reference.
     *
     * @param pagePosition     position of the message
     * @param pagedMessage     the message itself
     * @param pageSubscription subscription the reference belongs to
     * @return a new {@link PagedReferenceImpl}
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public PagedReference newReference(PagePosition pagePosition, PagedMessage pagedMessage, PageSubscription pageSubscription) {
        final PagedReference reference = new PagedReferenceImpl(pagePosition, pagedMessage, pageSubscription);
        return reference;
    }

    /**
     * Returns the cache for a page, reading the page if no cache is available yet.
     *
     * <p>Lookup order, all under the {@code softCache} monitor: the soft cache, the live cache
     * of the page currently being written, a read already registered for the same page, and a
     * {@link PageReader} when the message count of the page is known. Otherwise this thread
     * registers a future for the page and performs the read itself.
     *
     * <p>The read and the wait on another thread's read run after the monitor is released,
     * so a slow read does not block other users of the cache and no thread blocks on a
     * future while holding the monitor.
     * NOTE(review): this relies on readPage doing its own locking when it publishes the
     * result; its body is not available in this file -- confirm.
     *
     * @param j page number
     * @return the page cache, or null when {@code j} is past the current writing page or
     *         the page file does not exist
     * @throws RuntimeException wrapping any exception raised while locating or reading the page
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public PageCache getPageCache(long j) {
        try {
            if (j > this.pagingStore.getCurrentWritingPage()) {
                return null;
            }
            boolean ownsRead = false;
            Page pageToRead = null;
            CompletableFuture<BulkPageCache> pendingRead;
            synchronized (this.softCache) {
                BulkPageCache cached = this.softCache.get(j);
                if (cached != null) {
                    return cached;
                }
                if (!this.pagingStore.checkPageFileExists((int) j)) {
                    return null;
                }
                Page currentPage = this.pagingStore.getCurrentPage();
                if (currentPage != null && currentPage.getPageId() == j) {
                    LivePageCache liveCache = currentPage.getLiveCache();
                    if (liveCache != null) {
                        // Re-publish the live cache so later lookups hit the soft cache directly.
                        this.softCache.put(liveCache.getPageId(), liveCache);
                        return liveCache;
                    }
                }
                pendingRead = this.inProgressReadPages.get(j);
                if (pendingRead == null) {
                    if (this.numberOfMessages != null && this.numberOfMessages.containsKey(j)) {
                        return new PageReader(this.pagingStore.createPage((int) j), this.numberOfMessages.get(j).intValue());
                    }
                    CompletableFuture<BulkPageCache> newRead = new CompletableFuture<>();
                    pageToRead = this.pagingStore.createPage((int) j);
                    ownsRead = true;
                    pendingRead = newRead;
                    // Registered before leaving the monitor so concurrent callers wait on it
                    // instead of starting a second read of the same page.
                    this.inProgressReadPages.put(j, newRead);
                }
            }
            if (ownsRead) {
                return readPage(j, pageToRead, pendingRead);
            }
            // Another thread is reading this page: wait for it, warning after every timeout
            // with the total time spent waiting so far.
            final long waitStartedNs = System.nanoTime();
            while (true) {
                try {
                    return pendingRead.get(CONCURRENT_PAGE_READ_TIMEOUT_NS, TimeUnit.NANOSECONDS);
                } catch (TimeoutException stillWaiting) {
                    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - waitStartedNs);
                    logger.warnf("Waiting a concurrent Page::read for pageNr=%d on cursor %s by %d ms", Long.valueOf(j), this.pagingStore.getAddress(), Long.valueOf(elapsedMillis));
                }
            }
        } catch (Exception e2) {
            throw new RuntimeException(e2.getMessage(), e2);
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:70:0x0142 A[FINALLY_INSNS] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler stub: the body of this method was not recovered, so as written every call
     * throws {@link UnsupportedOperationException}.
     *
     * <p>{@link #getPageCache(long)} calls it from the thread that registered the in-progress
     * future for a page.
     * NOTE(review): presumably the real implementation reads the page and completes the
     * future -- this cannot be confirmed from this file; restore the body from the original
     * bytecode or source before relying on it.
     *
     * @param r10 page number, as passed by getPageCache
     * @param r12 page object that getPageCache obtained from the paging store for that number
     * @param r13 future that getPageCache registered for this read
     * @return never returns normally in this stub
     * @throws java.lang.Exception declared by the signature; this stub only ever throws
     *                             UnsupportedOperationException
     */
    private org.apache.activemq.artemis.core.paging.cursor.PageCache readPage(long r10, org.apache.activemq.artemis.core.paging.impl.Page r12, java.util.concurrent.CompletableFuture<org.apache.activemq.artemis.core.paging.cursor.BulkPageCache> r13) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 414
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl.readPage(long, org.apache.activemq.artemis.core.paging.impl.Page, java.util.concurrent.CompletableFuture):org.apache.activemq.artemis.core.paging.cursor.PageCache");
    }

    /**
     * Publishes the cache of a live page in the soft cache under that page's id.
     *
     * @param livePageCache cache to register; stored under {@code livePageCache.getPageId()}
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void addLivePageCache(LivePageCache livePageCache) {
        logger.tracef("Add live page cache %s", livePageCache);
        synchronized (this.softCache) {
            // The value is the cache object itself; an object reference cannot be cast to long.
            this.softCache.put(livePageCache.getPageId(), livePageCache);
        }
    }

    /**
     * Forwards a new element limit to the soft page cache.
     *
     * @param i new maximum number of cached elements
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void setCacheMaxSize(int i) {
        final int maxElements = i;
        this.softCache.setMaxElements(maxElements);
    }

    /**
     * Reports how many entries the soft page cache holds, read under the cache monitor.
     *
     * @return the value of {@code softCache.size()}
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public int getCacheSize() {
        synchronized (this.softCache) {
            return this.softCache.size();
        }
    }

    /** Empties the soft page cache while holding the cache monitor. */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void clearCache() {
        final SoftValueLongObjectHashMap<BulkPageCache> cache = this.softCache;
        synchronized (cache) {
            cache.clear();
        }
    }

    /**
     * Reloads every active cursor, then asks each cursor to reload its page info for every
     * page from the store's first page up to (excluding) the lowest first page among the
     * cursors, and finally runs {@link #cleanup()}.
     *
     * @throws Exception if a cursor fails while reloading
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void processReload() throws Exception {
        final List<PageSubscription> cursors = this.activeCursors.values();
        for (PageSubscription cursor : cursors) {
            cursor.processReload();
        }
        if (!cursors.isEmpty()) {
            final long minPage = checkMinPage(cursors);
            // Long.MAX_VALUE means no cursor reported a usable first page; nothing to reload.
            if (minPage != Long.MAX_VALUE) {
                for (long pageNr = this.pagingStore.getFirstPage(); pageNr < minPage; pageNr++) {
                    for (PageSubscription cursor : cursors) {
                        cursor.reloadPageInfo(pageNr);
                    }
                }
            }
        }
        cleanup();
    }

    /**
     * Stops every active cursor and trace-logs how many scheduled cleanup tasks are still
     * outstanding, if any.
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void stop() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.stop();
        }
        final int pendingCleanups = this.scheduledCleanup.get();
        if (pendingCleanups > 0) {
            logger.tracef("Stopping with %d cleanup tasks to be completed yet", pendingCleanups);
        }
    }

    /**
     * Flushes the executor with a 10 second limit; when the flush reports failure, logs the
     * timeout together with a thread dump.
     */
    private void waitForFuture() {
        final boolean flushed = this.executor.flush(10L, TimeUnit.SECONDS);
        if (!flushed) {
            ActiveMQServerLogger.LOGGER.timedOutStoppingPagingCursor(this.executor);
            ActiveMQServerLogger.LOGGER.threadDump(ThreadDumpUtil.threadDump(""));
        }
    }

    /** Flushes the executors of all active cursors, then this provider's own executor. */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void flushExecutors() {
        for (PageSubscription cursor : this.activeCursors.values()) {
            cursor.flushExecutors();
        }
        waitForFuture();
    }

    /**
     * Unregisters a subscription and schedules a cleanup pass.
     *
     * @param pageSubscription subscription to remove, identified by its id
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void close(PageSubscription pageSubscription) {
        final long cursorId = pageSubscription.getId();
        this.activeCursors.remove(cursorId);
        scheduleCleanup();
    }

    /**
     * Submits a cleanup task to the executor unless cleanup is disabled or more than two
     * tasks are already counted as scheduled.
     *
     * <p>The task installs a fresh single-thread storage context, runs {@link #cleanup()} if
     * cleanup is still enabled at that point, and always clears the context and decrements
     * the scheduled counter afterwards.
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void scheduleCleanup() {
        if (logger.isTraceEnabled()) {
            // The throwable only carries the caller's stack trace into the trace log.
            logger.trace("scheduling cleanup", new Exception("trace"));
        }
        if (this.cleanupEnabled && this.scheduledCleanup.intValue() <= 2) {
            this.scheduledCleanup.incrementAndGet();
            this.executor.execute(() -> {
                this.storageManager.setContext(this.storageManager.newSingleThreadContext());
                try {
                    // Re-checked here: cleanup may have been disabled while the task was queued.
                    if (this.cleanupEnabled) {
                        cleanup();
                    }
                } finally {
                    this.storageManager.clearContext();
                    this.scheduledCleanup.decrementAndGet();
                }
            });
        }
    }

    /**
     * Notifies a snapshot of the active cursors that page mode was cleared, all within one
     * transaction that is committed at the end. A failure in one cursor, or in the commit,
     * is logged and does not propagate.
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void onPageModeCleared() {
        final ArrayList<PageSubscription> cursors = cloneSubscriptions();
        final TransactionImpl tx = new TransactionImpl(this.storageManager);
        for (PageSubscription cursor : cursors) {
            try {
                cursor.onPageModeCleared(tx);
            } catch (Exception failure) {
                ActiveMQServerLogger.LOGGER.errorCleaningPagingOnQueue(failure, cursor.getQueue().getName().toString());
            }
        }
        try {
            tx.commit();
        } catch (Exception failure) {
            ActiveMQServerLogger.LOGGER.errorCleaningPagingDuringCommit(failure);
        }
    }

    /**
     * Turns cleanup off: {@link #scheduleCleanup()} stops submitting tasks and tasks already
     * queued skip {@link #cleanup()}.
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void disableCleanup() {
        cleanupEnabled = false;
    }

    /** Turns cleanup back on and immediately schedules a cleanup pass. */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void resumeCleanup() {
        cleanupEnabled = true;
        scheduleCleanup();
    }

    /**
     * Depages every leading page that all cursors have completed and then deletes the
     * depaged pages.
     *
     * <p>Locking: the storage read lock is taken first, then the paging-store lock (polled
     * in 100 ms attempts, giving up once the store is no longer started), then this
     * object's monitor. Each lock is released exactly once, in a {@code finally} block, on
     * every path including the early exits and failures. The page deletion in
     * {@link #finishCleanup(ArrayList)} runs only after all of them have been released, so
     * page-file I/O is not performed while holding them.
     *
     * <p>A failure while depaging is logged and ends the pass without deleting anything.
     */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void cleanup() {
        logger.tracef("performing page cleanup %s", this);
        final ArrayList<Page> depagedPages = new ArrayList<>();
        boolean depageFinished = false;
        this.storageManager.readLock();
        try {
            while (!this.pagingStore.lock(100L)) {
                if (!this.pagingStore.isStarted()) {
                    // Store lock was never obtained; only the read lock needs releasing.
                    return;
                }
            }
            try {
                logger.tracef("%s locked", this);
                synchronized (this) {
                    depageFinished = depageCompletedPages(depagedPages);
                }
            } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(e, this.pagingStore.getAddress());
                logger.warn(e.getMessage(), e);
            } finally {
                this.pagingStore.unlock();
            }
        } finally {
            this.storageManager.readUnLock();
        }
        if (depageFinished) {
            finishCleanup(depagedPages);
        }
    }

    /**
     * Depages completed pages into {@code depagedPages}; the caller must hold the
     * paging-store lock and this object's monitor.
     *
     * @param depagedPages receives every page removed from the store
     * @return true when the pass ran to the end; false when it stopped early because the
     *         store is not started or has no pages
     * @throws Exception if the paging store or a cursor fails
     */
    private boolean depageCompletedPages(ArrayList<Page> depagedPages) throws Exception {
        if (!this.pagingStore.isStarted() || this.pagingStore.getNumberOfPages() == 0) {
            return false;
        }
        final ArrayList<PageSubscription> cursors = cloneSubscriptions();
        final long minPage = checkMinPage(cursors);
        deliverIfNecessary(cursors, minPage);
        logger.debugf("Asserting cleanup for address %s, firstPage=%d", this.pagingStore.getAddress(), Long.valueOf(minPage));

        // The lowest page still in use is the one being written and it holds messages:
        // if every cursor has completed it, leave page mode altogether.
        if (minPage == this.pagingStore.getCurrentWritingPage() && this.pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
            final boolean allComplete = checkPageCompletion(cursors, minPage);
            if (!this.pagingStore.isStarted()) {
                return false;
            }
            if (allComplete) {
                cleanupComplete(cursors);
            }
        }

        // Depage from the front while every cursor has completed the page; stop at the
        // first incomplete page or when the store has nothing more to depage.
        for (long pageNr = this.pagingStore.getFirstPage(); pageNr <= minPage && checkPageCompletion(cursors, pageNr); pageNr++) {
            final Page depaged = this.pagingStore.depage();
            if (depaged == null) {
                break;
            }
            depagedPages.add(depaged);
        }

        if (this.pagingStore.getNumberOfPages() == 0 || (this.pagingStore.getNumberOfPages() == 1 && this.pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
            this.pagingStore.stopPaging();
        } else if (logger.isTraceEnabled()) {
            logger.trace("Couldn't cleanup page on address " + ((Object) this.pagingStore.getAddress()) + " as numberOfPages == " + this.pagingStore.getNumberOfPages() + " and currentPage.numberOfMessages = " + this.pagingStore.getCurrentPage().getNumberOfMessages());
        }
        return true;
    }

    /**
     * Leaves page mode: forces the store onto another page, bookmarks that page on every
     * given cursor and stops paging.
     *
     * @param arrayList cursors that receive the bookmark
     * @throws Exception if the paging store or a cursor fails
     */
    protected void cleanupComplete(ArrayList<PageSubscription> arrayList) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("Address " + this.pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
        }
        this.pagingStore.forceAnotherPage();
        final Page bookmarkPage = this.pagingStore.getCurrentPage();
        storeBookmark(arrayList, bookmarkPage);
        this.pagingStore.stopPaging();
    }

    /**
     * Deletes the given depaged pages and drops them from the caches.
     *
     * <p>For each page the messages handed to {@code Page.delete} come from the soft cache
     * when the page is still cached; otherwise the page file is opened and read first.
     * The page is then removed from the soft cache and from the message-count map, and
     * every cursor is told about the deletion.
     *
     * <p>The first failure is logged and ends the loop; remaining pages are not processed.
     *
     * @param arrayList pages already depaged from the store
     */
    protected void finishCleanup(ArrayList<Page> arrayList) {
        logger.tracef("this(%s) finishing cleanup on %s", this, arrayList);
        try {
            for (Page depagedPage : arrayList) {
                final BulkPageCache cached;
                synchronized (this.softCache) {
                    cached = this.softCache.get(depagedPage.getPageId());
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Removing pageNr=" + depagedPage.getPageId() + " from page-cache");
                }

                final PagedMessage[] messages;
                if (cached != null) {
                    messages = cached.getMessages();
                } else {
                    this.storageManager.beforePageRead();
                    List<PagedMessage> read;
                    try {
                        depagedPage.open();
                        read = depagedPage.read(this.storageManager, true);
                    } finally {
                        // Runs exactly once whether or not the read succeeded.
                        try {
                            depagedPage.close(false, false);
                        } catch (Exception ignored) {
                            // Best effort: the page is about to be deleted anyway.
                        }
                        this.storageManager.afterPageRead();
                    }
                    // An empty page is passed to delete as null rather than an empty array.
                    messages = read.isEmpty() ? null : read.toArray(new PagedMessage[read.size()]);
                }

                depagedPage.delete(messages);
                synchronized (this.softCache) {
                    final long pageId = depagedPage.getPageId();
                    this.softCache.remove(pageId);
                    if (this.numberOfMessages != null) {
                        this.numberOfMessages.remove(pageId);
                    }
                }
                onDeletePage(depagedPage);
            }
        } catch (Exception e3) {
            ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(e3, this.pagingStore.getAddress());
        }
    }

    /**
     * Tells whether every given cursor considers the page complete. Every cursor is
     * consulted (and debug-logged) even after one of them has reported the page incomplete.
     *
     * @param arrayList cursors to consult
     * @param j         page number
     * @return true only if all cursors report the page complete (true for an empty list)
     */
    private boolean checkPageCompletion(ArrayList<PageSubscription> arrayList, long j) {
        logger.tracef("checkPageCompletion(%d)", j);
        boolean allComplete = true;
        for (PageSubscription cursor : arrayList) {
            if (cursor.isComplete(j)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + j);
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + j);
                }
                allComplete = false;
            }
        }
        return allComplete;
    }

    /**
     * Takes a snapshot of the active cursors while holding this object's monitor.
     *
     * @return a new list holding the cursors registered at the time of the call
     */
    private synchronized ArrayList<PageSubscription> cloneSubscriptions() {
        final ArrayList<PageSubscription> snapshot = new ArrayList<>(this.activeCursors.values());
        return snapshot;
    }

    /**
     * Informs a snapshot of the active cursors that a page has been deleted.
     *
     * @param page the deleted page
     * @throws Exception if a cursor fails while handling the notification
     */
    protected void onDeletePage(Page page) throws Exception {
        for (PageSubscription cursor : cloneSubscriptions()) {
            cursor.onDeletePage(page);
        }
    }

    /**
     * Confirms position {@code (pageId, -1)} of the given page on every cursor, then
     * re-enables auto cleanup on every cursor whether or not a confirmation failed.
     *
     * @param arrayList cursors to bookmark
     * @param page      page whose id is used for the bookmark position
     * @throws Exception if confirming a position fails
     */
    protected void storeBookmark(ArrayList<PageSubscription> arrayList, Page page) throws Exception {
        try {
            for (PageSubscription cursor : arrayList) {
                cursor.confirmPosition(new PagePositionImpl(page.getPageId(), -1));
            }
        } finally {
            for (PageSubscription cursor : arrayList) {
                cursor.enableAutoCleanup();
            }
        }
    }

    /** Prints a header line and one line per soft-cache value to standard output. */
    @Override // org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider
    public void printDebug() {
        System.out.println("Debug information for PageCursorProviderImpl:");
        for (BulkPageCache cache : this.softCache.values()) {
            System.out.println("Cache " + cache);
        }
    }

    /**
     * Describes this provider by its paging store.
     *
     * @return {@code "PageCursorProviderImpl{pagingStore=" + pagingStore + "}"}
     */
    @Override
    public String toString() {
        return "PageCursorProviderImpl{pagingStore=" + this.pagingStore + '}';
    }

    /**
     * Finds the lowest non-negative first page among the given cursors.
     *
     * @param collection cursors to inspect
     * @return the smallest first page that is {@code >= 0}, or {@code Long.MAX_VALUE} when
     *         no cursor reports one
     */
    private long checkMinPage(Collection<PageSubscription> collection) {
        long minPage = Long.MAX_VALUE;
        for (PageSubscription cursor : collection) {
            final long cursorFirstPage = cursor.getFirstPage();
            if (logger.isDebugEnabled()) {
                logger.debug(this.pagingStore.getAddress() + " has a cursor " + cursor + " with first page=" + cursorFirstPage);
            }
            // Negative first pages are skipped.
            if (cursorFirstPage >= 0) {
                minPage = Math.min(minPage, cursorFirstPage);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug(this.pagingStore.getAddress() + " has minPage=" + minPage);
        }
        return minPage;
    }

    /**
     * Triggers an asynchronous delivery on the first cursor that sits on page {@code j},
     * has an empty queue, and is not already complete on the page currently being written.
     * At most one cursor is triggered per call.
     *
     * @param collection cursors to inspect, in iteration order
     * @param j          lowest first page among the cursors
     */
    private void deliverIfNecessary(Collection<PageSubscription> collection, long j) {
        final boolean minIsWritingPage = this.pagingStore.getCurrentWritingPage() == j;
        for (PageSubscription cursor : collection) {
            final long cursorFirstPage = cursor.getFirstPage();
            if (cursorFirstPage != j || cursor.getQueue().getMessageCount() != 0) {
                continue;
            }
            if (minIsWritingPage && cursor.isComplete(cursorFirstPage)) {
                continue;
            }
            cursor.getQueue().deliverAsync();
            return;
        }
    }
}
