package org.apache.jackrabbit.oak.plugins.index.elastic.query.async;

import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
import co.elastic.clients.json.JsonpUtils;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreService;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexNode;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticMetricHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticQueryIterator;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticRequestHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticResponseHandler;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener;
import org.apache.jackrabbit.oak.plugins.index.elastic.query.async.facets.ElasticFacetProvider;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.class */
/**
 * Iterator over the rows of an Elastic search result that is populated asynchronously.
 * <p>
 * The nested {@link ElasticQueryScanner} issues search requests through the async client and forwards every hit
 * to the registered listeners. This class is itself a {@link ElasticResponseListener.SearchHitListener}: each
 * accepted hit is converted into a {@link FulltextIndex.FulltextResultRow} and put on an internal blocking queue
 * that {@link #hasNext()} and {@link #next()} consume. The end of the stream is signalled in-band by putting the
 * {@code POISON_PILL} row on the queue.
 */
public class ElasticResultRowAsyncIterator implements ElasticQueryIterator, ElasticResponseListener.SearchHitListener {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticResultRowAsyncIterator.class);

    // Sentinel row marking the end of the result stream; it is recognised by its path.
    // NOTE(review): the score argument references a DocumentNodeStoreService constant that is unrelated to
    // scoring; this looks like a decompiler substitution for a numeric literal - confirm before replacing it.
    private static final FulltextIndex.FulltextResultRow POISON_PILL = new FulltextIndex.FulltextResultRow("___OAK_POISON_PILL___", DocumentNodeStoreService.DEFAULT_RGC_DELAY_FACTOR, Collections.emptyMap(), null, null);

    private final ElasticIndexNode indexNode;
    private final QueryIndex.IndexPlan indexPlan;
    private final Predicate<String> rowInclusionPredicate;
    private final ElasticMetricHandler metricHandler;
    private final ElasticRequestHandler elasticRequestHandler;
    private final ElasticResponseHandler elasticResponseHandler;
    private final ElasticFacetProvider elasticFacetProvider;

    // Row fetched by hasNext() and not yet handed out by next(); stays on the poison pill once the end is reached.
    private FulltextIndex.FulltextResultRow nextRow;

    // Rows produced by the search callbacks (see on(Hit)) and consumed by hasNext().
    private final BlockingQueue<FulltextIndex.FulltextResultRow> queue = new LinkedBlockingQueue<>();

    // Last failure reported by the scanner; logged and cleared by hasNext().
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();

    // Assigned as the LAST statement of the constructor: building the scanner dereferences the fields above and
    // immediately starts the first search.
    private final ElasticQueryScanner elasticQueryScanner;

    /**
     * Runs the Elastic query page by page and notifies the registered listeners.
     * <p>
     * The first search is started by the constructor. Further pages are requested through {@code scan()},
     * using the sort values of the last received hit as the {@code searchAfter} cursor. A single-permit
     * semaphore is acquired before a request is started and released when its response has been recorded.
     */
    public class ElasticQueryScanner {
        // Upper bound for the size of the first page when aggregation listeners are registered.
        private static final int SMALL_RESULT_SET_SIZE = 10;
        private final Query query;
        private final SearchRequest searchRequest;

        @NotNull
        private final List<SortOptions> sorts;
        private final Highlight highlight;
        private final SourceConfig sourceConfig;
        private int scannedRows;
        private int requests;
        private boolean fullScan;
        private long searchStartTime;
        private List<FieldValue> lastHitSortValues;
        private final Set<ElasticResponseListener> allListeners = new HashSet<>();
        private final List<ElasticResponseListener.SearchHitListener> searchHitListeners = new ArrayList<>();
        private final List<ElasticResponseListener.AggregationListener> aggregationListeners = new ArrayList<>();
        private final AtomicBoolean anyDataLeft = new AtomicBoolean(false);
        private final Semaphore semaphore = new Semaphore(1);

        /**
         * Registers the listeners, builds the initial search request and starts it right away.
         *
         * @param listeners the listeners to notify; each one is additionally indexed as search hit listener
         *                  and/or aggregation listener according to the interfaces it implements
         */
        ElasticQueryScanner(List<ElasticResponseListener> listeners) {
            this.query = elasticRequestHandler.baseQuery();
            this.sorts = elasticRequestHandler.baseSorts();
            this.highlight = elasticRequestHandler.highlight();

            // The source filter is the union of the fields requested by all listeners.
            Set<String> sourceFields = new HashSet<>();
            for (ElasticResponseListener listener : listeners) {
                this.allListeners.add(listener);
                sourceFields.addAll(listener.sourceFields());
                if (listener instanceof ElasticResponseListener.SearchHitListener) {
                    ElasticResponseListener.SearchHitListener hitListener = (ElasticResponseListener.SearchHitListener) listener;
                    this.searchHitListeners.add(hitListener);
                    if (hitListener.isFullScan()) {
                        this.fullScan = true;
                    }
                }
                if (listener instanceof ElasticResponseListener.AggregationListener) {
                    this.aggregationListeners.add((ElasticResponseListener.AggregationListener) listener);
                }
            }
            final boolean needsAggregations = !this.aggregationListeners.isEmpty();
            final List<String> includedFields = new ArrayList<>(sourceFields);

            this.sourceConfig = SourceConfig.of(source -> source.filter(filter -> filter.includes(includedFields)));
            this.searchRequest = SearchRequest.of(request -> {
                request.index(indexNode.getDefinition().getIndexAlias())
                        .trackTotalHits(trackHits -> trackHits.count(indexNode.getDefinition().trackTotalHits))
                        .sort(this.sorts)
                        .source(this.sourceConfig)
                        .query(this.query)
                        .highlight(this.highlight)
                        // with aggregations the first page is capped at SMALL_RESULT_SET_SIZE
                        .size(needsAggregations
                                ? Math.min(SMALL_RESULT_SET_SIZE, getFetchSize(this.requests))
                                : getFetchSize(this.requests));
                if (needsAggregations) {
                    request.aggregations(elasticRequestHandler.aggregations());
                }
                return request;
            });

            LOG.trace("Kicking initial search for query {}", this.searchRequest);
            // Take the only permit: scan() cannot start another request until this response is recorded.
            this.semaphore.tryAcquire();
            this.requests++;
            dispatch(this.searchRequest, true);
        }

        /**
         * Handles a successful search response: records metrics, updates the paging state, notifies the
         * listeners and, when a full scan was requested and data is left, starts the next request.
         *
         * @param searchResponse the response returned by the async client
         */
        public void onSuccess(SearchResponse<ObjectNode> searchResponse) {
            long searchTotalTime = System.currentTimeMillis() - this.searchStartTime;
            List<Hit<ObjectNode>> hits = searchResponse.hits().hits();
            int hitCount = hits != null ? hits.size() : 0;
            metricHandler.measureQuery(indexNode.getDefinition().getIndexPath(), hitCount, searchResponse.took(), searchTotalTime, searchResponse.timedOut());
            if (hitCount <= 0) {
                LOG.trace("No results: closing scanner, notifying listeners");
                close();
                return;
            }

            long totalHits = searchResponse.hits().total().value();
            LOG.debug("Processing search response that took {} to read {}/{} docs", searchResponse.took(), hitCount, totalHits);
            this.lastHitSortValues = hits.get(hitCount - 1).sort();
            this.scannedRows += hitCount;
            if (searchResponse.hits().total().relation() == TotalHitsRelation.Eq) {
                this.anyDataLeft.set(totalHits > (long) this.scannedRows);
            } else {
                // the total is not exact, so assume there may be more data
                this.anyDataLeft.set(true);
            }

            // Decide whether this is the first response BEFORE releasing the permit: once it is released a
            // thread in hasNext() may run scan() and increment 'requests', which would otherwise make the
            // start-of-data and aggregation notifications below be skipped.
            final boolean firstResponse = this.requests == 1;

            // The cursor is recorded, so another request may now be started.
            this.semaphore.release();

            if (firstResponse) {
                for (ElasticResponseListener.SearchHitListener listener : this.searchHitListeners) {
                    listener.startData(totalHits);
                }
                if (!this.aggregationListeners.isEmpty()) {
                    LOG.trace("Emitting aggregations {}", searchResponse.aggregations());
                    for (ElasticResponseListener.AggregationListener listener : this.aggregationListeners) {
                        listener.on(searchResponse.aggregations());
                    }
                }
            }

            LOG.trace("Emitting {} search hits, for a total of {} scanned results", hits.size(), this.scannedRows);
            for (Hit<ObjectNode> hit : hits) {
                for (ElasticResponseListener.SearchHitListener listener : this.searchHitListeners) {
                    listener.on(hit);
                }
            }

            if (!this.anyDataLeft.get()) {
                LOG.trace("No data left: closing scanner, notifying listeners");
                close();
            } else if (this.fullScan) {
                scan();
            }
        }

        /**
         * Handles a failed search: records the failure metric, stores the error in the iterator's error
         * reference (logging a warning if one was already pending), logs it and closes the scanner.
         *
         * @param th the failure reported by the async client
         */
        public void onFailure(Throwable th) {
            metricHandler.measureFailedQuery(indexNode.getDefinition().getIndexPath(), System.currentTimeMillis() - this.searchStartTime);
            Throwable previous = errorRef.getAndSet(th);
            if (previous != null) {
                LOG.warn("Error reference for async iterator was previously set to {}. It has now been reset to new error {}", previous.getMessage(), th.getMessage());
            }
            LOG.error("Error retrieving data for jcr query [{}] :: Corresponding ES query {} : closing scanner, notifying listeners", indexPlan.getFilter(), this.query, th);
            close();
        }

        /**
         * Requests the next page of results, unless a request is still in flight or no data is left.
         */
        private void scan() {
            // Keep this evaluation order: the permit is tried first, anyDataLeft is only read once it was obtained.
            if (!this.semaphore.tryAcquire() || !this.anyDataLeft.get()) {
                LOG.trace("Scanner is closing or still processing data from the previous scan");
                return;
            }
            SearchRequest searchAfterRequest = SearchRequest.of(request -> request
                    .index(indexNode.getDefinition().getIndexAlias())
                    .trackTotalHits(trackHits -> trackHits.count(indexNode.getDefinition().trackTotalHits))
                    .sort(this.sorts)
                    .source(this.sourceConfig)
                    .searchAfter(this.lastHitSortValues)
                    .query(this.query)
                    .highlight(this.highlight)
                    // the fetch size is looked up with the number of requests issued so far
                    .size(getFetchSize(this.requests++)));
            LOG.trace("Kicking new search after query {}", searchAfterRequest);
            dispatch(searchAfterRequest, false);
        }

        /**
         * Sends the request through the async client, routes its completion to {@link #onSuccess} or
         * {@link #onFailure} and marks the query on the metric handler.
         *
         * @param request       the search request to execute
         * @param initialSearch flag forwarded to {@code markQuery}: {@code true} for the request started by the
         *                      constructor, {@code false} for the ones started by {@code scan()}
         */
        private void dispatch(SearchRequest request, boolean initialSearch) {
            this.searchStartTime = System.currentTimeMillis();
            indexNode.getConnection().getAsyncClient()
                    .search(request, ObjectNode.class)
                    .whenComplete((response, failure) -> {
                        if (failure != null) {
                            onFailure(failure);
                        } else {
                            onSuccess(response);
                        }
                    });
            metricHandler.markQuery(indexNode.getDefinition().getIndexPath(), initialSearch);
        }

        /**
         * Returns the page size for the given request index, taken from the index definition's
         * {@code queryFetchSizes}; indexes past the end of the array use its last entry.
         */
        private int getFetchSize(int requestIndex) {
            int[] fetchSizes = indexNode.getDefinition().queryFetchSizes;
            return fetchSizes.length > requestIndex ? fetchSizes[requestIndex] : fetchSizes[fetchSizes.length - 1];
        }

        /**
         * Releases a semaphore permit and calls {@code endData()} on every registered listener.
         */
        private void close() {
            this.semaphore.release();
            for (ElasticResponseListener listener : this.allListeners) {
                listener.endData();
            }
        }
    }

    /**
     * Creates the iterator and immediately starts the first search.
     *
     * @param elasticIndexNode       index node providing the connection and the index definition
     * @param elasticRequestHandler  supplies query, sorts, highlight, aggregations and the facet provider
     * @param elasticResponseHandler extracts path and excerpts from the hits
     * @param indexPlan              plan whose filter is reported in error logs
     * @param predicate              optional path filter; when not {@code null}, hits whose path it rejects are dropped
     * @param elasticMetricHandler   receiver of the query metrics
     */
    public ElasticResultRowAsyncIterator(@NotNull ElasticIndexNode elasticIndexNode, @NotNull ElasticRequestHandler elasticRequestHandler, @NotNull ElasticResponseHandler elasticResponseHandler, @NotNull QueryIndex.IndexPlan indexPlan, Predicate<String> predicate, ElasticMetricHandler elasticMetricHandler) {
        this.indexNode = elasticIndexNode;
        this.elasticRequestHandler = elasticRequestHandler;
        this.elasticResponseHandler = elasticResponseHandler;
        this.indexPlan = indexPlan;
        this.rowInclusionPredicate = predicate;
        this.metricHandler = elasticMetricHandler;
        this.elasticFacetProvider = elasticRequestHandler.getAsyncFacetProvider(elasticIndexNode.getConnection(), elasticResponseHandler);
        // Must stay last: the scanner reads the fields assigned above. A field initializer would run before
        // this constructor body and fail on the still-null collaborators.
        this.elasticQueryScanner = initScanner();
    }

    /**
     * Blocks until a row or the end-of-data marker is available.
     * <p>
     * When no row is buffered and the queue is empty, the scanner is first asked for the next page. A failure
     * reported by the scanner is logged here and cleared.
     *
     * @return {@code true} if a result row is available, {@code false} once the end-of-data marker was reached
     * @throws IllegalStateException if the thread is interrupted while waiting for a row
     */
    @Override // java.util.Iterator
    public boolean hasNext() {
        // a non-null nextRow means hasNext() was already called without a following next()
        if (this.nextRow == null) {
            if (this.queue.isEmpty()) {
                this.elasticQueryScanner.scan();
            }
            try {
                this.nextRow = this.queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Error reading next result from Elastic", e);
            }
        }
        Throwable error = this.errorRef.getAndSet(null);
        if (error != null) {
            // re-stamp the trace so the log shows the stack of the thread traversing the iterator
            error.fillInStackTrace();
            LOG.error("Error while fetching results from Elastic for [{}]", this.indexPlan.getFilter(), error);
        }
        return !POISON_PILL.path.equals(this.nextRow.path);
    }

    /**
     * Returns the next row, fetching it through {@link #hasNext()} when none is buffered.
     *
     * @return the next result row, or {@code null} when the end of the results has been reached
     */
    @Override // java.util.Iterator
    public FulltextIndex.FulltextResultRow next() {
        if (this.nextRow == null && !hasNext()) {
            return null;
        }
        FulltextIndex.FulltextResultRow row = null;
        // the poison pill is never handed out and is kept buffered, so later calls keep reporting the end
        if (this.nextRow != null && !POISON_PILL.path.equals(this.nextRow.path)) {
            row = this.nextRow;
            this.nextRow = null;
        }
        return row;
    }

    /**
     * Converts a search hit into a result row and queues it. Hits without a path, or whose path is rejected by
     * the row inclusion predicate, are skipped.
     *
     * @param hit the search hit to process
     * @throws IllegalStateException if the thread is interrupted while queueing the row
     */
    @Override // org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener.SearchHitListener
    public void on(Hit<ObjectNode> hit) {
        String path = this.elasticResponseHandler.getPath(hit);
        if (path != null) {
            if (this.rowInclusionPredicate != null && !this.rowInclusionPredicate.test(path)) {
                LOG.trace("Path {} not included because of hierarchy inclusion rules", path);
                return;
            }
            LOG.trace("Path {} satisfies hierarchy inclusion rules", path);
            try {
                this.queue.put(new FulltextIndex.FulltextResultRow(path, hit.score() != null ? hit.score().doubleValue() : DocumentNodeStoreService.DEFAULT_RGC_DELAY_FACTOR, this.elasticResponseHandler.excerpts(hit), this.elasticFacetProvider, null));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Error producing results into the iterator queue", e);
            }
        }
    }

    /**
     * Signals the end of the results by queueing the poison pill.
     *
     * @throws IllegalStateException if the thread is interrupted while queueing the marker
     */
    @Override // org.apache.jackrabbit.oak.plugins.index.elastic.query.async.ElasticResponseListener
    public void endData() {
        try {
            this.queue.put(POISON_PILL);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Error inserting poison pill into the iterator queue", e);
        }
    }

    /**
     * Creates the scanner with this iterator as listener, plus the facet provider when it is an
     * {@link ElasticResponseListener}.
     */
    private ElasticQueryScanner initScanner() {
        List<ElasticResponseListener> listeners = new ArrayList<>();
        listeners.add(this);
        // instanceof is false for null, so no separate null check is needed
        if (this.elasticFacetProvider instanceof ElasticResponseListener) {
            listeners.add((ElasticResponseListener) this.elasticFacetProvider);
        }
        return new ElasticQueryScanner(listeners);
    }

    /**
     * Returns the initial search request rendered as a string.
     */
    @Override // org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticQueryIterator
    public String explain() {
        return JsonpUtils.toString(this.elasticQueryScanner.searchRequest, new StringBuilder()).toString();
    }

    /**
     * Closes the underlying scanner, which notifies all listeners that the data has ended.
     */
    @Override // org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticQueryIterator, java.lang.AutoCloseable
    public void close() {
        this.elasticQueryScanner.close();
    }
}
