package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoException;
import com.mongodb.MongoIncompatibleDriverException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.ReadPreference;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Sorts;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.guava.common.base.Stopwatch;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.class */
/**
 * Task that downloads documents from a MongoDB collection and hands them over, in arrays of at most
 * {@code batchSize} elements, to a blocking queue.
 *
 * <p>When retrying on connection errors is enabled (system property
 * {@value #OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS}), the collection is traversed sorted by
 * ascending {@code _modified} and {@code _id}. The task remembers the {@code _modified} and {@code _id}
 * values of the last document it received, so that after a recoverable {@link MongoException} it can issue
 * a new query that resumes from that position instead of starting over. A series of consecutive failures
 * is retried with exponential back-off for at most
 * {@value #OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS} seconds.
 *
 * <p>When retrying is disabled, the collection is traversed with a single unsorted query.
 *
 * <p>Instances keep mutable progress state in plain fields, so a single instance is meant to be run
 * by one thread, once.
 */
public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownloadTask.Result> {
    public static final String OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = "oak.indexer.pipelined.retryOnConnectionErrors";
    public static final boolean DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS = true;
    public static final String OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = "oak.indexer.pipelined.mongoConnectionRetrySeconds";
    public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS = 300;

    /** First delay between two download attempts; doubled after every failure. */
    private static final long RETRY_INITIAL_INTERVAL_MILLIS = 100;
    /** Upper bound for the delay between two download attempts. */
    private static final long RETRY_MAX_INTERVAL_MILLIS = 10000;
    /** A progress message is logged every time this many documents have been read. */
    private static final long PROGRESS_REPORT_INTERVAL = 10000;
    /** Minimum number of seconds between two "enqueuing was delayed" log messages. */
    private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;

    private static final Logger LOG = LoggerFactory.getLogger(PipelinedMongoDownloadTask.class);
    /** Maximum time to wait for free space in the queue before giving up on a batch. */
    private static final Duration MONGO_QUEUE_OFFER_TIMEOUT = Duration.ofMinutes(30);

    private final int batchSize;
    private final BlockingQueue<BasicDBObject[]> mongoDocQueue;
    private final boolean retryOnConnectionErrors;
    private final MongoCollection<BasicDBObject> dbCollection;
    private final ReadPreference readPreference;
    /** Receives the id of every downloaded document at TRACE level. */
    private final Logger traversalLog = LoggerFactory.getLogger(PipelinedMongoDownloadTask.class.getName() + ".traversal");
    private final Stopwatch downloadStartWatch = Stopwatch.createUnstarted();
    private final int retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
            OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS,
            DEFAULT_OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS);

    private long totalEnqueueWaitTimeMillis = 0;
    private Instant lastDelayedEnqueueWarningMessageLoggedTimestamp = Instant.now();
    private long documentsRead = 0;
    /** {@code _modified} value of the last document received; only tracked when retrying is enabled. */
    private long nextLastModified = 0;
    /** {@code _id} of the last document received, or {@code null} if none was received yet. */
    private String lastIdDownloaded = null;

    /**
     * Outcome of a completed download.
     */
    public static class Result {
        private final long documentsDownloaded;

        /**
         * @param documentsDownloaded number of documents read from MongoDB
         */
        public Result(long documentsDownloaded) {
            this.documentsDownloaded = documentsDownloaded;
        }

        /**
         * @return the number of documents read from MongoDB
         */
        public long getDocumentsDownloaded() {
            return this.documentsDownloaded;
        }
    }

    /**
     * Thrown when a series of connection failures lasted longer than the configured retry window.
     * The message lists the distinct failures seen and how often each one occurred; the cause is the
     * last failure.
     */
    public static class RetryException extends RuntimeException {
        private final int retrialDurationSeconds;

        /**
         * @param retrialDurationSeconds length of the retry window, in seconds
         * @param message                summary of the failures observed while retrying
         * @param cause                  the last failure
         */
        public RetryException(int retrialDurationSeconds, String message, Throwable cause) {
            super(message, cause);
            this.retrialDurationSeconds = retrialDurationSeconds;
        }

        @Override
        public String toString() {
            return "Tried for " + this.retrialDurationSeconds + " seconds: \n" + super.toString();
        }
    }

    /**
     * Creates a task that reads with read preference "secondary preferred".
     *
     * @param dbCollection  collection to download
     * @param batchSize     number of documents per array put on the queue
     * @param mongoDocQueue queue receiving the batches of downloaded documents
     * @throws IllegalArgumentException if the configured retry window is not a positive number of seconds
     */
    public PipelinedMongoDownloadTask(MongoCollection<BasicDBObject> dbCollection, int batchSize, BlockingQueue<BasicDBObject[]> mongoDocQueue) {
        this.dbCollection = dbCollection;
        this.batchSize = batchSize;
        this.mongoDocQueue = mongoDocQueue;
        Preconditions.checkArgument(this.retryDuringSeconds > 0,
                "Property " + OAK_INDEXER_PIPELINED_MONGO_CONNECTION_RETRY_SECONDS + " must be > 0. Was: " + this.retryDuringSeconds);
        this.retryOnConnectionErrors = ConfigHelper.getSystemPropertyAsBoolean(
                OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS,
                DEFAULT_OAK_INDEXER_PIPELINED_RETRY_ON_CONNECTION_ERRORS);
        this.readPreference = ReadPreference.secondaryPreferred();
        LOG.info("Using read preference {}", this.readPreference.getName());
    }

    /**
     * Runs the download. The calling thread is renamed to {@code mongo-dump} for the duration of the
     * call and gets its original name back afterwards, whatever the outcome.
     *
     * @return the number of documents downloaded
     * @throws InterruptedException if the thread is interrupted while enqueuing a batch or while waiting
     *                              between two retries
     * @throws TimeoutException     if a batch could not be enqueued within the offer timeout
     * @throws RetryException       if connection failures persisted for longer than the retry window
     */
    @Override
    public Result call() throws Exception {
        Thread currentThread = Thread.currentThread();
        String originalThreadName = currentThread.getName();
        currentThread.setName("mongo-dump");
        try {
            LOG.info("Starting to download from MongoDB.");
            this.nextLastModified = 0;
            this.lastIdDownloaded = null;
            this.downloadStartWatch.start();
            if (this.retryOnConnectionErrors) {
                downloadWithRetryOnConnectionErrors();
            } else {
                downloadAll();
            }
            long elapsedMillis = this.downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
            // Guard against a zero elapsed time, which would otherwise print "NaN".
            double enqueueDelayPercentage = elapsedMillis > 0
                    ? (100.0 * this.totalEnqueueWaitTimeMillis) / elapsedMillis
                    : 0.0;
            LOG.info("Terminating task. Downloaded {} Mongo documents in {}. Total enqueuing delay: {} ms ({}%)",
                    this.documentsRead, this.downloadStartWatch, this.totalEnqueueWaitTimeMillis,
                    String.format("%1.2f", enqueueDelayPercentage));
            return new Result(this.documentsRead);
        } catch (InterruptedException e) {
            LOG.warn("Thread interrupted", e);
            throw e;
        } catch (Throwable t) {
            LOG.warn("Thread terminating with exception.", t);
            throw t;
        } finally {
            currentThread.setName(originalThreadName);
        }
    }

    /**
     * Downloads the collection, starting a new query from the last known position whenever a recoverable
     * {@link MongoException} breaks the current one.
     *
     * @throws RetryException if the current series of failures started more than
     *                        {@code retryDuringSeconds} seconds ago
     */
    private void downloadWithRetryOnConnectionErrors() throws InterruptedException, TimeoutException {
        // Start of the current series of consecutive failures; null while there is none.
        Instant failuresStartTimestamp = null;
        long retryIntervalMillis = RETRY_INITIAL_INTERVAL_MILLIS;
        int numberOfFailures = 0;
        boolean downloadCompleted = false;
        // Distinct failures seen so far, each with the number of times it occurred.
        Map<String, Integer> failureCounts = new HashMap<>();
        while (!downloadCompleted) {
            try {
                if (this.lastIdDownloaded != null) {
                    // Some documents were already received. First finish the _modified value that was
                    // being read when the connection broke, passing the last received _id
                    // (presumably DownloadRange uses it to skip documents already downloaded - TODO confirm).
                    LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", this.nextLastModified);
                    downloadRange(new DownloadRange(this.nextLastModified, this.nextLastModified + 1, this.lastIdDownloaded));
                    // The connection works again: the series of failures is over.
                    failuresStartTimestamp = null;
                    numberOfFailures = 0;
                    // Then continue with everything starting from the next _modified value.
                    downloadRange(new DownloadRange(this.nextLastModified + 1, Long.MAX_VALUE, null));
                } else {
                    downloadRange(new DownloadRange(this.nextLastModified, Long.MAX_VALUE, null));
                }
                downloadCompleted = true;
            } catch (MongoException e) {
                if (isNonRecoverable(e)) {
                    throw e;
                }
                if (failuresStartTimestamp == null) {
                    failuresStartTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS);
                }
                LOG.warn("Connection error downloading from MongoDB.", e);
                long secondsSinceFirstFailure = Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
                if (secondsSinceFirstFailure > this.retryDuringSeconds) {
                    throw new RetryException(this.retryDuringSeconds, formatFailureSummary(failureCounts), e);
                }
                numberOfFailures++;
                LOG.warn("Retrying download in {} ms; number of times failed: {}; current series of failures started at: {} ({} seconds ago)",
                        retryIntervalMillis, numberOfFailures, failuresStartTimestamp, secondsSinceFirstFailure);
                failureCounts.merge(e.getClass().getSimpleName() + " - " + e.getMessage(), 1, Integer::sum);
                // An interruption while waiting is deliberately propagated instead of being swallowed,
                // so that the task can be cancelled during the back-off.
                Thread.sleep(retryIntervalMillis);
                retryIntervalMillis = Math.min(RETRY_MAX_INTERVAL_MILLIS, retryIntervalMillis * 2);
            }
        }
    }

    /**
     * Tells whether a failure must abort the download immediately instead of being retried.
     */
    private static boolean isNonRecoverable(MongoException e) {
        return e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException;
    }

    /**
     * Renders one line per distinct failure, in the form {@code <count>x: <failure>}.
     */
    private static String formatFailureSummary(Map<String, Integer> failureCounts) {
        StringBuilder summary = new StringBuilder();
        for (Map.Entry<String, Integer> entry : failureCounts.entrySet()) {
            summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey());
        }
        return summary.toString();
    }

    /**
     * Logs a progress message every {@link #PROGRESS_REPORT_INTERVAL} documents and traces every id.
     *
     * @param id the {@code _id} of the document just read
     */
    private void reportProgress(String id) {
        if (this.documentsRead % PROGRESS_REPORT_INTERVAL == 0) {
            // Floating point rate computed from milliseconds: dividing two longs would truncate the
            // rate and throw ArithmeticException during the first second of the download.
            long elapsedMillis = this.downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
            double rate = elapsedMillis > 0 ? (this.documentsRead * 1000.0) / elapsedMillis : 0.0;
            String formattedRate = String.format("%1.2f nodes/s, %1.2f nodes/hr", rate, rate * 3600.0);
            LOG.info("Dumping from NSET Traversed #{} {} [{}] (Elapsed {})",
                    this.documentsRead, id, formattedRate, this.downloadStartWatch);
        }
        this.traversalLog.trace(id);
    }

    /**
     * Downloads the documents selected by the query of the given range, sorted by ascending
     * {@code _modified} and {@code _id}.
     */
    private void downloadRange(DownloadRange range) throws InterruptedException, TimeoutException {
        BsonDocument findQuery = range.getFindQuery();
        LOG.info("Traversing: {}. Query: {}", range, findQuery);
        FindIterable<BasicDBObject> mongoIterable = this.dbCollection
                .withReadPreference(this.readPreference)
                .find(findQuery)
                .sort(Sorts.ascending("_modified", "_id"));
        download(mongoIterable);
    }

    /**
     * Downloads every document of the collection with a single query and no explicit sort order.
     */
    private void downloadAll() throws InterruptedException, TimeoutException {
        LOG.info("Traversing all documents");
        download(this.dbCollection.withReadPreference(this.readPreference).find());
    }

    /**
     * Iterates over the query results and enqueues them in batches of {@code batchSize} documents.
     *
     * <p>If a recoverable {@link MongoException} interrupts the iteration, the documents collected so far
     * in the current batch are enqueued before the exception is rethrown. The recorded download position
     * already accounts for them, so they would otherwise be skipped by a retry.
     */
    private void download(FindIterable<BasicDBObject> mongoIterable) throws InterruptedException, TimeoutException {
        try (MongoCursor<BasicDBObject> cursor = mongoIterable.iterator()) {
            BasicDBObject[] batch = new BasicDBObject[this.batchSize];
            int nextIndex = 0;
            try {
                // hasNext() is inside the try as well: it may have to fetch more results from the
                // server and can therefore fail with a connection error just like next().
                while (cursor.hasNext()) {
                    BasicDBObject document = cursor.next();
                    String id = document.getString("_id");
                    if (this.retryOnConnectionErrors) {
                        this.nextLastModified = document.getLong("_modified");
                    }
                    this.lastIdDownloaded = id;
                    this.documentsRead++;
                    reportProgress(id);
                    batch[nextIndex] = document;
                    nextIndex++;
                    if (nextIndex == this.batchSize) {
                        tryEnqueue(batch);
                        batch = new BasicDBObject[this.batchSize];
                        nextIndex = 0;
                    }
                }
            } catch (MongoException e) {
                if (isNonRecoverable(e)) {
                    throw e;
                }
                if (nextIndex > 0) {
                    LOG.info("Connection interrupted with recoverable failure. Enqueueing partial block of size: {}", nextIndex);
                    enqueuePartialBlock(batch, nextIndex);
                }
                throw e;
            }
            if (nextIndex > 0) {
                LOG.info("Enqueueing last block of size: {}", nextIndex);
                enqueuePartialBlock(batch, nextIndex);
            }
        }
    }

    /**
     * Enqueues the first {@code size} documents of {@code batch}, copying them to a shorter array
     * unless the batch is full.
     */
    private void enqueuePartialBlock(BasicDBObject[] batch, int size) throws InterruptedException, TimeoutException {
        if (batch.length == size) {
            tryEnqueue(batch);
            return;
        }
        BasicDBObject[] partialBatch = new BasicDBObject[size];
        System.arraycopy(batch, 0, partialBatch, 0, size);
        tryEnqueue(partialBatch);
    }

    /**
     * Puts a batch on the queue, waiting up to {@link #MONGO_QUEUE_OFFER_TIMEOUT} for free space, and
     * accumulates the time spent waiting.
     *
     * @throws TimeoutException if no space became available in time
     */
    private void tryEnqueue(BasicDBObject[] batch) throws TimeoutException, InterruptedException {
        Stopwatch enqueueWatch = Stopwatch.createStarted();
        if (!this.mongoDocQueue.offer(batch, MONGO_QUEUE_OFFER_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Timeout trying to enqueue batch of MongoDB documents. Waited " + MONGO_QUEUE_OFFER_TIMEOUT);
        }
        long enqueueDelayMillis = enqueueWatch.elapsed(TimeUnit.MILLISECONDS);
        this.totalEnqueueWaitTimeMillis += enqueueDelayMillis;
        if (enqueueDelayMillis > 1) {
            logWithRateLimit(() -> LOG.info(
                    "Enqueuing of Mongo document batch was delayed, took {} ms. mongoDocQueue size {}. Consider increasing the number of Transform threads. (This message is logged at most once every {} seconds)",
                    enqueueDelayMillis, this.mongoDocQueue.size(), MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES));
        }
    }

    /**
     * Runs the given logging action only if more than
     * {@link #MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES} seconds have passed since it last ran
     * (or, for the first message, since this task was created).
     */
    private void logWithRateLimit(Runnable logAction) {
        Instant now = Instant.now();
        long secondsSinceLastMessage = Duration.between(this.lastDelayedEnqueueWarningMessageLoggedTimestamp, now).toSeconds();
        if (secondsSinceLastMessage > MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES) {
            logAction.run();
            this.lastDelayedEnqueueWarningMessageLoggedTimestamp = now;
        }
    }
}
