package org.apache.pekko.stream.connectors.elasticsearch.impl;

import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.scaladsl.HttpExt;
import org.apache.pekko.http.scaladsl.model.HttpMethods$;
import org.apache.pekko.http.scaladsl.model.HttpRequest$;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.http.scaladsl.model.HttpResponse$;
import org.apache.pekko.http.scaladsl.model.OptHttpResponse$;
import org.apache.pekko.http.scaladsl.model.ResponseEntity;
import org.apache.pekko.http.scaladsl.model.StatusCode;
import org.apache.pekko.http.scaladsl.model.StatusCodes;
import org.apache.pekko.http.scaladsl.model.StatusCodes$;
import org.apache.pekko.http.scaladsl.model.Uri;
import org.apache.pekko.http.scaladsl.model.Uri$;
import org.apache.pekko.http.scaladsl.model.Uri$Path$;
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal$;
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshaller$;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchParams;
import org.apache.pekko.stream.connectors.elasticsearch.ReadResult;
import org.apache.pekko.stream.connectors.elasticsearch.SourceSettingsBase;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Some$;
import scala.collection.immutable.Map;
import scala.concurrent.ExecutionContext;
import scala.util.Failure$;
import scala.util.Success$;
import scala.util.Try;

/* compiled from: ElasticsearchSourceStage.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSourceLogic.class */
public final class ElasticsearchSourceLogic<T> extends GraphStageLogic implements OutHandler, StageLogging {
    // Backing field for the StageLogging trait's logger; read and written only through the
    // name-mangled accessor pair declared further down in this class.
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    // Constructor-supplied request parameters. Not referenced by any method whose body survived
    // decompilation — presumably consumed by sendScrollScanRequest(); TODO confirm.
    private final ElasticsearchParams elasticsearchParams;
    private final Map<String, String> searchParams;
    // Source settings; the visible code reads connection().baseUrl() and connection().headers().
    private final SourceSettingsBase<?, ?> settings;
    // Outlet that handleScrollResponse() emits the read results on.
    private final Outlet<ReadResult<T>> out;
    // Converts the raw response body string into a ScrollResponse (see handleResponse()).
    private final MessageReader<T> reader;
    // HTTP extension, materializer and execution context used for the clear-scroll request and
    // for unmarshalling its response entity.
    private final HttpExt http;
    private final Materializer mat;
    private final ExecutionContext ec;
    // Scroll id taken from the most recent non-empty scroll result; None until one has been
    // handled. clearScrollAsync() uses it to build the clear-scroll URI.
    private Option<String> scrollId;
    // Async callback wrapping handleResponse(). It is not invoked by any visible method —
    // presumably sendScrollScanRequest() invokes it; TODO confirm.
    private final AsyncCallback<String> responseHandler;
    // Async callback wrapping handleFailure(). Public with a mangled name, presumably so the
    // anonymous recover handler class can reach it — TODO confirm.
    public final AsyncCallback<Throwable> org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSourceLogic$$failureHandler;
    // Cleared when a response or failure arrives; consulted by onPull() to avoid issuing a second
    // request. Presumably set to true by sendScrollScanRequest() — TODO confirm.
    private boolean waitingForElasticData;
    // True while downstream has pulled but no data was buffered; reset in handleResponse().
    private boolean pullIsWaitingForData;
    // Holds a response that arrived before downstream pulled; consumed by onPull().
    private Option<ScrollResponse<T>> dataReady;
    // Async callback that logs the outcome of the clear-scroll request and completes the stage.
    private final AsyncCallback<Try<String>> clearScrollAsyncHandler;

    /**
     * Builds the stage logic: stores the collaborators, initialises the StageLogging trait,
     * resets the scroll/buffering state, creates the async callbacks and registers this
     * instance as the handler for the outlet.
     *
     * <p>Decompiler note (JADX): the {@code super} call was moved to the top of the method,
     * which may differ from the original bytecode ordering.
     *
     * @param elasticsearchParams target parameters of the search
     * @param map                 search parameters, stored as {@code searchParams}
     * @param sourceSettingsBase  source settings, stored as {@code settings}
     * @param outlet              outlet this logic is registered on as handler
     * @param sourceShape         shape handed to {@code GraphStageLogic}
     * @param messageReader       reader that converts raw response bodies
     * @param httpExt             HTTP extension used to run requests
     * @param materializer        materializer used when unmarshalling entities
     * @param executionContext    execution context for future callbacks
     */
    public ElasticsearchSourceLogic(ElasticsearchParams elasticsearchParams, Map<String, String> map, SourceSettingsBase<?, ?> sourceSettingsBase, Outlet<ReadResult<T>> outlet, SourceShape<ReadResult<T>> sourceShape, MessageReader<T> messageReader, HttpExt httpExt, Materializer materializer, ExecutionContext executionContext) {
        super(sourceShape);

        // Collaborators handed in by the enclosing stage.
        this.elasticsearchParams = elasticsearchParams;
        this.searchParams = map;
        this.settings = sourceSettingsBase;
        this.out = outlet;
        this.reader = messageReader;
        this.http = httpExt;
        this.mat = materializer;
        this.ec = executionContext;

        // Trait initialiser emitted by the Scala compiler.
        StageLogging.$init$(this);

        // No scroll has been opened yet.
        this.scrollId = None$.MODULE$;

        // Callbacks that route asynchronous results to handleResponse / handleFailure.
        this.responseHandler = getAsyncCallback(this::handleResponse);
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSourceLogic$$failureHandler = getAsyncCallback(this::handleFailure);

        // Nothing requested, nothing pulled, nothing buffered.
        this.waitingForElasticData = false;
        this.pullIsWaitingForData = false;
        this.dataReady = None$.MODULE$;

        setHandler(outlet, this);

        // Once the clear-scroll request has finished (either way), log it and complete.
        this.clearScrollAsyncHandler = getAsyncCallback(clearResult -> {
            log().debug("Result of clearing the scroll: {}", clearResult);
            completeStage();
        });
    }

    /**
     * Compiler-generated forwarder: delegates the no-argument {@code onDownstreamFinish} to the
     * static implementation method of the {@code OutHandler} trait.
     *
     * @throws Exception if the trait implementation throws
     */
    public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /**
     * Name-mangled getter for the StageLogging trait's private logger field.
     *
     * @return the currently stored logging adapter (may be {@code null} if never assigned)
     */
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    /**
     * Name-mangled setter for the StageLogging trait's private logger field.
     *
     * @param loggingAdapter the logging adapter to store
     */
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /**
     * Compiler-generated forwarder to the {@code StageLogging} trait's {@code logSource}
     * implementation.
     *
     * @return whatever class the trait implementation reports as the log source
     */
    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    /**
     * Compiler-generated forwarder to the {@code StageLogging} trait's {@code log}
     * implementation; every logging call in this class goes through it.
     *
     * @return the logging adapter supplied by the trait
     */
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /**
     * Builds a request URI from the configured connection base URL and the given path.
     *
     * @param path path to set on the base URI (replaces whatever path the base URL carried)
     * @return the base URI with {@code path} applied
     */
    public Uri prepareUri(Uri.Path path) {
        Uri baseUri = Uri$.MODULE$.apply(this.settings.connection().baseUrl());
        return baseUri.withPath(path);
    }

    /* JADX WARN: Code restructure failed: missing block: B:38:0x025b, code lost:
    
        if (r0.equals(r0) != false) goto L41;
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x0219, code lost:
    
        if (r0.equals(r0) != false) goto L33;
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x01c4, code lost:
    
        if (r0.equals(r0) != false) goto L25;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for a method the decompiler could not reconstruct. As written here it
     * unconditionally throws {@link UnsupportedOperationException}; the real implementation
     * is NOT represented by this body.
     *
     * <p>It is called from {@code onPull()} and {@code handleResponse()} whenever more data
     * is wanted. NOTE(review): presumably the original issues the initial search / follow-up
     * scroll request, sets {@code waitingForElasticData}, and reports results through
     * {@code responseHandler} / the failure handler — confirm against the original Scala
     * source before relying on any of this.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    public void sendScrollScanRequest() {
        /*
            Method dump skipped, instructions count: 1166
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pekko.stream.connectors.elasticsearch.impl.ElasticsearchSourceLogic.sendScrollScanRequest():void");
    }

    /**
     * Handles a failed request: clears the "request in flight" flag and fails the stage
     * with the given cause. Wrapped by the failure async callback created in the constructor.
     *
     * @param th the failure to fail the stage with
     */
    public void handleFailure(Throwable th) {
        this.waitingForElasticData = false;
        failStage(th);
    }

    /**
     * Handles a raw response body received from Elasticsearch.
     *
     * <p>The in-flight flag is cleared and the body is parsed by the message reader. If
     * downstream has an outstanding pull, the parsed response is processed immediately and,
     * when that processing reports success, the next request is sent. Otherwise the parsed
     * response is buffered in {@code dataReady} for the next {@code onPull()}.
     *
     * @param str response body as a string
     */
    public void handleResponse(String str) {
        this.waitingForElasticData = false;
        ScrollResponse<T> parsed = this.reader.convert(str);

        if (this.pullIsWaitingForData) {
            log().debug("Received data from elastic. Downstream has already called pull and is waiting for data");
            this.pullIsWaitingForData = false;
            boolean pushedDownstream = handleScrollResponse(parsed);
            if (pushedDownstream) {
                sendScrollScanRequest();
            }
            return;
        }

        // Nobody is waiting yet: keep the response until downstream pulls.
        log().debug("Received data from elastic. Downstream have not yet asked for it");
        this.dataReady = Some$.MODULE$.apply(parsed);
    }

    /**
     * Acts on a parsed scroll response.
     *
     * <ul>
     *   <li>An error is present: the stage is failed with an {@link IllegalStateException}
     *       carrying the error text.</li>
     *   <li>No error and a result with no messages: the scroll is cleared via
     *       {@code clearScrollAsync()}.</li>
     *   <li>A result with messages: the scroll id is remembered and the messages are
     *       emitted on the outlet.</li>
     *   <li>Neither error nor result: the stage is failed with an
     *       {@link IllegalArgumentException}.</li>
     * </ul>
     *
     * @param scrollResponse the parsed response; {@code null} yields a {@code MatchError}
     * @return {@code true} only when messages were emitted downstream
     */
    public boolean handleScrollResponse(ScrollResponse<T> scrollResponse) {
        if (scrollResponse == null) {
            // Mirrors the Scala pattern match, which has no case for null.
            throw new MatchError(scrollResponse);
        }

        ScrollResponse<T> extracted = ScrollResponse$.MODULE$.unapply(scrollResponse);
        Some errorPart = extracted._1();
        Some resultPart = extracted._2();

        if (errorPart instanceof Some) {
            failStage(new IllegalStateException((String) errorPart.value()));
            return false;
        }

        boolean resultPresent = resultPart instanceof Some;

        if (resultPresent && None$.MODULE$.equals(errorPart) && ((ScrollResult) resultPart.value()).messages().isEmpty()) {
            // An empty page ends the scroll.
            clearScrollAsync();
            return false;
        }

        if (resultPresent) {
            ScrollResult page = (ScrollResult) resultPart.value();
            this.scrollId = page.scrollId();
            log().debug("Pushing data downstream");
            emitMultiple(this.out, page.messages().iterator());
            return true;
        }

        failStage(new IllegalArgumentException("unexpected response: " + scrollResponse));
        return false;
    }

    /**
     * Called when downstream requests an element.
     *
     * <p>If a response is already buffered in {@code dataReady} it is processed right away;
     * when that processing emitted data, the buffer is cleared and a new request is sent
     * unless one is already in flight. If nothing is buffered, the pull is recorded in
     * {@code pullIsWaitingForData} and a request is sent unless one is already in flight.
     *
     * @throws IllegalStateException if a pull arrives while an earlier pull is still waiting
     *         for data (an unchecked exception is used because this method declares no
     *         checked exceptions; the message is unchanged)
     */
    public void onPull() {
        Some buffered = this.dataReady;

        if (buffered instanceof Some) {
            ScrollResponse<T> scrollResponse = (ScrollResponse) buffered.value();
            log().debug("Downstream is pulling data and we already have data ready");
            if (handleScrollResponse(scrollResponse)) {
                this.dataReady = None$.MODULE$;
                // Fetch the next page eagerly unless a request is already on its way.
                if (!this.waitingForElasticData) {
                    sendScrollScanRequest();
                }
            }
            return;
        }

        if (!None$.MODULE$.equals(buffered)) {
            throw new MatchError(buffered);
        }

        if (this.pullIsWaitingForData) {
            throw new IllegalStateException("This should not happen: Downstream is pulling more than once");
        }

        this.pullIsWaitingForData = true;
        if (this.waitingForElasticData) {
            log().debug("Downstream is pulling data. Already waiting for data");
        } else {
            log().debug("Downstream is pulling data. We must go and get it");
            sendScrollScanRequest();
        }
    }

    /**
     * Called when downstream cancels. Starts clearing the scroll and then asks the stage to
     * keep going, so the stage is completed by the clear-scroll path rather than here.
     *
     * <p>NOTE(review): when no scroll id is known, {@code clearScrollAsync()} calls
     * {@code completeStage()} synchronously, so {@code setKeepGoing(true)} runs after
     * completion in that case — presumably harmless, but worth confirming.
     *
     * @param th the cancellation cause; not used by this implementation
     */
    public void onDownstreamFinish(Throwable th) {
        clearScrollAsync();
        setKeepGoing(true);
    }

    /**
     * Releases the server-side scroll context and then completes the stage.
     *
     * <p>When no scroll id has been recorded the stage is completed immediately. Otherwise an
     * HTTP {@code DELETE} is sent to {@code /_search/scroll/<scrollId>} with the configured
     * connection headers. The response body is read as a string and passed to
     * {@code clearScrollAsyncHandler} as a {@code Success} when the status is OK, or as a
     * {@code Failure} wrapping a {@link RuntimeException} for any other status; that handler
     * logs the outcome and completes the stage. Failures of the future chain itself go to the
     * anonymous recover handler (not visible here — presumably it forwards to the failure
     * callback; TODO confirm).
     *
     * <p>The failure message names the HTTP method actually used (DELETE).
     */
    public void clearScrollAsync() {
        Some some = this.scrollId;

        if (None$.MODULE$.equals(some)) {
            log().debug("Scroll Id is empty. Completing stage eagerly.");
            completeStage();
            return;
        }
        if (!(some instanceof Some)) {
            throw new MatchError(some);
        }

        Uri prepareUri = prepareUri(Uri$Path$.MODULE$.apply("/_search/scroll/" + (String) some.value(), Uri$Path$.MODULE$.apply$default$2()));

        ElasticsearchApi$.MODULE$.executeRequest(
                HttpRequest$.MODULE$.apply(HttpMethods$.MODULE$.DELETE(), HttpRequest$.MODULE$.apply$default$2(), HttpRequest$.MODULE$.apply$default$3(), HttpRequest$.MODULE$.apply$default$4(), HttpRequest$.MODULE$.apply$default$5()).withUri(prepareUri).withHeaders(this.settings.connection().headers()),
                this.settings.connection(),
                this.http
        ).flatMap(httpResponse -> {
            // case HttpResponse(StatusCodes.OK, _, entity, _)
            if (httpResponse != null) {
                HttpResponse unapply = HttpResponse$.MODULE$.unapply(httpResponse);
                if (!OptHttpResponse$.MODULE$.isEmpty$extension(unapply)) {
                    StatusCode _1 = unapply._1();
                    unapply._2();
                    ResponseEntity _3 = unapply._3();
                    unapply._4();
                    StatusCodes.Success OK = StatusCodes$.MODULE$.OK();
                    if (OK != null ? OK.equals(_1) : _1 == null) {
                        return Unmarshal$.MODULE$.apply(_3).to(Unmarshaller$.MODULE$.stringUnmarshaller(), this.ec, this.mat).map(str -> {
                            this.clearScrollAsyncHandler.invoke(Success$.MODULE$.apply(str));
                        }, this.ec);
                    }
                }
            }
            // case response: HttpResponse — any non-OK status
            if (httpResponse != null) {
                return Unmarshal$.MODULE$.apply(httpResponse.entity()).to(Unmarshaller$.MODULE$.stringUnmarshaller(), this.ec, this.mat).map(str2 -> {
                    this.clearScrollAsyncHandler.invoke(Failure$.MODULE$.apply(new RuntimeException("Request failed for DELETE " + prepareUri + ", got " + httpResponse.status() + " with body: " + str2)));
                }, this.ec);
            }
            throw new MatchError(httpResponse);
        }, this.ec).recover(new ElasticsearchSourceLogic$$anon$3(this), this.ec);
    }
}
