/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search;

import com.carrotsearch.hppc.IntArrayList;
import java.io.IOException;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.search.ScoreDoc;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.ActionListener;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.ActionRunnable;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.OriginalIndices;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.CountedCollector;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.ExpandSearchPhase;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.InitialSearchPhase;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.SearchActionListener;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.SearchPhase;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.SearchPhaseContext;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.SearchPhaseController;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.SearchResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.search.TransportSearchHelper;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.util.concurrent.AtomicArray;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.search.SearchPhaseResult;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.search.SearchShardTarget;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.search.fetch.FetchSearchResult;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.search.internal.InternalSearchResponse;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.search.query.QuerySearchResult;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.transport.Transport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

final class FetchSearchPhase
extends SearchPhase {
    /** Fetch results, one slot per shard; each slot is set by shard index as fetch responses arrive. */
    private final AtomicArray<FetchSearchResult> fetchResults;
    /** Controller used to work out which documents to fetch per shard and to merge the final response. */
    private final SearchPhaseController searchPhaseController;
    /** Query-phase results, taken from {@link #resultConsumer} at construction time. */
    private final AtomicArray<SearchPhaseResult> queryResults;
    /** Builds the phase that follows this one from the merged response and the scroll id. */
    private final BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory;
    /** Context of the running search; used to schedule work, obtain connections and report failures. */
    private final SearchPhaseContext context;
    /** Logger obtained from {@link #context} at construction time. */
    private final Logger logger;
    /** Holder of the query-phase results; its {@code reduce()} is invoked at the start of the fetch phase. */
    private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer;

    /**
     * Creates a fetch phase with the default follow-up chain: the merged response is handed to an
     * {@link ExpandSearchPhase}, which in turn continues with the phase built by
     * {@link #sendResponsePhase(InternalSearchResponse, String, SearchPhaseContext)}.
     *
     * @param resultConsumer        holder of the query-phase results
     * @param searchPhaseController controller used to plan the fetch and merge the results
     * @param context               context of the running search
     */
    FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context) {
        this(
            resultConsumer,
            searchPhaseController,
            context,
            (mergedResponse, scrollId) -> new ExpandSearchPhase(
                context,
                mergedResponse,
                expandedResponse -> FetchSearchPhase.sendResponsePhase(expandedResponse, scrollId, context)));
    }

    FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context, BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
        super("fetch");
        if (context.getNumShards() != resultConsumer.getNumShards()) {
            throw new IllegalStateException("number of shards must match the length of the query results but doesn't:" + context.getNumShards() + "!=" + resultConsumer.getNumShards());
        }
        this.fetchResults = new AtomicArray(resultConsumer.getNumShards());
        this.searchPhaseController = searchPhaseController;
        this.queryResults = resultConsumer.getAtomicArray();
        this.nextPhaseFactory = nextPhaseFactory;
        this.context = context;
        this.logger = context.getLogger();
        this.resultConsumer = resultConsumer;
    }

    /**
     * Starts the fetch phase by submitting the actual work ({@code innerRun()}) to
     * {@code context.execute(...)}.
     *
     * <p>Any exception delivered to the runnable's {@code onFailure} is reported through
     * {@code context.onPhaseFailure} with this phase and an empty message.
     *
     * @throws IOException declared by the overridden method; nothing in this method body throws it directly
     */
    @Override
    public void run() throws IOException {
        // The context is passed as the listener directly. The previous raw (ActionListener) cast
        // switched off generic checking of the ActionRunnable<SearchResponse> type argument.
        this.context.execute(new ActionRunnable<SearchResponse>(this.context){

            @Override
            public void doRun() throws IOException {
                FetchSearchPhase.this.innerRun();
            }

            @Override
            public void onFailure(Exception e) {
                FetchSearchPhase.this.context.onPhaseFailure(FetchSearchPhase.this, "", e);
            }
        });
    }

    /**
     * Reduces the query-phase results and then either finishes immediately or sends one fetch
     * request per shard that has documents to load.
     *
     * <ul>
     *   <li>If there is exactly one query result slot, the query results are merged directly
     *       (they are asserted to already carry a fetch result) and no fetch requests are sent.</li>
     *   <li>If the reduced top docs are empty, every query result's search context is released
     *       and the phase finishes.</li>
     *   <li>Otherwise a fetch is executed for each shard with document ids to load; shards without
     *       any are released and counted down right away.</li>
     * </ul>
     *
     * @throws IOException propagated from the calls made here (e.g. building the scroll id)
     */
    private void innerRun() throws IOException {
        final int shardCount = this.context.getNumShards();
        final boolean scrolling = this.context.getRequest().scroll() != null;
        final List<SearchPhaseResult> completedQueries = this.queryResults.asList();
        final String scrollId;
        if (scrolling) {
            scrollId = TransportSearchHelper.buildScrollId(this.queryResults);
        } else {
            scrollId = null;
        }
        final SearchPhaseController.ReducedQueryPhase reduced = this.resultConsumer.reduce();
        final boolean queryAndFetchOptimization = this.queryResults.length() == 1;
        // With a single result slot the query results double as the fetch results.
        final AtomicArray<? extends SearchPhaseResult> mergeSource = queryAndFetchOptimization ? this.queryResults : this.fetchResults;
        final Runnable finishPhase = () -> this.moveToNextPhase(this.searchPhaseController, scrollId, reduced, mergeSource);

        if (queryAndFetchOptimization) {
            assert (completedQueries.isEmpty() || completedQueries.get(0).fetchResult() != null) : "phaseResults empty [" + completedQueries.isEmpty() + "], single result: " + completedQueries.get(0).fetchResult();
            finishPhase.run();
            return;
        }

        final ScoreDoc[] topDocs = reduced.sortedTopDocs.scoreDocs;
        final IntArrayList[] docIdsPerShard = this.searchPhaseController.fillDocIdsToLoad(shardCount, topDocs);
        if (topDocs.length == 0) {
            // Nothing to fetch: release whatever the query phase left behind, then finish.
            for (SearchPhaseResult completed : completedQueries) {
                this.releaseIrrelevantSearchContext(completed.queryResult());
            }
            finishPhase.run();
            return;
        }

        final ScoreDoc[] lastEmittedDocPerShard = scrolling ? this.searchPhaseController.getLastEmittedDocPerShard(reduced, shardCount) : null;
        // The collector expects one completion per shard slot, whether or not a fetch is sent for it.
        final CountedCollector<FetchSearchResult> counter = new CountedCollector<FetchSearchResult>(r -> this.fetchResults.set(r.getShardIndex(), (FetchSearchResult)r), docIdsPerShard.length, finishPhase, this.context);
        for (int shard = 0; shard < docIdsPerShard.length; ++shard) {
            final IntArrayList docIds = docIdsPerShard[shard];
            final SearchPhaseResult shardQuery = this.queryResults.get(shard);
            if (docIds != null) {
                final SearchShardTarget target = shardQuery.getSearchShardTarget();
                final Transport.Connection connection = this.context.getConnection(target.getClusterAlias(), target.getNodeId());
                final ShardFetchSearchRequest request = this.createFetchRequest(shardQuery.queryResult().getRequestId(), shard, docIds, lastEmittedDocPerShard, target.getOriginalIndices());
                this.executeFetch(shard, target, counter, request, shardQuery.queryResult(), connection);
            } else {
                // No documents to load from this shard: free its context (if it answered) and count it as done.
                if (shardQuery != null) {
                    this.releaseIrrelevantSearchContext(shardQuery.queryResult());
                }
                counter.countDown();
            }
        }
    }

    /**
     * Builds the fetch request for a single shard.
     *
     * @param queryId                id passed through to the request as its query id
     * @param index                  shard index; used to look up the last emitted doc
     * @param entry                  document ids to load from the shard
     * @param lastEmittedDocPerShard per-shard last emitted docs, or {@code null} if there are none
     * @param originalIndices        original indices passed through to the request
     * @return a new {@link ShardFetchSearchRequest}; its last emitted doc is {@code null} when
     *         {@code lastEmittedDocPerShard} is {@code null}
     */
    protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry, ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) {
        ScoreDoc lastEmittedDoc = null;
        if (lastEmittedDocPerShard != null) {
            lastEmittedDoc = lastEmittedDocPerShard[index];
        }
        return new ShardFetchSearchRequest(originalIndices, queryId, entry, lastEmittedDoc);
    }

    /**
     * Sends one fetch request over the given connection and routes the outcome to the collector.
     *
     * <p>A response is passed to {@code counter.onResult}; if that throws, the whole phase is failed
     * through {@code context.onPhaseFailure}. A failure is logged at debug level and recorded on the
     * collector, and in every case an attempt is made to release the shard's search context.
     *
     * @param shardIndex         index of the shard being fetched from
     * @param shardTarget        target the request is addressed to
     * @param counter            collector that gathers results and failures
     * @param fetchSearchRequest request to send
     * @param querySearchResult  query result of the shard; used to release its context on failure
     * @param connection         connection to send the request over
     */
    private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final CountedCollector<FetchSearchResult> counter, final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult, Transport.Connection connection) {
        final SearchActionListener<FetchSearchResult> fetchListener = new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex){

            @Override
            public void innerOnResponse(FetchSearchResult fetched) {
                try {
                    counter.onResult(fetched);
                }
                catch (Exception collectFailure) {
                    FetchSearchPhase.this.context.onPhaseFailure(FetchSearchPhase.this, "", collectFailure);
                }
            }

            @Override
            public void onFailure(Exception fetchFailure) {
                try {
                    FetchSearchPhase.this.logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", (Object)fetchSearchRequest.id()), (Throwable)fetchFailure);
                    counter.onFailure(shardIndex, shardTarget, fetchFailure);
                }
                finally {
                    // Runs even if logging or the collector throws.
                    FetchSearchPhase.this.releaseIrrelevantSearchContext(querySearchResult);
                }
            }
        };
        this.context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, this.context.getTask(), fetchListener);
    }

    /**
     * Asks the shard that produced {@code queryResult} to release its search context.
     *
     * <p>Nothing is sent for scroll requests or when the result reports no search context.
     * Any exception while sending the release is logged at trace level and otherwise ignored.
     *
     * @param queryResult query result whose search context may be released
     */
    private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
        if (this.context.getRequest().scroll() != null || !queryResult.hasSearchContext()) {
            return;
        }
        try {
            final SearchShardTarget target = queryResult.getSearchShardTarget();
            final Transport.Connection connection = this.context.getConnection(target.getClusterAlias(), target.getNodeId());
            this.context.sendReleaseSearchContext(queryResult.getRequestId(), connection, target.getOriginalIndices());
        }
        catch (Exception releaseFailure) {
            // Deliberately swallowed: a failed release must not fail the search.
            this.context.getLogger().trace("failed to release context", (Throwable)releaseFailure);
        }
    }

    /**
     * Merges the reduced query phase with the fetch results and hands the merged response to the
     * next phase.
     *
     * @param searchPhaseController controller that performs the merge
     * @param scrollId              scroll id passed on to the next-phase factory; may be {@code null}
     * @param reducedQueryPhase     reduced query-phase result to merge
     * @param fetchResultsArr       per-shard results to merge, looked up by index during the merge
     */
    private void moveToNextPhase(SearchPhaseController searchPhaseController, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
        final boolean scrolling = this.context.getRequest().scroll() != null;
        final InternalSearchResponse merged = searchPhaseController.merge(scrolling, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
        final SearchPhase followUp = this.nextPhaseFactory.apply(merged, scrollId);
        this.context.executeNextPhase(this, followUp);
    }

    /**
     * Creates the phase named {@code "response"}: when run, it builds the search response from
     * {@code response} and {@code scrollId} and delivers it through {@code context.onResponse}.
     *
     * @param response merged internal response to wrap
     * @param scrollId scroll id to include in the response; may be {@code null}
     * @param context  context that builds the response and receives it
     * @return the phase that sends the response
     */
    private static SearchPhase sendResponsePhase(final InternalSearchResponse response, final String scrollId, final SearchPhaseContext context) {
        final SearchPhase responsePhase = new SearchPhase("response"){

            @Override
            public void run() throws IOException {
                final SearchResponse searchResponse = context.buildSearchResponse(response, scrollId);
                context.onResponse(searchResponse);
            }
        };
        return responsePhase;
    }
}

