package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.NodeStateEntryBatch;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeState;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreHelper;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.bson.BsonBinaryReader;
import org.bson.BsonReader;
import org.bson.ByteBuf;
import org.bson.RawBsonDocument;
import org.bson.codecs.DecoderContext;
import org.bson.io.ByteBufferBsonInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.class */
class PipelinedTransformTask implements Callable<Result> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) PipelinedTransformTask.class);
    private static final AtomicInteger threadIdGenerator = new AtomicInteger();
    private static final String THREAD_NAME_PREFIX = "mongo-transform-";
    private final MongoDocumentStore mongoStore;
    private final DocumentNodeStore documentNodeStore;
    private final NodeDocumentCodec nodeDocumentCodec;
    private final RevisionVector rootRevision;
    private final NodeStateEntryWriter entryWriter;
    private final Predicate<String> pathPredicate;
    private final ArrayBlockingQueue<RawBsonDocument[]> mongoDocQueue;
    private final ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue;
    private final ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue;
    private final TransformStageStatistics statistics;
    private final int threadId = threadIdGenerator.getAndIncrement();
    private long totalEnqueueDelayMillis = 0;
    private long totalEmptyBatchQueueWaitTimeMillis = 0;

    /* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask$Result.class */
    public static class Result {
        private final int transformThreadId;
        private final long entryCount;

        public Result(int i, long j) {
            this.transformThreadId = i;
            this.entryCount = j;
        }

        public long getEntryCount() {
            return this.entryCount;
        }

        public int getThreadId() {
            return this.transformThreadId;
        }
    }

    public PipelinedTransformTask(MongoDocumentStore mongoDocumentStore, DocumentNodeStore documentNodeStore, NodeDocumentCodec nodeDocumentCodec, RevisionVector revisionVector, Predicate<String> predicate, NodeStateEntryWriter nodeStateEntryWriter, ArrayBlockingQueue<RawBsonDocument[]> arrayBlockingQueue, ArrayBlockingQueue<NodeStateEntryBatch> arrayBlockingQueue2, ArrayBlockingQueue<NodeStateEntryBatch> arrayBlockingQueue3, TransformStageStatistics transformStageStatistics) {
        this.mongoStore = mongoDocumentStore;
        this.documentNodeStore = documentNodeStore;
        this.nodeDocumentCodec = nodeDocumentCodec;
        this.rootRevision = revisionVector;
        this.pathPredicate = predicate;
        this.entryWriter = nodeStateEntryWriter;
        this.mongoDocQueue = arrayBlockingQueue;
        this.emptyBatchesQueue = arrayBlockingQueue2;
        this.nonEmptyBatchesQueue = arrayBlockingQueue3;
        this.statistics = transformStageStatistics;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /* JADX WARN: Finally extract failed */
    @Override // java.util.concurrent.Callable
    public Result call() throws Exception {
        int addEntry;
        String name = Thread.currentThread().getName();
        String str = "mongo-transform-" + this.threadId;
        Thread.currentThread().setName(str);
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            try {
                IndexUtils.INDEXING_PHASE_LOGGER.info("[TASK:{}:START] Starting transform task", str.toUpperCase(Locale.ROOT));
                NodeDocumentCache nodeDocumentCache = MongoDocumentStoreHelper.getNodeDocumentCache(this.mongoStore);
                long j = 0;
                long j2 = 0;
                int i = 0;
                long j3 = 0;
                LOG.debug("Waiting for an empty buffer");
                NodeStateEntryBatch take = this.emptyBatchesQueue.take();
                LOG.debug("Obtained an empty buffer. Starting to convert Mongo documents to node state entries");
                ArrayList<DocumentNodeState> arrayList = new ArrayList<>();
                Stopwatch createUnstarted = Stopwatch.createUnstarted();
                while (true) {
                    createUnstarted.reset().start();
                    RawBsonDocument[] take2 = this.mongoDocQueue.take();
                    j += createUnstarted.elapsed(TimeUnit.MILLISECONDS);
                    if (take2 == PipelinedMongoDownloadTask.SENTINEL_MONGO_DOCUMENT) {
                        this.mongoDocQueue.put(PipelinedMongoDownloadTask.SENTINEL_MONGO_DOCUMENT);
                        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
                        IndexUtils.INDEXING_PHASE_LOGGER.info("[TASK:{}:END] Metrics: {}", str.toUpperCase(Locale.ROOT), MetricsFormatter.newBuilder().add("duration", FormattingUtils.formatToSeconds(createStarted)).add("durationSeconds", elapsed / 1000).add("nodeStateEntriesGenerated", j2).add("enqueueDelayMillis", this.totalEnqueueDelayMillis).add("enqueueDelayPercentage", PipelinedUtils.formatAsPercentage(this.totalEnqueueDelayMillis, elapsed)).add("documentQueueWaitMillis", j).add("documentQueueWaitPercentage", PipelinedUtils.formatAsPercentage(j, elapsed)).add("totalEmptyBatchQueueWaitTimeMillis", this.totalEmptyBatchQueueWaitTimeMillis).add("totalEmptyBatchQueueWaitPercentage", PipelinedUtils.formatAsPercentage(this.totalEmptyBatchQueueWaitTimeMillis, elapsed)).build());
                        take.getBuffer().flip();
                        tryEnqueue(take);
                        Result result = new Result(this.threadId, j2);
                        Thread.currentThread().setName(name);
                        return result;
                    }
                    for (RawBsonDocument rawBsonDocument : take2) {
                        this.statistics.incrementMongoDocumentsTraversed();
                        i++;
                        j3++;
                        ByteBuf byteBuffer = rawBsonDocument.getByteBuffer();
                        int remaining = byteBuffer.remaining() * 2;
                        BsonBinaryReader bsonBinaryReader = new BsonBinaryReader(new ByteBufferBsonInput(byteBuffer));
                        try {
                            NodeDocument decode = this.nodeDocumentCodec.decode((BsonReader) bsonBinaryReader, DecoderContext.builder().build());
                            bsonBinaryReader.close();
                            if (decode != NodeDocument.NULL) {
                                if (i >= 100000) {
                                    i = 0;
                                    LOG.info("Processing: {}, {}. Total documents: {}, total entries: {}, current batch: {}, Size: {}/{} MB", decode.getModified(), decode.getId(), Long.valueOf(j3), Long.valueOf(j2), Integer.valueOf(take.numberOfEntries()), Long.valueOf(take.sizeOfEntriesBytes() / 1048576), Long.valueOf(take.capacity() / 1048576));
                                }
                                decode.put(NodeDocumentCodec.SIZE_FIELD, Integer.valueOf(remaining));
                                nodeDocumentCache.put(decode);
                                if (decode.isSplitDocument()) {
                                    this.statistics.addSplitDocument(decode.getId());
                                } else {
                                    arrayList.clear();
                                    extractNodeStateEntries(decode, arrayList);
                                    if (arrayList.isEmpty()) {
                                        this.statistics.addEmptyNodeStateEntry(decode.getId());
                                    } else {
                                        Iterator<DocumentNodeState> it = arrayList.iterator();
                                        while (it.hasNext()) {
                                            DocumentNodeState next = it.next();
                                            String path = next.getPath().toString();
                                            if (NodeStateUtils.isHiddenPath(path) || !this.pathPredicate.test(path)) {
                                                this.statistics.incrementEntriesRejected();
                                                if (NodeStateUtils.isHiddenPath(path)) {
                                                    this.statistics.addRejectedHiddenPath(path);
                                                }
                                                if (!this.pathPredicate.test(path)) {
                                                    this.statistics.addRejectedFilteredPath(path);
                                                }
                                            } else {
                                                this.statistics.incrementEntriesAccepted();
                                                j2++;
                                                byte[] bytes = this.entryWriter.asJson(next).getBytes(StandardCharsets.UTF_8);
                                                try {
                                                    addEntry = take.addEntry(path, bytes);
                                                } catch (NodeStateEntryBatch.BufferFullException e) {
                                                    LOG.info("Buffer full, passing buffer to sort task. Total entries: {}, entries in buffer {}, buffer size: {}", Long.valueOf(j2), Integer.valueOf(take.numberOfEntries()), IOUtils.humanReadableByteCountBin(take.sizeOfEntriesBytes()));
                                                    take.flip();
                                                    tryEnqueue(take);
                                                    Stopwatch createStarted2 = Stopwatch.createStarted();
                                                    take = this.emptyBatchesQueue.take();
                                                    this.totalEmptyBatchQueueWaitTimeMillis += createStarted2.elapsed(TimeUnit.MILLISECONDS);
                                                    addEntry = take.addEntry(path, bytes);
                                                }
                                                this.statistics.incrementTotalExtractedEntriesSize(addEntry);
                                            }
                                        }
                                    }
                                }
                            }
                        } catch (Throwable th) {
                            try {
                                bsonBinaryReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                            throw th;
                        }
                    }
                }
            } catch (Throwable th3) {
                IndexUtils.INDEXING_PHASE_LOGGER.info("[TASK:{}:FAIL] Metrics: {}, Error: {}", str.toUpperCase(Locale.ROOT), MetricsFormatter.createMetricsWithDurationOnly(createStarted), th3.toString());
                LOG.warn("Thread terminating with exception", th3);
                throw th3;
            }
        } catch (Throwable th4) {
            Thread.currentThread().setName(name);
            throw th4;
        }
    }

    private void tryEnqueue(NodeStateEntryBatch nodeStateEntryBatch) throws InterruptedException {
        Stopwatch createStarted = Stopwatch.createStarted();
        this.nonEmptyBatchesQueue.put(nodeStateEntryBatch);
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        this.totalEnqueueDelayMillis += elapsed;
        if (elapsed > 1) {
            LOG.info("Enqueuing of node state entries batch was delayed, took {} ms. nonEmptyBatchesQueue size {}. ", Long.valueOf(elapsed), Integer.valueOf(this.nonEmptyBatchesQueue.size()));
        }
    }

    private void extractNodeStateEntries(NodeDocument nodeDocument, ArrayList<DocumentNodeState> arrayList) {
        DocumentNodeState readNode = DocumentNodeStoreHelper.readNode(this.documentNodeStore, nodeDocument.getPath(), this.rootRevision);
        if (readNode == null || !readNode.exists()) {
            return;
        }
        arrayList.add(readNode);
        Iterator<DocumentNodeState> it = readNode.getAllBundledNodesStates().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next());
        }
    }
}
