package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.BasicDBObject;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMergeSortTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedMongoDownloadTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedSortBatchTask;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedTransformTask;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.webdav.DavConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Protocol;

/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.class */
/**
 * {@link SortStrategy} that produces the sorted flat file store by running a pipeline of concurrent
 * tasks connected through bounded queues:
 * <ol>
 *   <li>one {@link PipelinedMongoDownloadTask} that fills the Mongo document queue,</li>
 *   <li>a configurable number of {@link PipelinedTransformTask}s that move batches from the empty
 *       batches queue to the non-empty batches queue,</li>
 *   <li>one {@link PipelinedSortBatchTask} that feeds the sorted files queue,</li>
 *   <li>one {@link PipelinedMergeSortTask} that yields the final flat file store file.</li>
 * </ol>
 * The thread calling {@link #createSortedStoreFile()} coordinates the stages: when a stage reports
 * completion it enqueues the sentinel value that is handed to the following stage.
 *
 * <p>Tuning is done through the system properties named by the {@code OAK_INDEXER_PIPELINED_*}
 * constants.
 */
public class PipelinedStrategy implements SortStrategy {
    /** System property: capacity of the queue holding batches of downloaded Mongo documents. */
    public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE = "oak.indexer.pipelined.mongoDocQueueSize";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE = 100;
    /** System property: batch size handed to the Mongo download task. */
    public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE = "oak.indexer.pipelined.mongoDocBatchSize";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE = 500;
    /** System property: number of transform tasks run in parallel. */
    public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = "oak.indexer.pipelined.transformThreads";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 2;
    /** System property: working memory in MB; {@code 0} selects auto-detection from the maximum heap size. */
    public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = "oak.indexer.pipelined.workingMemoryMB";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 0;

    /** Enqueued once per transform thread on the Mongo document queue after the download task finishes. */
    static final BasicDBObject[] SENTINEL_MONGO_DOCUMENT = new BasicDBObject[0];
    /** Enqueued on the non-empty batches queue after the last transform task finishes. */
    static final NodeStateEntryBatch SENTINEL_NSE_BUFFER = new NodeStateEntryBatch(ByteBuffer.allocate(0), 0);
    /** Enqueued on the sorted files queue after the sort-batch task finishes. */
    static final File SENTINEL_SORTED_FILES_QUEUE = new File("SENTINEL");
    static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;

    private static final Logger LOG = LoggerFactory.getLogger(PipelinedStrategy.class);
    private static final long ONE_MB = 1024L * 1024L;
    private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 16;
    private static final int MIN_AUTODETECT_WORKING_MEMORY_MB = 128;
    private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000;
    // Subtracted from the maximum heap size during auto-detection; presumably headroom for everything
    // that is not working memory - TODO confirm the rationale for this figure.
    private static final int AUTODETECT_RESERVED_HEAP_MB = 2048;
    private static final int SORTED_FILES_QUEUE_SIZE = 64;
    private static final long THREAD_POOL_TERMINATION_TIMEOUT_SECONDS = 5L;

    private final MongoDocumentStore docStore;
    private final DocumentNodeStore documentNodeStore;
    private final RevisionVector rootRevision;
    private final BlobStore blobStore;
    private final File storeDir;
    private final PathElementComparator pathComparator;
    private final Compression algorithm;
    private long entryCount;
    private final Predicate<String> pathPredicate;

    /** Periodic task that logs the queue sizes and the transform stage statistics. */
    private static class MonitorTask implements Runnable {
        private final ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue;
        private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
        private final ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue;
        private final ArrayBlockingQueue<File> sortedFilesQueue;
        private final TransformStageStatistics transformStageStatistics;

        MonitorTask(ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue,
                    ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue,
                    ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue,
                    ArrayBlockingQueue<File> sortedFilesQueue,
                    TransformStageStatistics transformStageStatistics) {
            this.mongoDocQueue = mongoDocQueue;
            this.emptyBatchesQueue = emptyBatchesQueue;
            this.nonEmptyBatchesQueue = nonEmptyBatchesQueue;
            this.sortedFilesQueue = sortedFilesQueue;
            this.transformStageStatistics = transformStageStatistics;
        }

        @Override
        public void run() {
            try {
                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, false);
            } catch (Exception e) {
                // Only log: an exception escaping a task scheduled with a fixed delay suppresses
                // its subsequent executions.
                LOG.error("Error while logging queue sizes", e);
            }
        }
    }

    /**
     * Logs the current size of every pipeline queue together with the transform stage statistics.
     *
     * @param printHistogramsAtInfoLevel {@code true} to log the histograms at INFO, {@code false} for DEBUG
     */
    private static void printStatistics(ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue,
                                        ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue,
                                        ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue,
                                        ArrayBlockingQueue<File> sortedFilesQueue,
                                        TransformStageStatistics transformStageStatistics,
                                        boolean printHistogramsAtInfoLevel) {
        LOG.info("Queue sizes: {mongoDocQueue:{}, emptyBuffersQueue:{}, nonEmptyBuffersQueue:{}, sortedFilesQueue:{}}; Transform Stats: {}",
                mongoDocQueue.size(), emptyBatchesQueue.size(), nonEmptyBatchesQueue.size(), sortedFilesQueue.size(),
                transformStageStatistics.formatStats());
        prettyPrintTransformStatisticsHistograms(transformStageStatistics, printHistogramsAtInfoLevel);
    }

    /** Logs the three transform stage histograms, at INFO or DEBUG level depending on the flag. */
    private static void prettyPrintTransformStatisticsHistograms(TransformStageStatistics transformStageStatistics,
                                                                 boolean printHistogramsAtInfoLevel) {
        if (printHistogramsAtInfoLevel) {
            LOG.info("Top hidden paths rejected: {}.", transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
            LOG.info("Top paths filtered: {}.", transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
            LOG.info("Top empty node state documents: {}", transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint());
        } else {
            LOG.debug("Top hidden paths rejected: {}.", transformStageStatistics.getHiddenPathsRejectedHistogram().prettyPrint());
            LOG.debug("Top paths filtered: {}.", transformStageStatistics.getFilteredPathsRejectedHistogram().prettyPrint());
            LOG.debug("Top empty node state documents: {}", transformStageStatistics.getEmptyNodeStateHistogram().prettyPrint());
        }
    }

    /**
     * Creates the strategy.
     *
     * @param mongoDocumentStore store the documents are downloaded from; must be read-only
     * @param documentNodeStore  node store handed to the transform tasks
     * @param revisionVector     root revision handed to the transform tasks
     * @param set                paths used to build the {@link PathElementComparator}
     * @param blobStore          blob store used by the {@link NodeStateEntryWriter}s
     * @param file               directory handed to the sort and merge tasks
     * @param compression        compression algorithm handed to the sort and merge tasks
     * @param predicate          path predicate handed to the transform tasks
     * @throws IllegalStateException if {@code mongoDocumentStore} is not read-only
     */
    public PipelinedStrategy(MongoDocumentStore mongoDocumentStore, DocumentNodeStore documentNodeStore, RevisionVector revisionVector, Set<String> set, BlobStore blobStore, File file, Compression compression, Predicate<String> predicate) {
        this.docStore = mongoDocumentStore;
        this.documentNodeStore = documentNodeStore;
        this.rootRevision = revisionVector;
        this.blobStore = blobStore;
        this.storeDir = file;
        this.pathComparator = new PathElementComparator(set);
        this.pathPredicate = predicate;
        this.algorithm = compression;
        Preconditions.checkState(mongoDocumentStore.isReadOnly(), "Traverser can only be used with readOnly store");
    }

    /**
     * Derives the working memory from the maximum heap size: the heap size minus
     * {@link #AUTODETECT_RESERVED_HEAP_MB}, clamped to
     * [{@link #MIN_AUTODETECT_WORKING_MEMORY_MB}, {@link #MAX_AUTODETECT_WORKING_MEMORY_MB}].
     *
     * @return the working memory in MB
     */
    private int autodetectWorkingMemoryMB() {
        // Stay in long arithmetic until the value has been clamped: Runtime.maxMemory() returns
        // Long.MAX_VALUE when the heap has no inherent limit, which does not fit an int even in MB.
        long maxHeapSizeMB = Runtime.getRuntime().maxMemory() / ONE_MB;
        long workingMemoryMB = maxHeapSizeMB - AUTODETECT_RESERVED_HEAP_MB;
        LOG.info("Auto detecting working memory. Maximum heap size: {} MB, selected working memory: {} MB", maxHeapSizeMB, workingMemoryMB);
        if (workingMemoryMB > MAX_AUTODETECT_WORKING_MEMORY_MB) {
            LOG.warn("Auto-detected value for working memory too high, setting to the maximum allowed for auto-detection: {} MB", MAX_AUTODETECT_WORKING_MEMORY_MB);
            return MAX_AUTODETECT_WORKING_MEMORY_MB;
        }
        if (workingMemoryMB < MIN_AUTODETECT_WORKING_MEMORY_MB) {
            LOG.warn("Auto-detected value for working memory too low, setting to the minimum allowed for auto-detection: {} MB", MIN_AUTODETECT_WORKING_MEMORY_MB);
            return MIN_AUTODETECT_WORKING_MEMORY_MB;
        }
        return (int) workingMemoryMB;
    }

    /**
     * Runs the whole pipeline and blocks until every task has finished.
     *
     * @return the flat file store file reported by the merge-sort task
     * @throws IllegalArgumentException if a configuration property is out of range or the resulting
     *                                  entry batch buffers would be smaller than
     *                                  {@link #MIN_ENTRY_BATCH_BUFFER_SIZE_MB} MB
     * @throws RuntimeException         if any task fails or the calling thread is interrupted; the
     *                                  worker threads are shut down before it is thrown
     */
    @Override
    public File createSortedStoreFile() throws IOException {
        int mongoDocQueueSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE);
        Preconditions.checkArgument(mongoDocQueueSize > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE + ": " + mongoDocQueueSize + ". Must be > 0");
        int mongoBatchSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE);
        Preconditions.checkArgument(mongoBatchSize > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE + ": " + mongoBatchSize + ". Must be > 0");
        int transformThreads = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS);
        Preconditions.checkArgument(transformThreads > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ": " + transformThreads + ". Must be > 0");
        int workingMemoryMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB);
        Preconditions.checkArgument(workingMemoryMB >= 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ": " + workingMemoryMB + ". Must be >= 0");
        if (workingMemoryMB == 0) {
            workingMemoryMB = autodetectWorkingMemoryMB();
        }

        // One download task + the transform tasks + one sort-batch task + one merge-sort task.
        int numberOfThreads = 1 + transformThreads + 1 + 1;
        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads,
                new ThreadFactoryBuilder().setNameFormat("mongo-dump").setDaemon(true).build());
        // Raw type kept on purpose: the submitted tasks produce unrelated Result types, which are
        // told apart with instanceof once they complete.
        ExecutorCompletionService completionService = new ExecutorCompletionService(threadPool);
        ScheduledExecutorService monitorThreadPool = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("monitor").setDaemon(true).build());
        try {
            int nseBuffersCount = 1 + transformThreads;
            // The working memory is divided into one more region than there are buffers.
            int memoryArenas = nseBuffersCount + 1;
            // 1024 * 4 entries per MB of one arena, shared among the buffers. Computed in long and
            // clamped so that an extreme working memory setting cannot wrap around.
            long entriesPerBuffer = ((long) (workingMemoryMB / memoryArenas) * 1024L * 4L) / nseBuffersCount;
            int maxEntriesPerBuffer = (int) Math.min(entriesPerBuffer, Integer.MAX_VALUE);
            // Must be a long multiplication: workingMemoryMB * 1 MB overflows an int from 2048 MB on.
            int bufferSizeBytes = limitToIntegerRange((workingMemoryMB * ONE_MB) / memoryArenas);
            if (bufferSizeBytes < MIN_ENTRY_BATCH_BUFFER_SIZE_MB * ONE_MB) {
                throw new IllegalArgumentException("Entry batch buffer size too small: " + bufferSizeBytes
                        + " bytes. Must be at least " + MIN_ENTRY_BATCH_BUFFER_SIZE_MB
                        + " MB. To increase the size of the buffers, either increase the size of the working memory region (system property "
                        + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ") or decrease the number of transform threads ("
                        + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ")");
            }
            LOG.info("Working memory: {} MB, number of buffers: {}, size of each buffer: {} MB, number of entries per buffer: {}",
                    workingMemoryMB, nseBuffersCount, bufferSizeBytes / ONE_MB, maxEntriesPerBuffer);

            ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocQueueSize);
            ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount);
            // One slot more than there are buffers, so SENTINEL_NSE_BUFFER fits even if every
            // buffer is queued.
            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount + 1);
            ArrayBlockingQueue<File> sortedFilesQueue = new ArrayBlockingQueue<>(SORTED_FILES_QUEUE_SIZE);
            TransformStageStatistics transformStageStatistics = new TransformStageStatistics();

            ScheduledFuture<?> monitorFuture = monitorThreadPool.scheduleWithFixedDelay(
                    new MonitorTask(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics),
                    10L, 30L, TimeUnit.SECONDS);
            try {
                for (int i = 0; i < nseBuffersCount; i++) {
                    emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(bufferSizeBytes, maxEntriesPerBuffer));
                }

                Stopwatch start = Stopwatch.createStarted();
                completionService.submit(new PipelinedMongoDownloadTask(
                        MongoDocumentStoreHelper.getDBCollection(this.docStore, Collection.NODES), mongoBatchSize, mongoDocQueue));
                for (int i = 0; i < transformThreads; i++) {
                    completionService.submit(new PipelinedTransformTask(this.docStore, this.documentNodeStore, Collection.NODES,
                            this.rootRevision, this.pathPredicate, new NodeStateEntryWriter(this.blobStore),
                            mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, transformStageStatistics));
                }
                completionService.submit(new PipelinedSortBatchTask(this.storeDir, this.pathComparator, this.algorithm,
                        emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue));
                completionService.submit(new PipelinedMergeSortTask(this.storeDir, this.pathComparator, this.algorithm, sortedFilesQueue));

                LOG.info("Waiting for tasks to complete.");
                File flatFileStore = null;
                int tasksFinished = 0;
                int transformTasksFinished = 0;
                while (tasksFinished < numberOfThreads) {
                    try {
                        Object result = completionService.take().get();
                        if (result instanceof PipelinedMongoDownloadTask.Result) {
                            PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
                            LOG.info("Download task finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
                            // One sentinel per transform thread.
                            for (int i = 0; i < transformThreads; i++) {
                                mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
                            }
                        } else if (result instanceof PipelinedTransformTask.Result) {
                            PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
                            transformTasksFinished++;
                            this.entryCount += transformResult.getEntryCount();
                            LOG.info("Transform thread {} finished. Entries processed: {}", transformResult.getThreadId(), transformResult.getEntryCount());
                            if (transformTasksFinished == transformThreads) {
                                LOG.info("All transform tasks finished. Node states retrieved: {}", this.entryCount);
                                // Stop the periodic statistics once the last transform task is done.
                                monitorFuture.cancel(false);
                                nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
                            }
                        } else if (result instanceof PipelinedSortBatchTask.Result) {
                            PipelinedSortBatchTask.Result sortResult = (PipelinedSortBatchTask.Result) result;
                            LOG.info("Sort task finished. Entries processed: {}", sortResult.getTotalEntries());
                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
                            sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
                        } else if (result instanceof PipelinedMergeSortTask.Result) {
                            PipelinedMergeSortTask.Result mergeResult = (PipelinedMergeSortTask.Result) result;
                            flatFileStore = mergeResult.getFlatFileStoreFile();
                            LOG.info("Merge sort task finished. FFS: {}, Size: {}", flatFileStore, IOUtils.humanReadableByteCountBin(flatFileStore.length()));
                        } else {
                            throw new RuntimeException("Unknown result type: " + result);
                        }
                        tasksFinished++;
                    } catch (ExecutionException e) {
                        // Passing the exception as the last argument also logs its stack trace.
                        LOG.warn("Execution error dumping from MongoDB: {}. Shutting down all threads.", e.toString(), e);
                        shutdownNowAndAwaitTermination(threadPool);
                        throw new RuntimeException(e.getCause());
                    } catch (Throwable t) {
                        LOG.warn("Error dumping from MongoDB: {}", t.toString(), t);
                        shutdownNowAndAwaitTermination(threadPool);
                        if (t instanceof InterruptedException) {
                            // Restore the flag only after the shutdown wait, which would otherwise
                            // abort immediately.
                            Thread.currentThread().interrupt();
                        }
                        throw new RuntimeException(t);
                    }
                }
                LOG.info("Dumped {} nodestates in json format in {}", this.entryCount, start);
                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
                return flatFileStore;
            } finally {
                // The queues no longer need to be monitored, whatever the outcome.
                monitorFuture.cancel(true);
            }
        } finally {
            threadPool.shutdown();
            monitorThreadPool.shutdown();
        }
    }

    /**
     * Interrupts the running tasks and waits a bounded time for the pool to terminate. An interrupt
     * received while waiting is recorded on the thread instead of being thrown, so that it cannot
     * replace the failure that triggered the shutdown.
     */
    private static void shutdownNowAndAwaitTermination(ExecutorService threadPool) {
        threadPool.shutdownNow();
        try {
            if (!threadPool.awaitTermination(THREAD_POOL_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOG.warn("Thread pool failed to terminate");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for the thread pool to terminate");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Narrows a buffer size to an {@code int}.
     *
     * @param bufferSizeBytes the computed size in bytes
     * @return {@code bufferSizeBytes} when it fits in an {@code int}, otherwise
     *         {@code Integer.MAX_VALUE - 16}
     */
    private static int limitToIntegerRange(long bufferSizeBytes) {
        if (bufferSizeBytes <= Integer.MAX_VALUE) {
            return (int) bufferSizeBytes;
        }
        int truncatedValue = Integer.MAX_VALUE - 16;
        LOG.warn("Computed buffer size too big: {}, exceeds Integer.MAX_VALUE. Truncating to: {}", bufferSizeBytes, truncatedValue);
        return truncatedValue;
    }

    /**
     * @return the sum of the entry counts reported by the transform tasks that have finished so far
     */
    @Override
    public long getEntryCount() {
        return this.entryCount;
    }
}
