package org.apache.jackrabbit.oak.index.indexer.document.flatfile;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.io.LineIterator;
import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.json.JsonDeserializer;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.blob.serializer.BlobIdSerializer;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.class */
public class DefaultAheadOfTimeBlobDownloader implements AheadOfTimeBlobDownloader {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultAheadOfTimeBlobDownloader.class);
    // Marker blob passed through the queue to tell the download tasks to stop; it is compared by identity.
    private static final Blob SENTINEL = new BlobStoreBlob(null, null);
    // Source of the numeric suffix used in the "downloader-N" thread names.
    private static final AtomicInteger threadNameCounter = new AtomicInteger(0);
    // Initial capacity of the map that remembers recently enqueued blob ids.
    private static final int DOWNLOADED_BLOB_IDS_CACHE_SIZE = 1024;

    private final String binaryBlobsPathSuffix;
    private final File ffsPath;
    private final Compression algorithm;
    private final GarbageCollectableBlobStore blobStore;
    private final List<IndexDefinition> indexDefinitions;

    // Created by start(); the executor is reset to null by stop().
    private ExecutorService executor;
    private ScanTask scanTask;
    private Future<?> scanFuture;
    private List<Future<?>> downloadFutures;

    private final int nDownloadThreads;
    private final AheadOfTimeBlobDownloaderThrottler throttler;

    // Access-ordered map used as a bounded set of content identities: once it holds more than
    // MAX_ENTRIES keys, the least recently accessed one is evicted on insertion.
    private final LinkedHashMap<String, Boolean> downloadedBlobs = new LinkedHashMap<String, Boolean>(DOWNLOADED_BLOB_IDS_CACHE_SIZE, 0.75f, true) {
        // Kept below the initial capacity (716 of 1024 slots).
        private static final int MAX_ENTRIES = 716;

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            return size() > MAX_ENTRIES;
        }
    };

    // Totals accumulated by all download tasks.
    private final LongAdder totalBytesDownloaded = new LongAdder();
    private final LongAdder totalTimeDownloadingNanos = new LongAdder();
    private final LongAdder totalBlobsDownloaded = new LongAdder();

    // Counters incremented by the scan task.
    private long blobsEnqueuedForDownload = 0;
    private long skippedLinesDueToLaggingIndexing = 0;

    // Last position reported through updateIndexed(); -1 until the first report.
    private volatile long indexerLastKnownPosition = -1;

    /* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader$DownloadTask.class */
    private class DownloadTask implements Runnable {
        private final ArrayBlockingQueue<Blob> queue;
        private long blobsDownloaded = 0;
        private long bytesDownloaded = 0;
        private long timeDownloadingNanos = 0;

        public DownloadTask(ArrayBlockingQueue<Blob> arrayBlockingQueue) {
            this.queue = arrayBlockingQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName("downloader-" + DefaultAheadOfTimeBlobDownloader.threadNameCounter.getAndIncrement());
            byte[] bArr = new byte[4096];
            while (true) {
                try {
                    try {
                        Blob take = this.queue.take();
                        if (take == DefaultAheadOfTimeBlobDownloader.SENTINEL) {
                            DefaultAheadOfTimeBlobDownloader.LOG.info("Sentinel received, exiting. Statistics: {}", formatDownloaderStats());
                            this.queue.put(DefaultAheadOfTimeBlobDownloader.SENTINEL);
                            Thread.currentThread().setName(name);
                            return;
                        }
                        long nanoTime = System.nanoTime();
                        InputStream newStream = take.getNewStream();
                        int i = 0;
                        while (true) {
                            try {
                                int read = newStream.read(bArr);
                                if (read == -1) {
                                    break;
                                } else {
                                    i += read;
                                }
                            } catch (IOException e) {
                                DefaultAheadOfTimeBlobDownloader.LOG.error("Error downloading blob: {}", take.getContentIdentity(), e);
                            }
                        }
                        if (i != take.length()) {
                            DefaultAheadOfTimeBlobDownloader.LOG.error("Blob size mismatch: blob.length(): {}, bytesRead: {}", Long.valueOf(take.length()), Integer.valueOf(i));
                        }
                        long nanoTime2 = System.nanoTime() - nanoTime;
                        this.bytesDownloaded += i;
                        this.blobsDownloaded++;
                        this.timeDownloadingNanos += nanoTime2;
                        DefaultAheadOfTimeBlobDownloader.this.totalBytesDownloaded.add(i);
                        DefaultAheadOfTimeBlobDownloader.this.totalTimeDownloadingNanos.add(nanoTime2);
                        DefaultAheadOfTimeBlobDownloader.this.totalBlobsDownloaded.increment();
                        if (this.blobsDownloaded % 500 == 0) {
                            DefaultAheadOfTimeBlobDownloader.LOG.info("Retrieved blob: {}, size: {}, in {} ms. Downloader thread statistics: {}", take.getContentIdentity(), Long.valueOf(take.length()), Long.valueOf(nanoTime2 / 1000000), formatDownloaderStats());
                        }
                    } catch (InterruptedException e2) {
                        DefaultAheadOfTimeBlobDownloader.LOG.info("Download task interrupted, exiting. Statistics: {}", formatDownloaderStats());
                        Thread.currentThread().setName(name);
                        return;
                    }
                } catch (Throwable th) {
                    Thread.currentThread().setName(name);
                    throw th;
                }
            }
        }

        private String formatDownloaderStats() {
            return String.format("Downloaded %d blobs, %d bytes (%s) in %s", Long.valueOf(this.blobsDownloaded), Long.valueOf(this.bytesDownloaded), IOUtils.humanReadableByteCountBin(this.bytesDownloaded), FormattingUtils.formatNanosToSeconds(this.timeDownloadingNanos));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader$ScanTask.class */
    /**
     * Task that reads the flat file store line by line, selects the entries whose binary should
     * be prefetched and enqueues the corresponding blobs for the download tasks. When it stops,
     * for whatever reason, it puts the sentinel on the queue exactly once.
     */
    public class ScanTask implements Runnable {
        private final JsonDeserializer jsonDeserializer;
        private final ArrayBlockingQueue<Blob> queue;
        // Statistics, incremented only by the thread running this task and read by the
        // enclosing class when formatting aggregate statistics.
        long linesScanned = 0;
        long blobCacheHit = 0;
        long notIncludedInIndex = 0;
        long doesNotMatchPattern = 0;
        long inlinedBlobsSkipped = 0;
        long skippedForOtherReasons = 0;

        /**
         * @param arrayBlockingQueue queue on which the blobs to download are placed
         */
        public ScanTask(ArrayBlockingQueue<Blob> arrayBlockingQueue) {
            this.jsonDeserializer = new JsonDeserializer(new BlobIdSerializer(DefaultAheadOfTimeBlobDownloader.this.blobStore));
            this.queue = arrayBlockingQueue;
        }

        /**
         * Scans the whole flat file store. On interruption the queue is cleared and scanning stops.
         * In every case the statistics are logged, the thread name is restored, the sentinel is
         * enqueued and the line iterator is closed.
         *
         * @throws RuntimeException wrapping an IOException or an InterruptedException raised
         *                          while closing the reader or enqueuing the sentinel
         */
        @Override
        public void run() {
            try (LineIterator lineIterator = new LineIterator(IndexStoreUtils.createReader(DefaultAheadOfTimeBlobDownloader.this.ffsPath, DefaultAheadOfTimeBlobDownloader.this.algorithm))) {
                String originalName = Thread.currentThread().getName();
                Thread.currentThread().setName("scanner");
                try {
                    DefaultAheadOfTimeBlobDownloader.LOG.info("Starting scanning FFS for blobs to download, matching suffix: {}", DefaultAheadOfTimeBlobDownloader.this.binaryBlobsPathSuffix);
                    while (lineIterator.hasNext()) {
                        scanLine(lineIterator.next());
                    }
                } catch (InterruptedException e) {
                    // Drop pending work so the sentinel below is the next thing the download tasks see.
                    // The interrupt flag is deliberately not restored here: the put() in the
                    // finally block would otherwise fail immediately.
                    this.queue.clear();
                    DefaultAheadOfTimeBlobDownloader.LOG.info("Scan task interrupted, exiting");
                } finally {
                    DefaultAheadOfTimeBlobDownloader.LOG.info("Scanner reached end of FFS, stopping download threads. Statistics: {} {}", DefaultAheadOfTimeBlobDownloader.this.formatAggregateStatistics(), DefaultAheadOfTimeBlobDownloader.this.throttler.formatStats());
                    Thread.currentThread().setName(originalName);
                    this.queue.put(DefaultAheadOfTimeBlobDownloader.SENTINEL);
                }
            } catch (InterruptedException e) {
                // Interrupted while enqueuing the sentinel: keep the interrupt status visible.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Processes one line of the flat file store: the text before the first '|' is the path,
         * the rest is the serialized node state. Updates the counters and, for a candidate entry
         * included by at least one index and not yet passed by the indexer, hands the node over
         * to {@link #processEntry}.
         */
        private void scanLine(String line) throws InterruptedException {
            int separator = line.indexOf('|');
            String path = line.substring(0, separator);
            if (!isCandidatePath(path)) {
                this.doesNotMatchPattern++;
            } else if (DefaultAheadOfTimeBlobDownloader.this.indexDefinitions.stream().noneMatch(indexDefinition -> indexDefinition.shouldInclude(path))) {
                this.notIncludedInIndex++;
            } else if (isBehindIndexer(this.linesScanned)) {
                DefaultAheadOfTimeBlobDownloader.LOG.debug("Skipping blob at position {} because it was already indexed", this.linesScanned);
                DefaultAheadOfTimeBlobDownloader.this.skippedLinesDueToLaggingIndexing++;
            } else {
                processEntry(path, this.jsonDeserializer.deserialize(line, separator + 1));
            }
            this.linesScanned++;
            if (this.linesScanned % 1000000 == 0) {
                DefaultAheadOfTimeBlobDownloader.LOG.info("[{}] Last path scanned: {}. Aggregated statistics: {}", this.linesScanned, path, DefaultAheadOfTimeBlobDownloader.this.formatAggregateStatistics());
            }
        }

        /** Returns true when the path ends with the configured binary blobs path suffix. */
        private boolean isCandidatePath(String str) {
            return str.endsWith(DefaultAheadOfTimeBlobDownloader.this.binaryBlobsPathSuffix);
        }

        /** Returns true when the given position is not ahead of the last position reported by the indexer. */
        private boolean isBehindIndexer(long j) {
            return j <= DefaultAheadOfTimeBlobDownloader.this.indexerLastKnownPosition;
        }

        /**
         * Enqueues the blobs of the node's single-valued binary "jcr:data" property for download.
         * Skipped are: inlined blobs, blobs without a content identity, blobs whose identity is
         * still in the recently-enqueued cache, and blobs for which the throttler does not
         * reserve space.
         *
         * @throws InterruptedException if interrupted while waiting on the throttler or the queue
         */
        private void processEntry(String str, NodeState nodeState) throws InterruptedException {
            PropertyState property = nodeState.getProperty(JcrConstants.JCR_DATA);
            if (property == null || property.isArray() || property.getType() != Type.BINARY) {
                this.skippedForOtherReasons++;
                DefaultAheadOfTimeBlobDownloader.LOG.info("Skipping node: {}. Property \"jcr:data\": {}", str, property);
                return;
            }
            for (Blob blob : property.getValue(Type.BINARIES)) {
                if (blob.isInlined()) {
                    this.inlinedBlobsSkipped++;
                    continue;
                }
                String contentIdentity = blob.getContentIdentity();
                if (contentIdentity == null) {
                    DefaultAheadOfTimeBlobDownloader.LOG.info("[{}] Skipping blob with null content identity: {}", this.linesScanned, contentIdentity);
                } else if (DefaultAheadOfTimeBlobDownloader.this.downloadedBlobs.containsKey(contentIdentity)) {
                    this.blobCacheHit++;
                    DefaultAheadOfTimeBlobDownloader.LOG.debug("[{}] Blob already downloaded or enqueued for download: {}", this.linesScanned, contentIdentity);
                } else if (DefaultAheadOfTimeBlobDownloader.this.throttler.reserveSpaceForBlob(this.linesScanned, blob.length())) {
                    DefaultAheadOfTimeBlobDownloader.this.downloadedBlobs.put(contentIdentity, Boolean.TRUE);
                    this.queue.put(blob);
                    DefaultAheadOfTimeBlobDownloader.this.blobsEnqueuedForDownload++;
                    // Log progress only every 1000th enqueued blob.
                    if (DefaultAheadOfTimeBlobDownloader.this.blobsEnqueuedForDownload % 1000 == 0) {
                        DefaultAheadOfTimeBlobDownloader.LOG.info("[{}] Enqueued blob for download: {}, size: {}, Statistics: {}, {}", this.linesScanned, contentIdentity, blob.length(), DefaultAheadOfTimeBlobDownloader.this.formatAggregateStatistics(), DefaultAheadOfTimeBlobDownloader.this.throttler.formatStats());
                    }
                } else {
                    DefaultAheadOfTimeBlobDownloader.this.skippedLinesDueToLaggingIndexing++;
                }
            }
        }
    }

    /**
     * Creates a downloader; nothing is started until {@link #start()} is called.
     *
     * @param str                         suffix a path must end with for its binary to be considered
     * @param file                        flat file store to scan
     * @param compression                 compression used to read the flat file store
     * @param garbageCollectableBlobStore blob store used when deserializing the node states
     * @param list                        index definitions; an entry is considered only if at least one includes its path
     * @param i                           number of download threads, must be at least 1
     * @param i2                          first argument handed to the throttler (presumably the maximum
     *                                    prefetch window in entries; confirm against the throttler)
     * @param i3                          maximum prefetch window in MB, must be at least 1
     * @throws IllegalArgumentException if {@code i} or {@code i3} is smaller than 1
     */
    public DefaultAheadOfTimeBlobDownloader(@NotNull String str, @NotNull File file, @NotNull Compression compression, @NotNull GarbageCollectableBlobStore garbageCollectableBlobStore, @NotNull List<IndexDefinition> list, int i, int i2, int i3) {
        if (i < 1) {
            throw new IllegalArgumentException("nDownloadThreads must be greater than 0. Was: " + i);
        }
        if (i3 < 1) {
            throw new IllegalArgumentException("maxPrefetchWindowMB must be greater than 0. Was: " + i3);
        }
        this.binaryBlobsPathSuffix = str;
        this.ffsPath = file;
        this.algorithm = compression;
        this.blobStore = garbageCollectableBlobStore;
        this.indexDefinitions = list;
        this.nDownloadThreads = i;
        // Multiply as long: in int arithmetic a window of 2048 MB or more overflows.
        this.throttler = new AheadOfTimeBlobDownloaderThrottler(i2, i3 * 1048576L);
        LOG.info("Created AheadOfTimeBlobDownloader. downloadThreads: {}, prefetchMB: {}, enabledIndexes: {}", i, i3, list);
    }

    @Override // org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloader
    /**
     * Creates a thread pool with one thread per download task plus one for the scanner, then
     * submits the download tasks followed by the scan task. All tasks share one bounded queue
     * holding at most two blobs per download thread.
     */
    public void start() {
        executor = Executors.newFixedThreadPool(nDownloadThreads + 1);
        ArrayBlockingQueue<Blob> sharedQueue = new ArrayBlockingQueue<>(nDownloadThreads * 2);
        downloadFutures = new ArrayList<>();
        int submitted = 0;
        while (submitted < nDownloadThreads) {
            Future<?> downloadFuture = executor.submit(new DownloadTask(sharedQueue));
            downloadFutures.add(downloadFuture);
            submitted++;
        }
        scanTask = new ScanTask(sharedQueue);
        scanFuture = executor.submit(scanTask);
    }

    /**
     * Blocks until the scan task and then every download task has completed.
     *
     * @throws ExecutionException   if one of the tasks terminated with an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws ExecutionException, InterruptedException {
        scanFuture.get();
        for (Future<?> downloadFuture : downloadFutures) {
            downloadFuture.get();
        }
    }

    @Override // org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloader
    /**
     * Records the position reached by the indexer and forwards it to the throttler.
     *
     * @param j last position known to have been indexed
     */
    public void updateIndexed(long j) {
        indexerLastKnownPosition = j;
        throttler.advanceIndexer(j);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    /** Closes this downloader by delegating to {@link #stop()}. */
    public void close() {
        this.stop();
    }

    /**
     * Cancels the scan task and all download tasks (interrupting them if running) and closes the
     * executor. Does nothing when there is no executor, i.e. before start() or after a previous stop().
     */
    public void stop() {
        if (executor != null) {
            LOG.info("Stopping AheadOfTimeBlobDownloader. Statistics: {}", formatAggregateStatistics());
            scanFuture.cancel(true);
            for (Future<?> downloadFuture : downloadFutures) {
                downloadFuture.cancel(true);
            }
            LOG.info("Waiting for download tasks to finish");
            ExecutorCloser closer = new ExecutorCloser(executor);
            closer.close();
            executor = null;
            LOG.info("All download tasks finished");
        }
    }

    /**
     * Formats the totals accumulated by the download tasks together with the counters of the
     * scan task into a single line. Requires the scan task to exist, i.e. start() must have been called.
     *
     * @return the formatted statistics
     */
    public String formatAggregateStatistics() {
        long bytes = totalBytesDownloaded.sum();
        ScanTask scan = scanTask;
        return String.format(
                "Downloaded %d blobs, %d bytes (%s). aggregatedDownloadTime: %s, cacheHits: %d, linesScanned: %d, notIncludedInIndex: %d, doesNotMatchPattern: %d, inlinedBlobsSkipped: %d, skippedForOtherReasons: %d, skippedLinesDueToLaggingIndexing: %d",
                totalBlobsDownloaded.sum(),
                bytes,
                IOUtils.humanReadableByteCountBin(bytes),
                FormattingUtils.formatNanosToSeconds(totalTimeDownloadingNanos.sum()),
                scan.blobCacheHit,
                scan.linesScanned,
                scan.notIncludedInIndex,
                scan.doesNotMatchPattern,
                scan.inlinedBlobsSkipped,
                scan.skippedForOtherReasons,
                skippedLinesDueToLaggingIndexing);
    }

    /** @return number of blobs the scan task has placed on the download queue so far */
    public long getBlobsEnqueuedForDownload() {
        return blobsEnqueuedForDownload;
    }

    /** @return number of blobs read to completion, summed over all download tasks */
    public long getTotalBlobsDownloaded() {
        long downloaded = totalBlobsDownloaded.sum();
        return downloaded;
    }

    /** @return number of flat file store lines the scan task has processed; requires start() to have been called */
    public long getLinesScanned() {
        ScanTask scan = scanTask;
        return scan.linesScanned;
    }

    /** @return number of candidate paths that no index definition includes; requires start() to have been called */
    public long getNotIncludedInIndex() {
        ScanTask scan = scanTask;
        return scan.notIncludedInIndex;
    }
}
