package org.apache.pekko.stream.connectors.elasticsearch.impl;

import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.scaladsl.HttpExt;
import org.apache.pekko.http.scaladsl.model.ContentType$;
import org.apache.pekko.http.scaladsl.model.HttpEntity$;
import org.apache.pekko.http.scaladsl.model.HttpMethods$;
import org.apache.pekko.http.scaladsl.model.HttpRequest$;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.http.scaladsl.model.HttpResponse$;
import org.apache.pekko.http.scaladsl.model.OptHttpResponse$;
import org.apache.pekko.http.scaladsl.model.ResponseEntity;
import org.apache.pekko.http.scaladsl.model.StatusCode;
import org.apache.pekko.http.scaladsl.model.StatusCodes;
import org.apache.pekko.http.scaladsl.model.StatusCodes$;
import org.apache.pekko.http.scaladsl.model.Uri;
import org.apache.pekko.http.scaladsl.model.Uri$;
import org.apache.pekko.http.scaladsl.model.Uri$Path$;
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal$;
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshaller$;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.FlowShape$;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Inlet$;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.Outlet$;
import org.apache.pekko.stream.connectors.elasticsearch.ApiVersion;
import org.apache.pekko.stream.connectors.elasticsearch.ElasticsearchParams;
import org.apache.pekko.stream.connectors.elasticsearch.MessageWriter;
import org.apache.pekko.stream.connectors.elasticsearch.OpensearchApiVersion;
import org.apache.pekko.stream.connectors.elasticsearch.WriteMessage;
import org.apache.pekko.stream.connectors.elasticsearch.WriteResult;
import org.apache.pekko.stream.connectors.elasticsearch.WriteSettingsBase;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStage;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.Tuple3;
import scala.Tuple3$;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.immutable.Seq;
import scala.concurrent.ExecutionContext;
import scala.runtime.BoxesRunTime;
import spray.json.package$;

/* compiled from: ElasticsearchSimpleFlowStage.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage.class */
public final class ElasticsearchSimpleFlowStage<T, C> extends GraphStage<FlowShape<Tuple2<Seq<WriteMessage<T, C>>, Seq<WriteResult<T, C>>>, Seq<WriteResult<T, C>>>> {
    // NOTE: the long "org$apache$...$$name" identifiers are Scala-compiler name mangling for members
    // that the nested StageLogic class needs to reach; the part after the final "$$" is the logical name.

    // Target parameters; this class reads indexName() and (for API V5) typeName() from it.
    public final ElasticsearchParams org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$elasticsearchParams;
    // Write settings; this class reads apiVersion(), versionType(), allowExplicitIndex() and connection() from it.
    public final WriteSettingsBase<?, ?> org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$settings;
    // HTTP extension handed to ElasticsearchApi.executeRequest when posting bulk requests.
    public final HttpExt org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$http;
    // Materializer passed to the response-entity unmarshalling calls.
    public final Materializer org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$mat;
    // Execution context on which the response futures are mapped and recovered.
    public final ExecutionContext org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec;
    // Input port: pairs of (messages to write, results to pass through untouched).
    public final Inlet<Tuple2<Seq<WriteMessage<T, C>>, Seq<WriteResult<T, C>>>> org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$in = Inlet$.MODULE$.apply("messagesAndResultPassthrough");
    // Output port: the write results of one bulk request followed by the pass-through results.
    public final Outlet<Seq<WriteResult<T, C>>> org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$out = Outlet$.MODULE$.apply("result");
    // Flow shape built from the two ports above; must be declared after them because it reads both.
    private final FlowShape shape = FlowShape$.MODULE$.apply(this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$in, this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$out);
    // Bulk-API adapter selected in the constructor from the configured API version; used for toJson / toWriteResults.
    public final RestBulkApi<T, C> org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$restApi;
    // Base URI parsed in the constructor from settings.connection().baseUrl(); the bulk path is applied to it per request.
    public final Uri org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$baseUri;

    /* compiled from: ElasticsearchSimpleFlowStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/elasticsearch/impl/ElasticsearchSimpleFlowStage$StageLogic.class */
    public class StageLogic extends GraphStageLogic implements InHandler, OutHandler, StageLogging {
        // Backing field for the logger of the mixed-in StageLogging trait (accessed through the _log accessors below).
        private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
        // True from the moment an element is grabbed in onPush until handleResponse/handleFailure runs;
        // onUpstreamFinish consults it so the stage is not completed while a request is outstanding.
        private boolean inflight;
        // Async callback that routes (pass-through results, error) to handleFailure.
        public final AsyncCallback<Tuple2<Seq<WriteResult<T, C>>, Throwable>> org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$StageLogic$$failureHandler;
        // Async callback that routes (messages, pass-through results, response body) to handleResponse.
        private final AsyncCallback<Tuple3<Seq<WriteMessage<T, C>>, Seq<WriteResult<T, C>>, String>> responseHandler;
        // Compiler-generated reference to the enclosing stage instance.
        private final /* synthetic */ ElasticsearchSimpleFlowStage $outer;

        /**
         * Creates the stage logic for the given enclosing stage.
         *
         * <p>Initialises the StageLogging mix-in, clears the in-flight flag, creates the two async
         * callbacks (failure and response) and installs this object as the handler for both the
         * stage's inlet and its outlet.
         *
         * @param elasticsearchSimpleFlowStage the enclosing stage; must not be null
         * @throws NullPointerException if the enclosing stage is null
         */
        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public StageLogic(ElasticsearchSimpleFlowStage elasticsearchSimpleFlowStage) {
            super(elasticsearchSimpleFlowStage.m23shape());
            // NOTE(review): in this decompiled form the super call above already dereferences the
            // argument, so a null argument fails there before this explicit check is reached.
            if (elasticsearchSimpleFlowStage == null) {
                throw new NullPointerException();
            }
            this.$outer = elasticsearchSimpleFlowStage;
            StageLogging.$init$(this);
            this.inflight = false;
            // Failures reported from the HTTP future chain are funnelled into handleFailure.
            this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$StageLogic$$failureHandler = getAsyncCallback(tuple2 -> {
                handleFailure(tuple2);
            });
            // Successful (HTTP 200) bulk responses are funnelled into handleResponse.
            this.responseHandler = getAsyncCallback(tuple3 -> {
                handleResponse(tuple3);
            });
            // This object implements both InHandler and OutHandler, so it serves both ports.
            setHandlers(elasticsearchSimpleFlowStage.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$in, elasticsearchSimpleFlowStage.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$out, this);
        }

        /**
         * Compiler-generated forwarder: delegates to the implementation supplied by the
         * {@code InHandler} trait ({@code InHandler.onUpstreamFailure$}); no stage-specific handling is added.
         *
         * @param th the failure signalled by upstream
         */
        public /* bridge */ /* synthetic */ void onUpstreamFailure(Throwable th) throws Exception {
            InHandler.onUpstreamFailure$(this, th);
        }

        /**
         * Compiler-generated forwarder: delegates to the implementation supplied by the
         * {@code OutHandler} trait ({@code OutHandler.onDownstreamFinish$}); no stage-specific handling is added.
         */
        public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
            OutHandler.onDownstreamFinish$(this);
        }

        /**
         * Compiler-generated forwarder for the cause-carrying variant: delegates to the implementation
         * supplied by the {@code OutHandler} trait; no stage-specific handling is added.
         *
         * @param th the cause passed along with the downstream cancellation
         */
        public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
            OutHandler.onDownstreamFinish$(this, th);
        }

        /**
         * Getter for the field backing the {@code StageLogging} trait's logger (name-mangled trait member).
         *
         * @return the currently stored logging adapter, exactly as held in the backing field
         */
        public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
            return this.org$apache$pekko$stream$stage$StageLogging$$_log;
        }

        /**
         * Setter for the field backing the {@code StageLogging} trait's logger (name-mangled trait member).
         *
         * @param loggingAdapter the logging adapter to store
         */
        public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
            this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
        }

        /**
         * Compiler-generated forwarder: delegates to the {@code StageLogging} trait's
         * {@code logSource} implementation.
         *
         * @return whatever class the trait implementation reports as the log source
         */
        public /* bridge */ /* synthetic */ Class logSource() {
            return StageLogging.logSource$(this);
        }

        /**
         * Compiler-generated forwarder: delegates to the {@code StageLogging} trait's {@code log}
         * implementation. All logging in this class goes through this method.
         *
         * @return the logging adapter provided by the trait implementation
         */
        public /* bridge */ /* synthetic */ LoggingAdapter log() {
            return StageLogging.log$(this);
        }

        /**
         * Downstream demand signal: forwards the demand upstream via {@link #tryPull()}, which pulls
         * the inlet only if it is neither closed nor already pulled.
         */
        public void onPull() {
            tryPull();
        }

        /**
         * Handles one incoming element: a pair of (messages to write, results to pass through).
         *
         * <p>Steps performed:
         * <ol>
         *   <li>Chooses the bulk endpoint path: {@code /_bulk} when the settings allow an explicit index,
         *       otherwise {@code /<indexName>/_bulk}.</li>
         *   <li>Grabs the element, marks a request as in flight and serialises the messages with the
         *       selected bulk-API adapter.</li>
         *   <li>If the serialised payload is empty, no HTTP call is made; {@code handleResponse} is invoked
         *       directly with a synthetic empty bulk response.</li>
         *   <li>Otherwise a POST is issued. A 200 OK response body is delivered through
         *       {@code responseHandler}; any other status is turned into a {@code RuntimeException} that
         *       carries the URI, status and body and is delivered through the failure handler.</li>
         * </ol>
         *
         * @throws MatchError if the grabbed element is null
         */
        public void onPush() {
            // Bulk endpoint path; the index is put in the path only when explicit indices are not allowed.
            String sb = this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$settings.allowExplicitIndex() ? "/_bulk" : new StringBuilder(7).append("/").append(this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$elasticsearchParams.indexName()).append("/_bulk").toString();
            Tuple2 tuple2 = (Tuple2) grab(this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$in);
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            // seq = messages to write, seq2 = results passed through unchanged.
            Tuple2 apply = Tuple2$.MODULE$.apply((Seq) tuple2._1(), (Seq) tuple2._2());
            Seq<WriteMessage<T, C>> seq = (Seq) apply._1();
            Seq seq2 = (Seq) apply._2();
            // From here until handleResponse/handleFailure runs, upstream completion must not complete the stage.
            this.inflight = true;
            String json = this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$restApi.toJson(seq);
            log().debug("Posting data to Elasticsearch: {}", json);
            if (!StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(json))) {
                // Nothing to send: short-circuit with an empty, error-free bulk response.
                handleResponse(Tuple3$.MODULE$.apply(seq, seq2, "{\"took\":0, \"errors\": false, \"items\":[]}"));
            } else {
                Uri withPath = this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$baseUri.withPath(Uri$Path$.MODULE$.apply(sb, Uri$Path$.MODULE$.apply$default$2()));
                // POST the payload with the NDJSON content type (mangled member name decodes to "application/x-ndjson")
                // and the connection's configured headers.
                ElasticsearchApi$.MODULE$.executeRequest(HttpRequest$.MODULE$.apply(HttpMethods$.MODULE$.POST(), HttpRequest$.MODULE$.apply$default$2(), HttpRequest$.MODULE$.apply$default$3(), HttpRequest$.MODULE$.apply$default$4(), HttpRequest$.MODULE$.apply$default$5()).withUri(withPath).withEntity(HttpEntity$.MODULE$.apply(ContentType$.MODULE$.apply(NDJsonProtocol$.MODULE$.application$divx$minusndjson()), json)).withHeaders(this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$settings.connection().headers()), this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$settings.connection(), this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$http).map(httpResponse -> {
                    // First pattern-match case: a response whose status equals 200 OK.
                    if (httpResponse != null) {
                        HttpResponse unapply = HttpResponse$.MODULE$.unapply(httpResponse);
                        if (!OptHttpResponse$.MODULE$.isEmpty$extension(unapply)) {
                            StatusCode _1 = unapply._1();
                            unapply._2();
                            ResponseEntity _3 = unapply._3();
                            unapply._4();
                            StatusCodes.Success OK = StatusCodes$.MODULE$.OK();
                            if (OK != null ? OK.equals(_1) : _1 == null) {
                                // Read the body as a String and hand it to the stage through the async callback.
                                return Unmarshal$.MODULE$.apply(_3).to(Unmarshaller$.MODULE$.stringUnmarshaller(), this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec, this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$mat).map(str -> {
                                    this.responseHandler.invoke(Tuple3$.MODULE$.apply(seq, seq2, str));
                                }, this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec);
                            }
                        }
                    }
                    // Second case: any other non-null response is reported as a failure including its body.
                    if (httpResponse != null) {
                        return Unmarshal$.MODULE$.apply(httpResponse.entity()).to(Unmarshaller$.MODULE$.stringUnmarshaller(), this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec, this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$mat).map(str2 -> {
                            this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$StageLogic$$failureHandler.invoke(Tuple2$.MODULE$.apply(seq2, new RuntimeException(new StringBuilder(42).append("Request failed for POST ").append(withPath).append(", got ").append(httpResponse.status()).append(" with body: ").append(str2).toString())));
                        }, this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec);
                    }
                    throw new MatchError(httpResponse);
                    // NOTE(review): failures of the future chain are handled by the anonymous class below, whose
                    // body is not part of this file — presumably it reports through the failure handler; confirm.
                }, this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec).recoverWith(new ElasticsearchSimpleFlowStage$$anon$1(withPath, this), this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec);
            }
        }

        /**
         * Terminal handling of a failed bulk request: clears the in-flight flag, logs the error together
         * with the number of pass-through results carried by the failed element, and fails the stage.
         *
         * @param tuple2 pair of (pass-through results, cause of the failure)
         * @throws MatchError if {@code tuple2} is null
         */
        private void handleFailure(Tuple2<Seq<WriteResult<T, C>>, Throwable> tuple2) {
            // The request is over regardless of what follows, so reset the flag first.
            this.inflight = false;
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            final Seq<WriteResult<T, C>> processedSoFar = tuple2._1();
            final Throwable cause = tuple2._2();
            log().error(
                    "Received error from elastic after having already processed {} documents. Error: {}",
                    BoxesRunTime.boxToInteger(processedSoFar.size()),
                    cause);
            failStage(cause);
        }

        /**
         * Processes the body of a successful bulk response.
         *
         * <p>Clears the in-flight flag, optionally logs the pretty-printed response at debug level,
         * converts the body into per-message write results, logs every unsuccessful result that carries
         * an error, and emits the write results followed by the pass-through results. Afterwards the
         * stage is completed if the inlet is already closed; otherwise the next element is requested.
         *
         * @param tuple3 triple of (messages that were sent, pass-through results, response body)
         * @throws MatchError if {@code tuple3} is null
         */
        private void handleResponse(Tuple3<Seq<WriteMessage<T, C>>, Seq<WriteResult<T, C>>, String> tuple3) {
            this.inflight = false;
            if (tuple3 == null) {
                throw new MatchError(tuple3);
            }
            final ElasticsearchSimpleFlowStage stage = this.$outer;
            final Seq<WriteMessage<T, C>> sentMessages = tuple3._1();
            final Seq<WriteResult<T, C>> passThrough = tuple3._2();
            final String responseBody = tuple3._3();

            // Parsing and pretty-printing is only worth doing when debug logging is on.
            if (log().isDebugEnabled()) {
                String pretty = package$.MODULE$.enrichString(responseBody).parseJson().prettyPrint();
                log().debug("response {}", pretty);
            }

            Seq<WriteResult<T, C>> results = stage.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$restApi.toWriteResults(sentMessages, responseBody);

            if (log().isErrorEnabled()) {
                // Keep only the unsuccessful results and report those that actually carry an error.
                IterableOnceOps unsuccessful = (IterableOnceOps) results.filterNot(ElasticsearchSimpleFlowStage::org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$StageLogic$$_$handleResponse$$anonfun$1);
                unsuccessful.foreach(failedResult -> {
                    if (failedResult.getError().isPresent()) {
                        log().error("Received error from elastic when attempting to index documents. Error: {}", failedResult.getError().get());
                    }
                });
            }

            emit(stage.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$out, results.$plus$plus(passThrough));

            if (!isClosed(stage.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$in)) {
                tryPull();
                return;
            }
            // Upstream finished while this request was outstanding: nothing more will arrive.
            completeStage();
        }

        /**
         * Requests the next element from upstream, but only when the inlet is still open and has no
         * pull outstanding; otherwise does nothing.
         */
        private void tryPull() {
            final Inlet inlet = this.$outer.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$in;
            if (!isClosed(inlet) && !hasBeenPulled(inlet)) {
                pull(inlet);
            }
        }

        /**
         * Upstream completion signal. The stage is completed immediately only when no bulk request is
         * in flight; otherwise completion is left to {@code handleResponse}, which completes the stage
         * once it sees the closed inlet.
         */
        public void onUpstreamFinish() {
            if (!this.inflight) {
                completeStage();
            }
        }

        /**
         * Compiler-generated accessor for the enclosing stage instance.
         *
         * @return the {@code ElasticsearchSimpleFlowStage} this logic was created for
         */
        public final /* synthetic */ ElasticsearchSimpleFlowStage org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$StageLogic$$$outer() {
            return this.$outer;
        }
    }

    /**
     * Creates the flow stage and selects the bulk-API adapter matching the configured API version.
     *
     * <p>{@code ApiVersion.V5} selects {@code RestBulkApiV5} (which additionally needs the type name
     * from {@code elasticsearchParams}); {@code ApiVersion.V7} and {@code OpensearchApiVersion.V1}
     * both select {@code RestBulkApiV7}.
     *
     * @param elasticsearchParams target index (and, for V5, type) parameters
     * @param writeSettingsBase   write settings supplying API version, version type, explicit-index flag and connection
     * @param messageWriter       writer handed to the bulk-API adapter
     * @param httpExt             HTTP extension used to execute requests
     * @param materializer        materializer used when reading response entities
     * @param executionContext    execution context for the response futures
     * @throws IllegalArgumentException if the configured API version is none of the supported ones
     */
    public ElasticsearchSimpleFlowStage(ElasticsearchParams elasticsearchParams, WriteSettingsBase<?, ?> writeSettingsBase, MessageWriter<T> messageWriter, HttpExt httpExt, Materializer materializer, ExecutionContext executionContext) {
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$elasticsearchParams = elasticsearchParams;
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$settings = writeSettingsBase;
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$http = httpExt;
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$mat = materializer;
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$ec = executionContext;

        final Object requestedVersion = writeSettingsBase.apiVersion();
        final RestBulkApi<T, C> bulkApi;
        if (java.util.Objects.equals(ApiVersion.V5, requestedVersion)) {
            bulkApi = new RestBulkApiV5<>(elasticsearchParams.indexName(), (String) elasticsearchParams.typeName().get(), writeSettingsBase.versionType(), writeSettingsBase.allowExplicitIndex(), messageWriter);
        } else if (java.util.Objects.equals(ApiVersion.V7, requestedVersion)
                || java.util.Objects.equals(OpensearchApiVersion.V1, requestedVersion)) {
            // Elasticsearch V7 and OpenSearch V1 share the same bulk-API adapter.
            bulkApi = new RestBulkApiV7<>(elasticsearchParams.indexName(), writeSettingsBase.versionType(), writeSettingsBase.allowExplicitIndex(), messageWriter);
        } else {
            throw new IllegalArgumentException("API version " + requestedVersion + " is not supported");
        }
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$restApi = bulkApi;
        this.org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$$baseUri = Uri$.MODULE$.apply(writeSettingsBase.connection().baseUrl());
    }

    /**
     * Returns the flow shape of this stage, built once from its single inlet and single outlet.
     * The {@code m23} prefix is a decompiler rename of the original {@code shape} method.
     *
     * @return the stage's {@code FlowShape}
     */
    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    public FlowShape<Tuple2<Seq<WriteMessage<T, C>>, Seq<WriteResult<T, C>>>, Seq<WriteResult<T, C>>> m23shape() {
        return this.shape;
    }

    /**
     * Creates a fresh {@code StageLogic} bound to this stage.
     *
     * @param attributes attributes supplied by the caller; not used by this implementation
     * @return a new stage-logic instance
     */
    public GraphStageLogic createLogic(Attributes attributes) {
        return new StageLogic(this);
    }

    /**
     * Compiler-lifted lambda body used by {@code StageLogic.handleResponse} as the {@code filterNot}
     * predicate, so that only unsuccessful write results remain for error logging.
     *
     * @param writeResult the write result to test
     * @return the value of {@code writeResult.success()}
     */
    public static final /* synthetic */ boolean org$apache$pekko$stream$connectors$elasticsearch$impl$ElasticsearchSimpleFlowStage$StageLogic$$_$handleResponse$$anonfun$1(WriteResult writeResult) {
        return writeResult.success();
    }
}
