package akka.stream.alpakka.elasticsearch;

import akka.NotUsed$;
import akka.stream.alpakka.elasticsearch.ElasticsearchFlowStage;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.http.Header;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Vector;
import scala.collection.immutable.Vector$;
import scala.collection.mutable.Queue;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.RichInt$;
import spray.json.JsArray;

/* compiled from: ElasticsearchFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/elasticsearch/ElasticsearchFlowStage$$anon$1.class */
public final class ElasticsearchFlowStage$$anon$1 extends TimerGraphStageLogic implements InHandler, OutHandler {
    // Current lifecycle state of the stage; initialised to Idle in the constructor and
    // moved to Sending / Finished by onPush, onUpstreamFinish and handleResponse.
    private ElasticsearchFlowStage.State state;
    // Buffer of messages grabbed from the inlet (filled in onPush).
    private final Queue<IncomingMessage<T>> akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$queue;
    // Async callback invoked by the ResponseListener when a bulk request fails.
    private final AsyncCallback<Tuple2<Seq<IncomingMessage<T>>, Throwable>> akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$failureHandler;
    // Async callback invoked by the ResponseListener when a bulk request returns a response.
    private final AsyncCallback<Tuple2<Seq<IncomingMessage<T>>, Response>> akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$responseHandler;
    // Messages parked for the next retry; re-sent and cleared in onTimer.
    private Seq<IncomingMessage<T>> failedMessages;
    // Number of retries scheduled so far; reset to 0 in handleResponse once no retry is scheduled.
    private int retryCount;
    // Enclosing stage instance (provides in/out ports, settings, client and pusher).
    private final /* synthetic */ ElasticsearchFlowStage $outer;

    /**
     * Called when downstream cancels; forwards to the {@code OutHandler} implementation.
     *
     * NOTE(review): {@code OutHandler.class.onDownstreamFinish(this)} is presumably the
     * decompiler's rendering of the call into the Scala trait's default implementation —
     * confirm against the original Scala source before editing.
     */
    public void onDownstreamFinish() throws Exception {
        OutHandler.class.onDownstreamFinish(this);
    }

    /** @return the current lifecycle state of this stage logic */
    private ElasticsearchFlowStage.State state() {
        return state;
    }

    /**
     * Replaces the current lifecycle state.
     *
     * @param state the new state to store
     */
    private void state_$eq(ElasticsearchFlowStage.State state) {
        this.state = state;
    }

    /** @return the mutable queue holding messages that have been grabbed from the inlet */
    public Queue<IncomingMessage<T>> akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$queue() {
        return akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$queue;
    }

    /** @return the async callback that receives (messages, exception) when a bulk request fails */
    public AsyncCallback<Tuple2<Seq<IncomingMessage<T>>, Throwable>> akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$failureHandler() {
        return akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$failureHandler;
    }

    /** @return the async callback that receives (messages, response) when a bulk request returns */
    public AsyncCallback<Tuple2<Seq<IncomingMessage<T>>, Response>> akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$responseHandler() {
        return akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$responseHandler;
    }

    /** @return the messages currently parked for the next retry attempt */
    private Seq<IncomingMessage<T>> failedMessages() {
        return failedMessages;
    }

    /**
     * Replaces the messages parked for the next retry attempt.
     *
     * @param seq the messages to store
     */
    private void failedMessages_$eq(Seq<IncomingMessage<T>> seq) {
        failedMessages = seq;
    }

    /** @return how many retries have been scheduled since the counter was last reset */
    private int retryCount() {
        return retryCount;
    }

    /**
     * Overwrites the retry counter.
     *
     * @param i the new counter value
     */
    private void retryCount_$eq(int i) {
        retryCount = i;
    }

    /** Requests the first element from the stage's inlet as soon as the logic starts. */
    public void preStart() {
        pull($outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$in());
    }

    /**
     * Pulls the inlet only when the queue holds fewer than {@code settings.bufferSize()}
     * elements, the inlet is not closed, and no pull is already outstanding.
     */
    private void tryPull() {
        if (akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$queue().size() < this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.bufferSize()
                && !isClosed(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$in())
                && !hasBeenPulled(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$in())) {
            pull(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$in());
        }
    }

    /**
     * Timer callback: re-sends the parked messages as a bulk request, then clears them.
     *
     * @param obj the timer key (not inspected)
     */
    public void onTimer(Object obj) {
        Seq<IncomingMessage<T>> pending = failedMessages();
        sendBulkUpdateRequest(pending);
        failedMessages_$eq(Nil$.MODULE$);
    }

    /**
     * Handles a failed bulk request.
     *
     * Fails the stage with the reported exception once {@code retryCount} has reached
     * {@code settings.maxRetry()}; otherwise bumps the counter, parks the messages and
     * schedules a one-shot timer after {@code settings.retryInterval()} milliseconds
     * (the messages are re-sent from {@code onTimer}).
     *
     * @param tuple2 pair of (messages that were part of the failed request, failure cause)
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public void akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$handleFailure(Tuple2<Seq<IncomingMessage<T>>, Throwable> tuple2) {
        // The decompiler emitted `tuple2 == 0`; a reference can only be compared with null.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Seq messages = tuple2._1();
        Throwable cause = tuple2._2();
        if (retryCount() >= this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.maxRetry()) {
            // Retry budget exhausted: propagate the failure downstream.
            failStage(cause);
            return;
        }
        retryCount_$eq(retryCount() + 1);
        failedMessages_$eq(messages);
        // NOTE(review): `package.DurationInt` is the decompiler's spelling of Scala's
        // duration package object helper; left untouched here.
        scheduleOnce(NotUsed$.MODULE$, new package.DurationInt(package$.MODULE$.DurationInt(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.retryInterval())).millis());
    }

    /** Completes the stage; called when upstream has finished and nothing is left to send. */
    private void handleSuccess() {
        completeStage();
    }

    /**
     * Handles the response of a bulk request.
     *
     * The response body is parsed as JSON and its {@code "items"} array is zipped with the
     * messages that were sent; the zipped pairs are flat-mapped through
     * {@code $$anonfun$3} (body not visible here — presumably it keeps the messages whose
     * item reported an error; confirm against the Scala source).
     *
     * If that result is non-empty, {@code settings.retryPartialFailure()} is set and the
     * retry budget is not exhausted, the result is parked and a retry timer is scheduled.
     * Otherwise the retry counter is reset, the result is pushed downstream wrapped in an
     * already-completed future, and the next batch (built by {@code $$anonfun$4} over
     * {@code 1..bufferSize}) is sent. When there is no next batch, the stage either
     * completes (state {@code Finished}) or returns to {@code Idle}.
     *
     * @param tuple2 pair of (messages that were sent, Elasticsearch response)
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public void akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$handleResponse(Tuple2<Seq<IncomingMessage<T>>, Response> tuple2) {
        // The decompiler emitted `tuple2 == 0`; a reference can only be compared with null.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Seq messages = tuple2._1();
        Response response = tuple2._2();

        JsArray items = (JsArray) spray.json.package$.MODULE$.pimpString(EntityUtils.toString(response.getEntity())).parseJson().asJsObject().fields().apply("items");
        Vector vector = (Vector) ((TraversableLike) items.elements().zip(messages, Vector$.MODULE$.canBuildFrom())).flatMap(new ElasticsearchFlowStage$$anon$1$$anonfun$3(this), Vector$.MODULE$.canBuildFrom());

        if (vector.nonEmpty() && this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.retryPartialFailure() && retryCount() < this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.maxRetry()) {
            // Park the selected messages and retry them after the configured interval.
            retryCount_$eq(retryCount() + 1);
            failedMessages_$eq(vector);
            scheduleOnce(NotUsed$.MODULE$, new package.DurationInt(package$.MODULE$.DurationInt(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.retryInterval())).millis());
            return;
        }

        retryCount_$eq(0);
        push(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$out(), Future$.MODULE$.successful(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$pusher.apply(vector)));

        IndexedSeq indexedSeq = (IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.bufferSize()).flatMap(new ElasticsearchFlowStage$$anon$1$$anonfun$4(this), IndexedSeq$.MODULE$.canBuildFrom());
        if (!indexedSeq.isEmpty()) {
            sendBulkUpdateRequest(indexedSeq);
            return;
        }

        // Nothing left to send: finish if upstream already completed, otherwise go idle.
        if (ElasticsearchFlowStage$Finished$.MODULE$.equals(state())) {
            handleSuccess();
        } else {
            state_$eq(ElasticsearchFlowStage$Idle$.MODULE$);
        }
    }

    /**
     * Sends the given messages to Elasticsearch as one asynchronous {@code POST /_bulk}
     * request.
     *
     * Each message is mapped through {@code $$anonfun$5} (body not visible here) and the
     * results are joined with {@code "\n"} plus a trailing {@code "\n"} to form the UTF-8
     * request body, sent with {@code Content-Type: application/x-ndjson}. The outcome is
     * handed back to the stage through the failure / response async callbacks together
     * with the messages that were sent.
     *
     * @param seq the messages to include in the bulk request
     */
    private void sendBulkUpdateRequest(final Seq<IncomingMessage<T>> seq) {
        Map noParams = (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Nil$.MODULE$)).asJava();
        StringEntity payload = new StringEntity(((TraversableOnce) seq.map(new ElasticsearchFlowStage$$anon$1$$anonfun$5(this), Seq$.MODULE$.canBuildFrom())).mkString("", "\n", "\n"), StandardCharsets.UTF_8);

        // The decompiled listener passed constructor arguments to an interface and stored
        // `this` in its own `$outer` field; a plain anonymous class capturing `seq` and the
        // enclosing instance expresses the same callbacks in valid Java.
        ResponseListener listener = new ResponseListener() {
            @Override
            public void onFailure(Exception exc) {
                akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$failureHandler().invoke(new Tuple2(seq, exc));
            }

            @Override
            public void onSuccess(Response response) {
                akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$responseHandler().invoke(new Tuple2(seq, response));
            }
        };

        this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$client.performRequestAsync("POST", "/_bulk", noParams, payload, listener, new Header[]{new BasicHeader("Content-Type", "application/x-ndjson")});
    }

    /** Downstream demand signal: tries to pull the inlet (subject to the checks in tryPull). */
    public void onPull() {
        tryPull();
    }

    /**
     * Receives one element from the inlet and enqueues it. When the stage is idle it
     * switches to {@code Sending} and immediately sends a batch built by
     * {@code $$anonfun$6} over {@code 1..bufferSize} (function body not visible here —
     * presumably it drains the queue; confirm against the Scala source). Finally it tries
     * to pull the next element.
     */
    public void onPush() {
        IncomingMessage element = (IncomingMessage) grab(this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$in());
        akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$queue().enqueue(Predef$.MODULE$.wrapRefArray(new IncomingMessage[]{element}));

        boolean idle = ElasticsearchFlowStage$Idle$.MODULE$.equals(state());
        if (idle) {
            state_$eq(ElasticsearchFlowStage$Sending$.MODULE$);
            IndexedSeq batch = (IndexedSeq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), this.$outer.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$settings.bufferSize()).flatMap(new ElasticsearchFlowStage$$anon$1$$anonfun$6(this), IndexedSeq$.MODULE$.canBuildFrom());
            sendBulkUpdateRequest(batch);
        }

        tryPull();
    }

    /**
     * Upstream failed: fails this stage with the same error.
     *
     * @param th the failure reported by upstream
     */
    public void onUpstreamFailure(Throwable th) {
        failStage(th);
    }

    /**
     * Upstream completed. An idle stage completes immediately; a stage that is currently
     * sending is marked {@code Finished} so completion happens once the in-flight work is
     * handled; an already finished stage is left unchanged.
     *
     * @throws MatchError if the current state is none of Idle, Sending or Finished
     */
    public void onUpstreamFinish() {
        ElasticsearchFlowStage.State current = state();
        if (ElasticsearchFlowStage$Idle$.MODULE$.equals(current)) {
            handleSuccess();
            return;
        }
        if (ElasticsearchFlowStage$Sending$.MODULE$.equals(current)) {
            state_$eq(ElasticsearchFlowStage$Finished$.MODULE$);
            return;
        }
        if (ElasticsearchFlowStage$Finished$.MODULE$.equals(current)) {
            return;
        }
        throw new MatchError(current);
    }

    /** @return the enclosing {@code ElasticsearchFlowStage} instance (synthetic accessor) */
    public /* synthetic */ ElasticsearchFlowStage akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$$outer() {
        return $outer;
    }

    /**
     * Creates the stage logic for the given flow stage.
     *
     * Initialises the state to {@code Idle}, creates an empty message queue, registers the
     * failure / response async callbacks, clears the retry bookkeeping, and installs this
     * object as both the in- and out-handler of the stage's ports.
     *
     * @param elasticsearchFlowStage the enclosing stage (supplies shape, ports and settings)
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ElasticsearchFlowStage$$anon$1(ElasticsearchFlowStage<T, R> elasticsearchFlowStage) {
        super(elasticsearchFlowStage.m0shape());
        // The decompiler emitted `elasticsearchFlowStage == 0`; a reference can only be
        // compared with null. `throw null` raises a NullPointerException.
        if (elasticsearchFlowStage == null) {
            throw null;
        }
        this.$outer = elasticsearchFlowStage;
        // NOTE(review): the two `$init$` calls are presumably the decompiler's rendering of
        // Scala trait initialisers — confirm before changing.
        InHandler.class.$init$(this);
        OutHandler.class.$init$(this);
        this.state = ElasticsearchFlowStage$Idle$.MODULE$;
        this.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$queue = new Queue<>();
        this.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$failureHandler = getAsyncCallback(new ElasticsearchFlowStage$$anon$1$$anonfun$1(this));
        this.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$anon$$responseHandler = getAsyncCallback(new ElasticsearchFlowStage$$anon$1$$anonfun$2(this));
        this.failedMessages = Nil$.MODULE$;
        this.retryCount = 0;
        setHandlers(elasticsearchFlowStage.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$in(), elasticsearchFlowStage.akka$stream$alpakka$elasticsearch$ElasticsearchFlowStage$$out(), this);
    }
}
