package akka.stream.alpakka.elasticsearch;

import akka.event.LoggingAdapter;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSourceSettings;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import java.io.ByteArrayOutputStream;
import org.apache.http.Header;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import spray.json.DefaultJsonProtocol$;
import spray.json.package$;

/* compiled from: ElasticsearchSourceStage.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015g\u0001B\u0001\u0003!-\u0011\u0001$\u00127bgRL7m]3be\u000eD7k\\;sG\u0016dunZ5d\u0015\t\u0019A!A\u0007fY\u0006\u001cH/[2tK\u0006\u00148\r\u001b\u0006\u0003\u000b\u0019\tq!\u00197qC.\\\u0017M\u0003\u0002\b\u0011\u000511\u000f\u001e:fC6T\u0011!C\u0001\u0005C.\\\u0017m\u0001\u0001\u0016\u00051Y6#\u0002\u0001\u000e'qy\u0002C\u0001\b\u0012\u001b\u0005y!B\u0001\t\u0007\u0003\u0015\u0019H/Y4f\u0013\t\u0011rBA\bHe\u0006\u0004\bn\u0015;bO\u0016dunZ5d!\t!\"$D\u0001\u0016\u0015\t1r#\u0001\u0004dY&,g\u000e\u001e\u0006\u0003\u0007aQ\u0011!G\u0001\u0004_J<\u0017BA\u000e\u0016\u0005A\u0011Vm\u001d9p]N,G*[:uK:,'\u000f\u0005\u0002\u000f;%\u0011ad\u0004\u0002\u000b\u001fV$\b*\u00198eY\u0016\u0014\bC\u0001\b!\u0013\t\tsB\u0001\u0007Ti\u0006<W\rT8hO&tw\r\u0003\u0005$\u0001\t\u0005\t\u0015!\u0003%\u0003%Ig\u000eZ3y\u001d\u0006lW\r\u0005\u0002&W9\u0011a%K\u0007\u0002O)\t\u0001&A\u0003tG\u0006d\u0017-\u0003\u0002+O\u00051\u0001K]3eK\u001aL!\u0001L\u0017\u0003\rM#(/\u001b8h\u0015\tQs\u0005\u0003\u00050\u0001\t\u0005\t\u0015!\u00031\u0003!!\u0018\u0010]3OC6,\u0007c\u0001\u00142I%\u0011!g\n\u0002\u0007\u001fB$\u0018n\u001c8\t\u0011Q\u0002!\u00111A\u0005\u0002U\nAb]3be\u000eD\u0007+\u0019:b[N,\u0012A\u000e\t\u0005K]\"C%\u0003\u00029[\t\u0019Q*\u00199\t\u0011i\u0002!\u00111A\u0005\u0002m\n\u0001c]3be\u000eD\u0007+\u0019:b[N|F%Z9\u0015\u0005qz\u0004C\u0001\u0014>\u0013\tqtE\u0001\u0003V]&$\bb\u0002!:\u0003\u0003\u0005\rAN\u0001\u0004q\u0012\n\u0004\u0002\u0003\"\u0001\u0005\u0003\u0005\u000b\u0015\u0002\u001c\u0002\u001bM,\u0017M]2i!\u0006\u0014\u0018-\\:!\u0011!1\u0002A!A!\u0002\u0013!\u0005C\u0001\u000bF\u0013\t1UC\u0001\u0006SKN$8\t\\5f]RD\u0001\u0002\u0013\u0001\u0003\u0002\u0003\u0006I!S\u0001\tg\u0016$H/\u001b8hgB\u0011!*T\u0007\u0002\u0017*\u0011AJA\u0001\tg\u000e\fG.\u00193tY&\u0011aj\u0013\u0002\u001c\u000b2\f7\u000f^5dg\u0016\f'o\u00195T_V\u00148-Z*fiRLgnZ:\t\u0011A\u0003!\u0011!Q\u0001\nE\u000b1a\\;u!\r\u00116+V\u0007\u0002\r%\u0011AK\u0002\u0002\u0007\u001fV$H.\u001a;\u0011\u0007Y;\u0016,D\u0001\u0003\u0013\tA&AA\bPkR<w.\u001b8h\u001b\u0016\u001c8/Y4f!\tQ6\f\u0004\u0001\u0005\u000bq\u0003!\u0019A/\u0003\u0003Q\u000b\"AX1\u0011\u0005\u0019z\u0016B\u00011(\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"A\n2\n\u0005\r<#aA!os\"AQ\r\u0001B\u0001B\u0003%a-A\u0003tQ\u0006\u0004X\rE\u0002SOVK!\u0001\u001b\u0004\u0003\u0017M{WO]2f'\"\f\u0007/\u001a\u0005\tU\u0002\u0011\t\u0011)A\u0005W\u00061!/Z1eKJ\u00042A\u00167Z\u0013\ti'AA\u0007NKN\u001c\u0018mZ3SK\u0006$WM\u001d\u0005\u0006_\u0002!\t\u0001]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0013E\u00148\u000f^;wobL\bc\u0001,\u00013\")1E\u001ca\u0001I!)qF\u001ca\u0001a!)AG\u001ca\u0001m!)aC\u001ca\u0001\t\")\u0001J\u001ca\u0001\u0013\")\u0001K\u001ca\u0001#\")QM\u001ca\u0001M\")!N\u001ca\u0001W\"91\u0010\u0001a\u0001\n\u0013a\u0018\u0001C:de>dG.\u00133\u0016\u0003\u0011BqA \u0001A\u0002\u0013%q0\u0001\u0007tGJ|G\u000e\\%e?\u0012*\u0017\u000fF\u0002=\u0003\u0003Aq\u0001Q?\u0002\u0002\u0003\u0007A\u0005C\u0004\u0002\u0006\u0001\u0001\u000b\u0015\u0002\u0013\u0002\u0013M\u001c'o\u001c7m\u0013\u0012\u0004\u0003\"CA\u0005\u0001\t\u0007I\u0011BA\u0006\u0003=\u0011Xm\u001d9p]N,\u0007*\u00198eY\u0016\u0014XCAA\u0007!\u0015q\u0011qBA\n\u0013\r\t\tb\u0004\u0002\u000e\u0003NLhnY\"bY2\u0014\u0017mY6\u0011\u0007Q\t)\"C\u0002\u0002\u0018U\u0011\u0001BU3ta>t7/\u001a\u0005\t\u00037\u0001\u0001\u0015!\u0003\u0002\u000e\u0005\u0001\"/Z:q_:\u001cX\rS1oI2,'\u000f\t\u0005\n\u0003?\u0001!\u0019!C\u0005\u0003C\taBZ1jYV\u0014X\rS1oI2,'/\u0006\u0002\u0002$A)a\"a\u0004\u0002&A!\u0011qEA\u001c\u001d\u0011\tI#a\r\u000f\t\u0005-\u0012\u0011G\u0007\u0003\u0003[Q1!a\f\u000b\u0003\u0019a$o\\8u}%\t\u0001&C\u0002\u00026\u001d\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002:\u0005m\"!\u0003+ie><\u0018M\u00197f\u0015\r\t)d\n\u0005\t\u0003\u007f\u0001\u0001\u0015!\u0003\u0002$\u0005ya-Y5mkJ,\u0007*\u00198eY\u0016\u0014\b\u0005C\u0005\u0002D\u0001\u0001\r\u0011\"\u0003\u0002F\u0005)r/Y5uS:<gi\u001c:FY\u0006\u001cH/[2ECR\fWCAA$!\r1\u0013\u0011J\u0005\u0004\u0003\u0017:#a\u0002\"p_2,\u0017M\u001c\u0005\n\u0003\u001f\u0002\u0001\u0019!C\u0005\u0003#\n\u0011d^1ji&twMR8s\u000b2\f7\u000f^5d\t\u0006$\u0018m\u0018\u0013fcR\u0019A(a\u0015\t\u0013\u0001\u000bi%!AA\u0002\u0005\u001d\u0003\u0002CA,\u0001\u0001\u0006K!a\u0012\u0002-]\f\u0017\u000e^5oO\u001a{'/\u00127bgRL7\rR1uC\u0002B\u0011\"a\u0017\u0001\u0001\u0004%I!!\u0012\u0002)A,H\u000e\\%t/\u0006LG/\u001b8h\r>\u0014H)\u0019;b\u0011%\ty\u0006\u0001a\u0001\n\u0013\t\t'\u0001\rqk2d\u0017j],bSRLgn\u001a$pe\u0012\u000bG/Y0%KF$2\u0001PA2\u0011%\u0001\u0015QLA\u0001\u0002\u0004\t9\u0005\u0003\u0005\u0002h\u0001\u0001\u000b\u0015BA$\u0003U\u0001X\u000f\u001c7Jg^\u000b\u0017\u000e^5oO\u001a{'\u000fR1uC\u0002B\u0011\"a\u001b\u0001\u0001\u0004%I!!\u001c\u0002\u0013\u0011\fG/\u0019*fC\u0012LXCAA8!\u00111\u0013'!\u001d\u0011\tY\u000b\u0019(W\u0005\u0004\u0003k\u0012!AD*de>dGNU3ta>t7/\u001a\u0005\n\u0003s\u0002\u0001\u0019!C\u0005\u0003w\nQ\u0002Z1uCJ+\u0017\rZ=`I\u0015\fHc\u0001\u001f\u0002~!I\u0001)a\u001e\u0002\u0002\u0003\u0007\u0011q\u000e\u0005\t\u0003\u0003\u0003\u0001\u0015)\u0003\u0002p\u0005QA-\u0019;b%\u0016\fG-\u001f\u0011\t\u000f\u0005\u0015\u0005\u0001\"\u0001\u0002\b\u0006)2/\u001a8e'\u000e\u0014x\u000e\u001c7TG\u0006t'+Z9vKN$H#\u0001\u001f\t\u000f\u0005-\u0005\u0001\"\u0011\u0002\u000e\u0006IqN\u001c$bS2,(/\u001a\u000b\u0004y\u0005=\u0005\u0002CAI\u0003\u0013\u0003\r!a%\u0002\u0013\u0015D8-\u001a9uS>t\u0007\u0003BA\u0014\u0003+KA!a&\u0002<\tIQ\t_2faRLwN\u001c\u0005\b\u00037\u0003A\u0011IAO\u0003%ygnU;dG\u0016\u001c8\u000fF\u0002=\u0003?C\u0001\"!)\u0002\u001a\u0002\u0007\u00111C\u0001\te\u0016\u001c\bo\u001c8tK\"9\u0011Q\u0015\u0001\u0005\u0002\u0005\u001d\u0016!\u00045b]\u0012dWMR1jYV\u0014X\rF\u0002=\u0003SC\u0001\"a+\u0002$\u0002\u0007\u0011QE\u0001\u0003KbDq!a,\u0001\t\u0003\t\t,\u0001\biC:$G.\u001a*fgB|gn]3\u0015\u0007q\n\u0019\f\u0003\u0005\u00026\u00065\u0006\u0019AA\n\u0003\r\u0011Xm\u001d\u0005\b\u0003s\u0003A\u0011AA^\u0003QA\u0017M\u001c3mKN\u001b'o\u001c7m%\u0016\u001c\bo\u001c8tKR!\u0011qIA_\u0011!\ty,a.A\u0002\u0005E\u0014AD:de>dGNU3ta>t7/\u001a\u0005\b\u0003\u0007\u0004A\u0011IAD\u0003\u0019yg\u000eU;mY\u0002")
/* loaded from: input_file:akka/stream/alpakka/elasticsearch/ElasticsearchSourceLogic.class */
public class ElasticsearchSourceLogic<T> extends GraphStageLogic implements ResponseListener, OutHandler, StageLogging {
    private final String indexName;
    private final Option<String> typeName;
    private Map<String, String> searchParams;
    private final RestClient client;
    private final ElasticsearchSourceSettings settings;
    private final Outlet<OutgoingMessage<T>> out;
    private final MessageReader<T> reader;
    private String scrollId;
    private final AsyncCallback<Response> responseHandler;
    private final AsyncCallback<Throwable> failureHandler;
    private boolean waitingForElasticData;
    private boolean pullIsWaitingForData;
    private Option<ScrollResponse<T>> dataReady;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public Class<?> logSource() {
        return StageLogging.class.logSource(this);
    }

    public LoggingAdapter log() {
        return StageLogging.class.log(this);
    }

    public void onDownstreamFinish() throws Exception {
        OutHandler.class.onDownstreamFinish(this);
    }

    public Map<String, String> searchParams() {
        return this.searchParams;
    }

    public void searchParams_$eq(Map<String, String> map) {
        this.searchParams = map;
    }

    private String scrollId() {
        return this.scrollId;
    }

    private void scrollId_$eq(String str) {
        this.scrollId = str;
    }

    private AsyncCallback<Response> responseHandler() {
        return this.responseHandler;
    }

    private AsyncCallback<Throwable> failureHandler() {
        return this.failureHandler;
    }

    private boolean waitingForElasticData() {
        return this.waitingForElasticData;
    }

    private void waitingForElasticData_$eq(boolean z) {
        this.waitingForElasticData = z;
    }

    private boolean pullIsWaitingForData() {
        return this.pullIsWaitingForData;
    }

    private void pullIsWaitingForData_$eq(boolean z) {
        this.pullIsWaitingForData = z;
    }

    private Option<ScrollResponse<T>> dataReady() {
        return this.dataReady;
    }

    private void dataReady_$eq(Option<ScrollResponse<T>> option) {
        this.dataReady = option;
    }

    public void sendScrollScanRequest() {
        String s;
        try {
            waitingForElasticData_$eq(true);
            if (scrollId() != null) {
                log().debug("Fetching next scroll");
                this.client.performRequestAsync("POST", new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"/_search/scroll"})).s(Nil$.MODULE$), (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Nil$.MODULE$)).asJava(), new StringEntity(package$.MODULE$.pimpAny(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("scroll"), "5m"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("scroll_id"), scrollId())}))).toJson(DefaultJsonProtocol$.MODULE$.mapFormat(DefaultJsonProtocol$.MODULE$.StringJsonFormat(), DefaultJsonProtocol$.MODULE$.StringJsonFormat())).toString()), this, new Header[]{new BasicHeader("Content-Type", "application/json")});
                return;
            }
            log().debug("Doing initial search");
            if (!searchParams().contains("size")) {
                searchParams_$eq(searchParams().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("size"), BoxesRunTime.boxToInteger(this.settings.bufferSize()).toString())));
            }
            if (!searchParams().contains("version") && this.settings.includeDocumentVersion()) {
                searchParams_$eq(searchParams().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("version"), "true")));
            }
            String stringBuilder = new StringBuilder().append("{").append(((TraversableOnce) searchParams().map(new ElasticsearchSourceLogic$$anonfun$3(this), Iterable$.MODULE$.canBuildFrom())).mkString(",")).append("}").toString();
            Tuple2 tuple2 = new Tuple2(this.indexName, this.typeName);
            if (tuple2 != null) {
                String str = (String) tuple2._1();
                Some some = (Option) tuple2._2();
                if (some instanceof Some) {
                    s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"/", "/", "/_search"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str, (String) some.x()}));
                    this.client.performRequestAsync("POST", s, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("scroll"), "5m"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sort"), "_doc")}))).asJava(), new StringEntity(stringBuilder), this, new Header[]{new BasicHeader("Content-Type", "application/json")});
                    return;
                }
            }
            if (tuple2 != null) {
                String str2 = (String) tuple2._1();
                if (None$.MODULE$.equals((Option) tuple2._2())) {
                    s = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"/", "/_search"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str2}));
                    this.client.performRequestAsync("POST", s, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("scroll"), "5m"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("sort"), "_doc")}))).asJava(), new StringEntity(stringBuilder), this, new Header[]{new BasicHeader("Content-Type", "application/json")});
                    return;
                }
            }
            throw new MatchError(tuple2);
        } catch (Exception e) {
            handleFailure(e);
        }
    }

    public void onFailure(Exception exc) {
        failureHandler().invoke(exc);
    }

    public void onSuccess(Response response) {
        responseHandler().invoke(response);
    }

    public void handleFailure(Throwable th) {
        waitingForElasticData_$eq(false);
        failStage(th);
    }

    public void handleResponse(Response response) {
        waitingForElasticData_$eq(false);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            response.getEntity().writeTo(byteArrayOutputStream);
            String str = new String(byteArrayOutputStream.toByteArray(), "UTF-8");
            byteArrayOutputStream.close();
            ScrollResponse<T> convert = this.reader.convert(str);
            if (!pullIsWaitingForData()) {
                log().debug("Received data from elastic. Downstream have not yet asked for it");
                dataReady_$eq(new Some(convert));
                return;
            }
            log().debug("Received data from elastic. Downstream has already called pull and is waiting for data");
            pullIsWaitingForData_$eq(false);
            if (handleScrollResponse(convert)) {
                sendScrollScanRequest();
            }
        } catch (Throwable th) {
            byteArrayOutputStream.close();
            throw th;
        }
    }

    public boolean handleScrollResponse(ScrollResponse<T> scrollResponse) {
        boolean z;
        if (scrollResponse != null) {
            Some error = scrollResponse.error();
            if (error instanceof Some) {
                failStage(new IllegalStateException((String) error.x()));
                z = false;
                return z;
            }
        }
        if (scrollResponse != null) {
            Option<String> error2 = scrollResponse.error();
            Some result = scrollResponse.result();
            if (None$.MODULE$.equals(error2) && (result instanceof Some) && ((ScrollResult) result.x()).messages().isEmpty()) {
                completeStage();
                z = false;
                return z;
            }
        }
        if (scrollResponse != null) {
            Some result2 = scrollResponse.result();
            if (result2 instanceof Some) {
                ScrollResult scrollResult = (ScrollResult) result2.x();
                scrollId_$eq(scrollResult.scrollId());
                log().debug("Pushing data downstream");
                emitMultiple(this.out, scrollResult.messages().toIterator());
                z = true;
                return z;
            }
        }
        throw new MatchError(scrollResponse);
    }

    public void onPull() {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        Some dataReady = dataReady();
        if (dataReady instanceof Some) {
            ScrollResponse<T> scrollResponse = (ScrollResponse) dataReady.x();
            log().debug("Downstream is pulling data and we already have data ready");
            if (handleScrollResponse(scrollResponse)) {
                dataReady_$eq(None$.MODULE$);
                if (waitingForElasticData()) {
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    sendScrollScanRequest();
                    boxedUnit2 = BoxedUnit.UNIT;
                }
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            return;
        }
        if (!None$.MODULE$.equals(dataReady)) {
            throw new MatchError(dataReady);
        }
        if (pullIsWaitingForData()) {
            throw new Exception("This should not happen: Downstream is pulling more than once");
        }
        pullIsWaitingForData_$eq(true);
        if (waitingForElasticData()) {
            log().debug("Downstream is pulling data. Already waiting for data");
            boxedUnit = BoxedUnit.UNIT;
        } else {
            log().debug("Downstream is pulling data. We must go and get it");
            sendScrollScanRequest();
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ElasticsearchSourceLogic(String str, Option<String> option, Map<String, String> map, RestClient restClient, ElasticsearchSourceSettings elasticsearchSourceSettings, Outlet<OutgoingMessage<T>> outlet, SourceShape<OutgoingMessage<T>> sourceShape, MessageReader<T> messageReader) {
        super(sourceShape);
        this.indexName = str;
        this.typeName = option;
        this.searchParams = map;
        this.client = restClient;
        this.settings = elasticsearchSourceSettings;
        this.out = outlet;
        this.reader = messageReader;
        OutHandler.class.$init$(this);
        StageLogging.class.$init$(this);
        this.scrollId = null;
        this.responseHandler = getAsyncCallback(new ElasticsearchSourceLogic$$anonfun$1(this));
        this.failureHandler = getAsyncCallback(new ElasticsearchSourceLogic$$anonfun$2(this));
        this.waitingForElasticData = false;
        this.pullIsWaitingForData = false;
        this.dataReady = None$.MODULE$;
        setHandler(outlet, this);
    }
}
