package org.apache.james.backends.es.v7.search;

import com.github.fge.lambdas.Throwing;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.james.backends.es.v7.ReactorElasticSearchClient;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/james/backends/es/v7/search/ScrolledSearch.class */
public class ScrolledSearch {
    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
    private final ReactorElasticSearchClient client;
    private final SearchRequest searchRequest;

    public ScrolledSearch(ReactorElasticSearchClient reactorElasticSearchClient, SearchRequest searchRequest) {
        this.client = reactorElasticSearchClient;
        this.searchRequest = searchRequest;
    }

    public Flux<SearchHit> searchHits() {
        return searchResponses().concatMap(searchResponse -> {
            return Flux.just(searchResponse.getHits().getHits());
        });
    }

    public Flux<SearchResponse> searchResponses() {
        return Flux.push(fluxSink -> {
            AtomicReference atomicReference = new AtomicReference(Optional.empty());
            fluxSink.onRequest(j -> {
                next(fluxSink, atomicReference, j);
            });
            fluxSink.onDispose(() -> {
                close(atomicReference);
            });
        });
    }

    private void next(FluxSink<SearchResponse> fluxSink, AtomicReference<Optional<String>> atomicReference, long j) {
        if (j <= 0) {
            return;
        }
        Consumer consumer = searchResponse -> {
            atomicReference.set(Optional.of(searchResponse.getScrollId()));
            fluxSink.next(searchResponse);
            if (searchResponse.getHits().getHits().length == 0) {
                fluxSink.complete();
            } else {
                next(fluxSink, atomicReference, j - 1);
            }
        };
        Objects.requireNonNull(fluxSink);
        buildRequest(atomicReference.get()).subscribe(consumer, fluxSink::error);
    }

    private Mono<SearchResponse> buildRequest(Optional<String> optional) {
        return (Mono) optional.map(str -> {
            return this.client.scroll(new SearchScrollRequest().scrollId((String) optional.get()).scroll(TIMEOUT), RequestOptions.DEFAULT);
        }).orElseGet(() -> {
            return this.client.search(this.searchRequest, RequestOptions.DEFAULT);
        });
    }

    public void close(AtomicReference<Optional<String>> atomicReference) {
        atomicReference.get().map(str -> {
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(str);
            return clearScrollRequest;
        }).ifPresent(Throwing.consumer(clearScrollRequest -> {
            this.client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe();
        }).sneakyThrow());
    }
}
