package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoException;
import com.mongodb.MongoIncompatibleDriverException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.ReadPreference;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.ReplicaSetStatus;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.bson.BsonDocument;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.class */
/**
 * Downloads the {@code nodes} collection from MongoDB and pushes the documents, in batches, into a
 * {@link BlockingQueue} that is consumed by the rest of the pipelined indexing strategy.
 *
 * <p>Two download modes are supported, selected by the system property
 * {@value #OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS}:
 * <ul>
 *   <li>retry mode (default): documents are read sorted by {@code (_modified, _id)}. After a recoverable
 *   connection error, the download restarts from the {@code _modified} value and {@code _id} of the last
 *   document received. It keeps retrying with exponential back-off for at most
 *   {@value #OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS} seconds per series of failures.</li>
 *   <li>natural-order mode: a single query using the {@code $natural} hint, without retries.</li>
 * </ul>
 *
 * <p>When {@value #OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING} is enabled and the path filters have
 * exactly one included path and no excluded paths, the query is restricted to that subtree using a regex on
 * {@code _id} / {@code _path}. The ancestors of the subtree root are downloaded first, separately.
 *
 * <p>Instances are not thread-safe (counters are mutated without synchronization). {@link #call()} is
 * meant to be invoked once, because it starts a stopwatch that is never reset.
 */
public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownloadTask.Result> {
    public static final String OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = "oak.indexer.pipelined.retryOnConnectionErrors";
    public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = true;
    public static final String OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = "oak.indexer.pipelined.mongoConnectionRetrySeconds";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = 300;
    public static final String OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = "oak.indexer.pipelined.mongoRegexPathFiltering";
    public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING = false;

    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMongoDownloadTask.class);

    /** Delay before the first retry after a connection error; doubled after every consecutive failure. */
    private static final long RETRY_INITIAL_INTERVAL_MILLIS = 100;
    /** Upper bound for the retry delay. */
    private static final long RETRY_MAX_INTERVAL_MILLIS = 10000;
    /** Minimum number of seconds between two "enqueuing was delayed" log messages. */
    private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;
    /** A progress line is logged every time this many documents have been read. */
    private static final long PROGRESS_LOG_INTERVAL_DOCUMENTS = 10000;
    private static final String THREAD_NAME = "mongo-dump";
    private static final Duration MONGO_QUEUE_OFFER_TIMEOUT = Duration.ofMinutes(30);
    private static final BsonDocument NATURAL_HINT = BsonDocument.parse("{ $natural: 1 }");
    private static final BsonDocument ID_INDEX_HINT = BsonDocument.parse("{ _id: 1 }");

    private final int maxBatchNumberOfDocuments;
    private final BlockingQueue<NodeDocument[]> mongoDocQueue;
    private final List<PathFilter> pathFilters;
    private final boolean retryOnConnectionErrors;
    private final boolean regexPathFiltering;
    private final MongoCollection<NodeDocument> dbCollection;
    private final ReadPreference readPreference;
    private final int maxBatchSizeBytes;
    private final StatisticsProvider statisticsProvider;
    private final Logger traversalLog = LoggerFactory.getLogger(PipelinedMongoDownloadTask.class.getName() + ".traversal");
    private final Stopwatch downloadStartWatch = Stopwatch.createUnstarted();
    private final int retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
            OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS);

    private long totalEnqueueWaitTimeMillis = 0;
    private Instant lastDelayedEnqueueWarningMessageLoggedTimestamp = Instant.now();
    private long documentsRead = 0;
    /** {@code _modified} of the last document received; only tracked in retry mode. */
    private long nextLastModified = 0;
    /** {@code _id} of the last document received, or {@code null} if none was received in the current range. */
    private String lastIdDownloaded = null;

    /** Outcome of a download task. */
    public static class Result {
        private final long documentsDownloaded;

        public Result(long documentsDownloaded) {
            this.documentsDownloaded = documentsDownloaded;
        }

        /** @return total number of documents read from MongoDB, including ancestors and re-read documents */
        public long getDocumentsDownloaded() {
            return this.documentsDownloaded;
        }
    }

    /** Thrown when connection errors kept occurring for longer than the configured retry period. */
    public static class RetryException extends RuntimeException {
        private final int retrialDurationSeconds;

        /**
         * @param retrialDurationSeconds how long retries were attempted
         * @param message                summary of the exceptions seen while retrying
         * @param cause                  the last connection error
         */
        public RetryException(int retrialDurationSeconds, String message, Throwable cause) {
            super(message, cause);
            this.retrialDurationSeconds = retrialDurationSeconds;
        }

        @Override
        public String toString() {
            return "Tried for " + this.retrialDurationSeconds + " seconds: \n" + super.toString();
        }
    }

    /**
     * @param mongoDatabase             database containing the {@code nodes} collection
     * @param mongoDocStore             document store used to build the {@link NodeDocument} codec
     * @param maxBatchSizeBytes         estimated size at which a batch is flushed to the queue
     * @param maxBatchNumberOfDocuments maximum number of documents per batch
     * @param queue                     queue receiving the batches
     * @param pathFilters               path filters, used to derive the optional regex path filter (may be null)
     * @param statisticsProvider        sink for download metrics
     * @throws IllegalArgumentException if {@value #OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS} is not positive
     */
    public PipelinedMongoDownloadTask(MongoDatabase mongoDatabase,
                                      MongoDocumentStore mongoDocStore,
                                      int maxBatchSizeBytes,
                                      int maxBatchNumberOfDocuments,
                                      BlockingQueue<NodeDocument[]> queue,
                                      List<PathFilter> pathFilters,
                                      StatisticsProvider statisticsProvider) {
        this.statisticsProvider = statisticsProvider;
        // Decode directly into NodeDocument; other types fall back to the driver's default codecs.
        this.dbCollection = mongoDatabase
                .withCodecRegistry(CodecRegistries.fromRegistries(
                        CodecRegistries.fromProviders(new NodeDocumentCodecProvider(mongoDocStore, Collection.NODES)),
                        MongoClientSettings.getDefaultCodecRegistry()))
                .getCollection(Collection.NODES.toString(), NodeDocument.class);
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.maxBatchNumberOfDocuments = maxBatchNumberOfDocuments;
        this.mongoDocQueue = queue;
        this.pathFilters = pathFilters;
        Preconditions.checkArgument(this.retryDuringSeconds > 0,
                "Property oak.indexer.pipelined.mongoConnectionRetrySeconds must be > 0. Was: %s", this.retryDuringSeconds);
        this.retryOnConnectionErrors = ConfigHelper.getSystemPropertyAsBoolean(
                OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS, DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS);
        this.regexPathFiltering = ConfigHelper.getSystemPropertyAsBoolean(
                OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_REGEX_PATH_FILTERING);
        this.readPreference = ReadPreference.secondaryPreferred();
        LOG.info("maxBatchSizeBytes: {}, maxBatchNumberOfDocuments: {}, readPreference: {}",
                maxBatchSizeBytes, maxBatchNumberOfDocuments, this.readPreference.getName());
    }

    /**
     * Runs the whole download and logs and publishes the download metrics.
     *
     * <p>The current thread is renamed to {@code mongo-dump} for the duration of the call and restored afterwards.
     *
     * @return the number of documents read
     * @throws InterruptedException if interrupted while sleeping between retries or waiting on the queue
     * @throws TimeoutException     if a batch could not be enqueued within the offer timeout
     */
    @Override
    public Result call() throws Exception {
        String originalThreadName = Thread.currentThread().getName();
        Thread.currentThread().setName(THREAD_NAME);
        LOG.info("[TASK:{}:START] Starting to download from MongoDB", THREAD_NAME.toUpperCase(Locale.ROOT));
        try {
            this.nextLastModified = 0L;
            this.lastIdDownloaded = null;
            this.downloadStartWatch.start();
            if (this.retryOnConnectionErrors) {
                downloadWithRetryOnConnectionErrors();
            } else {
                downloadWithNaturalOrdering();
            }
            long elapsedMillis = this.downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
            String metrics = MetricsFormatter.newBuilder()
                    .add("duration", FormattingUtils.formatToSeconds(this.downloadStartWatch))
                    .add("durationSeconds", elapsedMillis / 1000)
                    .add("documentsDownloaded", this.documentsRead)
                    .add("enqueueingDelayMillis", this.totalEnqueueWaitTimeMillis)
                    .add("enqueueingDelayPercentage", PipelinedUtils.formatAsPercentage(this.totalEnqueueWaitTimeMillis, elapsedMillis))
                    .build();
            MetricsUtils.setCounterOnce(this.statisticsProvider,
                    PipelinedMetrics.OAK_INDEXER_PIPELINED_MONGO_DOWNLOAD_ENQUEUE_DELAY_PERCENTAGE,
                    PipelinedUtils.toPercentage(this.totalEnqueueWaitTimeMillis, elapsedMillis));
            LOG.info("[TASK:{}:END] Metrics: {}", THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
            return new Result(this.documentsRead);
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted", e);
            throw e;
        } catch (Throwable t) {
            LOG.warn("Thread terminating with exception.", t);
            throw t;
        } finally {
            Thread.currentThread().setName(originalThreadName);
        }
    }

    /** Logs a progress line every {@link #PROGRESS_LOG_INTERVAL_DOCUMENTS} documents and traces every id. */
    private void reportProgress(String id) {
        if (this.documentsRead % PROGRESS_LOG_INTERVAL_DOCUMENTS == 0) {
            // Floating-point rate over milliseconds: avoids truncating the rate and dividing by zero
            // when the first progress line is reached in under a second.
            long elapsedMillis = Math.max(1L, this.downloadStartWatch.elapsed(TimeUnit.MILLISECONDS));
            double docsPerSecond = this.documentsRead * 1000.0d / elapsedMillis;
            String formattedRate = String.format(Locale.ROOT, "%1.2f nodes/s, %1.2f nodes/hr", docsPerSecond, docsPerSecond * 3600.0d);
            LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
                    this.documentsRead, id, formattedRate, FormattingUtils.formatToSeconds(this.downloadStartWatch));
        }
        this.traversalLog.trace(id);
    }

    /**
     * Downloads all documents sorted by {@code (_modified, _id)} and resumes after recoverable connection errors.
     *
     * @throws RetryException if a series of consecutive failures lasts longer than {@code retryDuringSeconds}
     */
    private void downloadWithRetryOnConnectionErrors() throws InterruptedException, TimeoutException {
        String regexBasePath = getPathForRegexFiltering();
        Bson childrenFilter = null;
        if (regexBasePath != null) {
            // The ancestors of the base path are not matched by the descendants regex, so they are fetched
            // separately, in a single query that is not retried.
            downloadAncestors(regexBasePath);
            childrenFilter = descendantsFilter(regexBasePath);
        }

        Instant failuresStartTimestamp = null; // start of the current series of consecutive failures
        long retryIntervalMillis = RETRY_INITIAL_INTERVAL_MILLIS;
        int numberOfFailures = 0;
        Map<String, Integer> exceptionCounts = new HashMap<>();
        this.nextLastModified = 0L;
        this.lastIdDownloaded = null;
        boolean downloadCompleted = false;
        // NOTE(review): ReplicaSetStatus.UNKNOWN_LAG is used as an open upper bound for _modified; presumably it
        // equals Long.MAX_VALUE (decompiler substitution) — confirm.
        while (!downloadCompleted) {
            try {
                if (this.lastIdDownloaded != null) {
                    LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", this.nextLastModified);
                    // First finish the documents sharing the last _modified value, starting after the last _id seen.
                    downloadRange(new DownloadRange(this.nextLastModified, this.nextLastModified + 1, this.lastIdDownloaded), childrenFilter);
                    // Reconnected successfully: the current series of failures is over.
                    failuresStartTimestamp = null;
                    numberOfFailures = 0;
                    downloadRange(new DownloadRange(this.nextLastModified + 1, ReplicaSetStatus.UNKNOWN_LAG, null), childrenFilter);
                } else {
                    downloadRange(new DownloadRange(this.nextLastModified, ReplicaSetStatus.UNKNOWN_LAG, null), childrenFilter);
                }
                downloadCompleted = true;
            } catch (MongoException e) {
                if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) {
                    // Not a connection problem: retrying would not help.
                    throw e;
                }
                if (failuresStartTimestamp == null) {
                    failuresStartTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS);
                }
                LOG.warn("Connection error downloading from MongoDB.", e);
                long secondsSinceFailuresStarted = Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
                if (secondsSinceFailuresStarted > this.retryDuringSeconds) {
                    StringBuilder summary = new StringBuilder();
                    for (Map.Entry<String, Integer> entry : exceptionCounts.entrySet()) {
                        summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
                    }
                    throw new RetryException(this.retryDuringSeconds, summary.toString(), e);
                }
                numberOfFailures++;
                LOG.warn("Retrying download in {} ms; number of times failed: {}; current series of failures started at: {} ({} seconds ago)",
                        retryIntervalMillis, numberOfFailures, failuresStartTimestamp, secondsSinceFailuresStarted);
                exceptionCounts.merge(e.getClass().getSimpleName() + " - " + e.getMessage(), 1, Integer::sum);
                Thread.sleep(retryIntervalMillis);
                retryIntervalMillis = Math.min(RETRY_MAX_INTERVAL_MILLIS, retryIntervalMillis * 2);
            }
        }
    }

    /** Downloads one {@code _modified} range, sorted by {@code (_modified, _id)}, optionally ANDed with {@code filter}. */
    private void downloadRange(DownloadRange range, Bson filter) throws InterruptedException, TimeoutException {
        Bson findQuery = range.getFindQuery();
        if (filter != null) {
            findQuery = Filters.and(findQuery, filter);
        }
        LOG.info("Traversing: {}. Query: {}", range, findQuery);
        download(this.dbCollection.withReadPreference(this.readPreference)
                .find(findQuery)
                .sort(Sorts.ascending("_modified", "_id")));
    }

    /** Downloads the document of {@code path} and of each of its ancestors up to the root, using the _id index. */
    private void downloadAncestors(String path) throws InterruptedException, TimeoutException {
        Bson ancestorsQuery = ancestorsFilter(path);
        LOG.info("Downloading using regex path filtering. Base path: {}, Ancestors query: {}.", path, ancestorsQuery);
        download(this.dbCollection.withReadPreference(this.readPreference).find(ancestorsQuery).hint(ID_INDEX_HINT));
    }

    /** Downloads everything (or the regex-filtered subtree) in natural order, without retries. */
    private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutException {
        String regexBasePath = getPathForRegexFiltering();
        if (regexBasePath == null) {
            LOG.info("Downloading full repository using natural order");
            download(this.dbCollection.withReadPreference(this.readPreference).find().hint(NATURAL_HINT));
            return;
        }
        downloadAncestors(regexBasePath);
        Bson childrenFilter = descendantsFilter(regexBasePath);
        LOG.info("Downloading using regex path filtering. Downloading children: {}.", childrenFilter);
        download(this.dbCollection.withReadPreference(this.readPreference).find(childrenFilter).hint(NATURAL_HINT));
    }

    /** @return the base path for regex filtering, or {@code null} if regex filtering is disabled or not applicable */
    private String getPathForRegexFiltering() {
        if (!this.regexPathFiltering) {
            LOG.info("Regex path filtering disabled.");
            return null;
        }
        return getSingleIncludedPath(this.pathFilters);
    }

    /**
     * Returns the single included path shared by all filters, if regex filtering can be applied.
     *
     * @param pathFilters filters to inspect; may be {@code null}
     * @return the only included path when there is exactly one distinct included path and no excluded paths,
     *         otherwise {@code null}
     */
    static String getSingleIncludedPath(List<PathFilter> pathFilters) {
        LOG.info("Creating regex filter from pathFilters: {}", pathFilters);
        if (pathFilters == null) {
            return null;
        }
        Set<String> includedPaths = pathFilters.stream()
                .flatMap(pathFilter -> pathFilter.getIncludedPaths().stream())
                .collect(Collectors.toSet());
        Set<String> excludedPaths = pathFilters.stream()
                .flatMap(pathFilter -> pathFilter.getExcludedPaths().stream())
                .collect(Collectors.toSet());
        if (excludedPaths.isEmpty() && includedPaths.size() == 1) {
            return includedPaths.iterator().next();
        }
        return null;
    }

    /**
     * Builds a filter matching the descendants of {@code path}, either by {@code _id} (a depth prefix such as
     * {@code "3:"} followed by the path) or, for documents whose id does not embed the path, by {@code _path}.
     */
    private static Bson descendantsFilter(String path) {
        String prefix = path.endsWith("/") ? path : path + "/";
        String quotedPrefix = Pattern.quote(prefix);
        return Filters.or(
                Filters.regex("_id", Pattern.compile("^[0-9]{1,3}:" + quotedPrefix + ".*$")),
                // Anchored: _path holds the full path, so an unanchored regex would also match unrelated paths
                // that merely contain the prefix somewhere in the middle.
                Filters.regex(NodeDocument.PATH, Pattern.compile("^" + quotedPrefix + ".*$")));
    }

    /** Builds an {@code $or} of {@code _id} equality filters for {@code path} and every ancestor up to the root. */
    private static Bson ancestorsFilter(String path) {
        List<Bson> idFilters = new ArrayList<>();
        String current = path;
        while (true) {
            idFilters.add(Filters.eq("_id", Utils.getIdFromPath(current)));
            if (PathUtils.denotesRoot(current)) {
                return Filters.or(idFilters);
            }
            current = PathUtils.getParentPath(current);
        }
    }

    /**
     * Iterates {@code findIterable}, grouping documents into batches bounded by {@link #maxBatchNumberOfDocuments}
     * and (estimated) {@link #maxBatchSizeBytes}, and enqueues each batch.
     *
     * <p>On a recoverable {@link MongoException} the documents already accumulated are enqueued before the
     * exception is rethrown. {@link #lastIdDownloaded} already points past them, so a retry resumes after them;
     * dropping the batch would lose those documents.
     */
    private void download(FindIterable<NodeDocument> findIterable) throws InterruptedException, TimeoutException {
        try (MongoCursor<NodeDocument> cursor = findIterable.iterator()) {
            NodeDocument[] batch = new NodeDocument[this.maxBatchNumberOfDocuments];
            int batchCount = 0;
            int batchSizeBytes = 0;
            try {
                // hasNext() is covered too: the driver may contact the server there to fetch the next batch.
                while (cursor.hasNext()) {
                    NodeDocument next = cursor.next();
                    String id = next.getId();
                    if (this.retryOnConnectionErrors) {
                        // NOTE(review): assumes every document has _modified; a null would throw NPE here.
                        this.nextLastModified = next.getModified().longValue();
                    }
                    this.lastIdDownloaded = id;
                    this.documentsRead++;
                    reportProgress(id);
                    batch[batchCount++] = next;
                    // The codec stores an estimated size under SIZE_FIELD; remove it so it does not leak downstream.
                    batchSizeBytes += ((Integer) next.remove(NodeDocumentCodec.SIZE_FIELD)).intValue();
                    if (batchSizeBytes >= this.maxBatchSizeBytes || batchCount == batch.length) {
                        LOG.trace("Enqueuing block with {} elements, estimated size: {} bytes", batchCount, batchSizeBytes);
                        tryEnqueueCopy(batch, batchCount);
                        batchCount = 0;
                        batchSizeBytes = 0;
                    }
                }
            } catch (MongoException e) {
                if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) {
                    throw e;
                }
                if (batchCount > 0) {
                    LOG.info("Connection interrupted with recoverable failure. Enqueueing partial block with {} elements, estimated size: {}",
                            batchCount, IOUtils.humanReadableByteCountBin(batchSizeBytes));
                    tryEnqueueCopy(batch, batchCount);
                }
                throw e;
            }
            if (batchCount > 0) {
                LOG.info("Enqueueing last block with {} elements, estimated size: {}",
                        batchCount, IOUtils.humanReadableByteCountBin(batchSizeBytes));
                tryEnqueueCopy(batch, batchCount);
            }
        }
    }

    /**
     * Enqueues a copy of the first {@code count} elements of {@code batch}, so the caller can reuse the array.
     *
     * @throws TimeoutException if the queue did not accept the batch within {@link #MONGO_QUEUE_OFFER_TIMEOUT}
     */
    private void tryEnqueueCopy(NodeDocument[] batch, int count) throws TimeoutException, InterruptedException {
        NodeDocument[] copy = Arrays.copyOfRange(batch, 0, count);
        Stopwatch enqueueDelay = Stopwatch.createStarted();
        if (!this.mongoDocQueue.offer(copy, MONGO_QUEUE_OFFER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timeout trying to enqueue batch of MongoDB documents. Waited " + MONGO_QUEUE_OFFER_TIMEOUT);
        }
        long elapsedMillis = enqueueDelay.elapsed(TimeUnit.MILLISECONDS);
        this.totalEnqueueWaitTimeMillis += elapsedMillis;
        if (elapsedMillis > 1) {
            logWithRateLimit(() -> LOG.info("Enqueuing of Mongo document batch was delayed, took {} ms. mongoDocQueue size {}. Consider increasing the number of Transform threads. (This message is logged at most once every {} seconds)",
                    elapsedMillis, this.mongoDocQueue.size(), MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES));
        }
    }

    /** Runs {@code logAction} only if more than the minimum interval has passed since it last ran. */
    private void logWithRateLimit(Runnable logAction) {
        Instant now = Instant.now();
        if (Duration.between(this.lastDelayedEnqueueWarningMessageLoggedTimestamp, now).toSeconds() > MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES) {
            logAction.run();
            this.lastDelayedEnqueueWarningMessageLoggedTimestamp = now;
        }
    }
}
