package org.apache.jackrabbit.oak.index.indexer.document.tree;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.analysis.utils.Hash;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.analysis.utils.HyperLogLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs background tasks that iterate over the paths of a "prefetch" {@link TreeStore}
 * ahead of, and in step with, the indexing that reads from the "index" {@link TreeStore}.
 *
 * <p>{@link #start()} submits up to three iterator tasks, one per {@link PrefetchType}.
 * Each task walks {@code prefetchStore.iteratorOverPaths()} from the beginning. The tasks
 * coordinate through {@code iterateCount} and {@code downloadMax}, which are published by
 * the {@link PrefetchType#TRACK_INDEXING} task and read by the other two to limit how far
 * they run ahead.
 *
 * <p>The tunables ({@link #setBlobSuffix(String)}, {@link #setBlobReadAheadSize(long)},
 * {@link #setNodeReadAheadCount(long)}) are intended to be configured before
 * {@link #start()} is called.
 */
public class Prefetcher {
    private static final Logger LOG = LoggerFactory.getLogger(Prefetcher.class);

    /** Maximum number of blob prefetch tasks that may be queued or running at once. */
    private static final int PREFETCH_THREADS = 16;

    /** Number of long-running iterator tasks, one per {@link PrefetchType}. */
    private static final int ITERATOR_THREADS = 3;

    /** A progress line is logged every this many iterated paths. */
    private static final long PROGRESS_LOG_INTERVAL = 1_000_000;

    /** Polling interval of the wait loops, in milliseconds. */
    private static final long WAIT_MILLIS = 10L;

    /** How long {@link #shutdown()} waits for running tasks, in seconds. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final TreeStore prefetchStore;
    private final TreeStore indexStore;

    /** Path suffix selecting the nodes whose binaries are inspected; never null. */
    private String blobSuffix = "";

    /** Largest non-inlined blob length seen so far; only used for progress logging. */
    private volatile long maxBlobSize;

    /** Cumulative non-inlined blob size seen by the TRACK_INDEXING task. */
    private final AtomicLong downloadMax = new AtomicLong();

    /** Number of paths iterated so far by the TRACK_INDEXING task. */
    private final AtomicLong iterateCount = new AtomicLong();

    /** Limits the number of outstanding blob prefetch tasks. */
    private final Semaphore semaphore = new Semaphore(PREFETCH_THREADS);

    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();

    private volatile long blobReadAheadSize = 4L * 1024 * 1024 * 1024;
    private volatile long nodeReadAheadCount = 64 * 1024;

    private final ExecutorService executorService = Executors.newFixedThreadPool(
            ITERATOR_THREADS + PREFETCH_THREADS, new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("BlobPrefetcher-" + threadNumber.getAndIncrement());
            return thread;
        }
    });

    /** The role an iterator task plays; see {@link #iterator(PrefetchType)}. */
    public enum PrefetchType {
        /**
         * Waits for the index store's highest read key to reach each path and publishes
         * the iterated node count and cumulative blob size.
         */
        TRACK_INDEXING,
        /** Opens the binaries of matching nodes that indexing has not reached yet. */
        BLOB_PREFETCH,
        /** Calls {@code prefillCache} on the index store for each path. */
        NODESTORE_CACHE_FILLER
    }

    /**
     * @param treeStore  the store the prefetch tasks iterate over
     * @param treeStore2 the store used by indexing, whose progress is tracked
     */
    public Prefetcher(TreeStore treeStore, TreeStore treeStore2) {
        this.prefetchStore = treeStore;
        this.indexStore = treeStore2;
    }

    /**
     * Sets the path suffix of nodes whose {@code jcr:data} binary is inspected.
     * If the suffix is empty, {@link #start()} does not launch the blob prefetch task.
     *
     * @param str the suffix; {@code null} is treated as the empty string
     */
    public void setBlobSuffix(String str) {
        this.blobSuffix = str == null ? "" : str;
    }

    /**
     * @param j how many bytes of cumulative blob size the blob prefetch task may be
     *          ahead of the value published by the tracking task
     */
    public void setBlobReadAheadSize(long j) {
        this.blobReadAheadSize = j;
    }

    /**
     * @param j how many nodes the cache filler task may be ahead of the node count
     *          published by the tracking task
     */
    public void setNodeReadAheadCount(long j) {
        this.nodeReadAheadCount = j;
    }

    /**
     * Signals all tasks to stop, shuts the executor down and waits up to
     * {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for the tasks to finish.
     *
     * @return {@code true} if all tasks terminated in time; {@code false} on timeout or
     *         if the calling thread was interrupted (its interrupt flag is restored)
     */
    public boolean shutdown() {
        closed.set(true);
        executorService.shutdown();
        try {
            return executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.info("Prefetcher termination was interrupted: {}", e.toString());
            return false;
        }
    }

    /**
     * Submits the iterator tasks. Only the first call has an effect. The blob prefetch
     * task is submitted only if a non-empty blob suffix is configured.
     */
    public void start() {
        if (started.getAndSet(true)) {
            return;
        }
        LOG.info("Prefetch suffix '{}', prefetch {}, index {}", blobSuffix, prefetchStore, indexStore);
        executorService.submit(iterator(PrefetchType.TRACK_INDEXING));
        executorService.submit(iterator(PrefetchType.NODESTORE_CACHE_FILLER));
        if (!blobSuffix.isEmpty()) {
            executorService.submit(iterator(PrefetchType.BLOB_PREFETCH));
        }
    }

    /**
     * Pauses the calling thread for {@value #WAIT_MILLIS} ms. Used by the wait loops.
     *
     * @param str a description of what is being waited for; currently not used
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void sleep(String str) throws InterruptedException {
        Thread.sleep(WAIT_MILLIS);
    }

    /**
     * Creates the task for one prefetch role. The task iterates over all paths of the
     * prefetch store until the iterator is exhausted, the prefetcher is closed, or an
     * exception occurs, and logs a summary line when it ends.
     *
     * @param prefetchType the role of the task
     * @return the task; it never throws checked exceptions
     */
    Runnable iterator(PrefetchType prefetchType) {
        return () -> {
            Iterator<String> paths = prefetchStore.iteratorOverPaths();
            HyperLogLog uniqueBlobs = new HyperLogLog(1024, 0);
            AtomicLong prefetched = new AtomicLong();
            long count = 0;
            long totalBlobCount = 0;
            long inlinedBlobCount = 0;
            long totalBlobSize = 0;
            try {
                while (paths.hasNext() && !closed.get()) {
                    String path = paths.next();
                    count++;
                    if (count % PROGRESS_LOG_INTERVAL == 0) {
                        LOG.info("Iterated {} type {} inlinedCount {} totalCount {} totalSize {} maxSize {} max {} availableThreads {} indexing {} prefetch {} path {}",
                                count, prefetchType, inlinedBlobCount, totalBlobCount, totalBlobSize,
                                maxBlobSize, downloadMax.get(), semaphore.availablePermits(),
                                indexStore.toString(), prefetchStore.toString(), path);
                    }
                    if (prefetchType == PrefetchType.TRACK_INDEXING) {
                        iterateCount.set(count);
                        if (!waitUntilIndexed(path)) {
                            break;
                        }
                    }
                    if (prefetchType == PrefetchType.NODESTORE_CACHE_FILLER) {
                        if (!waitForNodeReadAhead(count)) {
                            break;
                        }
                        // read the raw entry through the index store's session and hand the
                        // node state built from it to the index store's cache
                        indexStore.prefillCache(path,
                                indexStore.buildNodeState(path, indexStore.getSession().get(path)));
                        continue;
                    }
                    if (!path.endsWith(blobSuffix)) {
                        continue;
                    }
                    PropertyState data = prefetchStore.getNodeStateEntry(path).getNodeState()
                            .getProperty(JcrConstants.JCR_DATA);
                    if (data == null || data.isArray() || data.getType() != Type.BINARY) {
                        continue;
                    }
                    Blob blob = data.getValue(Type.BINARY);
                    if (blob.isInlined()) {
                        inlinedBlobCount++;
                        continue;
                    }
                    long length = blob.length();
                    uniqueBlobs.add(longHash(blob));
                    totalBlobCount++;
                    totalBlobSize += length;
                    maxBlobSize = Math.max(maxBlobSize, length);
                    if (prefetchType == PrefetchType.TRACK_INDEXING) {
                        downloadMax.set(totalBlobSize);
                    } else if (prefetchType == PrefetchType.BLOB_PREFETCH) {
                        // blobs at or before the indexing position are no longer worth fetching
                        if (indexStore.getHighestReadKey().compareTo(path) < 0) {
                            if (!waitForBlobReadAhead(totalBlobSize)) {
                                break;
                            }
                            submitPrefetch(path, blob, prefetched);
                        }
                    } else {
                        throw new IllegalStateException("Incorrect type: " + prefetchType);
                    }
                }
            } catch (InterruptedException e) {
                // keep the interrupt visible to the executor / whoever owns this thread
                Thread.currentThread().interrupt();
                LOG.warn("Prefetch error", e);
            } catch (Exception e) {
                LOG.warn("Prefetch error", e);
            } finally {
                LOG.info("Completed after {} nodes, {} prefetched, {} unique",
                        count, prefetched.get(), uniqueBlobs.estimate());
            }
        };
    }

    /**
     * Blocks until the index store's highest read key is at or past {@code path}.
     *
     * @return {@code false} if the prefetcher was closed while waiting
     */
    private boolean waitUntilIndexed(String path) throws InterruptedException {
        while (indexStore.getHighestReadKey().compareTo(path) < 0) {
            if (closed.get()) {
                return false;
            }
            sleep("wait for indexing to progress");
        }
        return true;
    }

    /**
     * Blocks while {@code count} is more than {@code nodeReadAheadCount} ahead of the
     * node count published by the tracking task.
     *
     * @return {@code false} if the prefetcher was closed while waiting
     */
    private boolean waitForNodeReadAhead(long count) throws InterruptedException {
        while (count - nodeReadAheadCount > iterateCount.get()) {
            if (closed.get()) {
                return false;
            }
            sleep("wait in node cache fillter");
        }
        return true;
    }

    /**
     * Blocks while {@code totalBlobSize} is more than {@code blobReadAheadSize} ahead of
     * the cumulative blob size published by the tracking task.
     *
     * @return {@code false} if the prefetcher was closed while waiting
     */
    private boolean waitForBlobReadAhead(long totalBlobSize) throws InterruptedException {
        while (totalBlobSize - blobReadAheadSize > downloadMax.get()) {
            if (closed.get()) {
                return false;
            }
            sleep("wait in downloader");
        }
        return true;
    }

    /**
     * Acquires a permit and submits a task that opens the blob and reads its first byte.
     * The permit is always returned: by the task when it ends (successfully or not), or
     * here if the task could not be submitted.
     *
     * @param prefetched counter incremented once per finished prefetch attempt
     */
    private void submitPrefetch(String path, Blob blob, AtomicLong prefetched) throws InterruptedException {
        semaphore.acquire();
        boolean submitted = false;
        try {
            executorService.submit(() -> {
                long start = System.nanoTime();
                try (InputStream in = blob.getNewStream()) {
                    // a single byte is read; presumably that is enough to make the
                    // underlying blob store fetch the binary -- TODO confirm
                    in.read();
                    LOG.debug("Prefetching {} took {} ms", path,
                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                } catch (IOException | RuntimeException e) {
                    LOG.warn("Prefetching failed for {}", path, e);
                } finally {
                    semaphore.release();
                    prefetched.incrementAndGet();
                }
            });
            submitted = true;
        } finally {
            if (!submitted) {
                semaphore.release();
            }
        }
    }

    /**
     * Computes a 64-bit hash identifying a blob for the unique-blob estimate: the lower
     * 32 bits come from the hash code of the content identity (0 if there is none), the
     * upper 32 bits from the blob length.
     */
    private static long longHash(Blob blob) {
        String contentIdentity = blob.getContentIdentity();
        int identityHash = contentIdentity == null ? 0 : contentIdentity.hashCode();
        // mask to avoid sign extension: a negative hash code would otherwise set all
        // upper bits and wipe out the length component
        long lowerBits = identityHash & 0xffffffffL;
        return Hash.hash64(lowerBits | (blob.length() << 32));
    }
}
