package org.apache.bahir.cloudant;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.bahir.cloudant.common.ChangesRow;
import org.apache.bahir.cloudant.common.ChangesRowScanner;
import org.apache.bahir.cloudant.common.CloudantException;
import org.apache.bahir.cloudant.common.JsonStoreConfigManager$;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: CloudantReceiver.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00193A!\u0001\u0002\u0001\u0017\t\u00012\t\\8vI\u0006tGOU3dK&4XM\u001d\u0006\u0003\u0007\u0011\t\u0001b\u00197pk\u0012\fg\u000e\u001e\u0006\u0003\u000b\u0019\tQAY1iSJT!a\u0002\u0005\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005I\u0011aA8sO\u000e\u00011C\u0001\u0001\r!\riACF\u0007\u0002\u001d)\u0011q\u0002E\u0001\te\u0016\u001cW-\u001b<fe*\u0011\u0011CE\u0001\ngR\u0014X-Y7j]\u001eT!a\u0005\u0004\u0002\u000bM\u0004\u0018M]6\n\u0005Uq!\u0001\u0003*fG\u0016Lg/\u001a:\u0011\u0005]\u0001cB\u0001\r\u001f!\tIB$D\u0001\u001b\u0015\tY\"\"\u0001\u0004=e>|GO\u0010\u0006\u0002;\u0005)1oY1mC&\u0011q\u0004H\u0001\u0007!J,G-\u001a4\n\u0005\u0005\u0012#AB*ue&twM\u0003\u0002 9!AA\u0005\u0001B\u0001B\u0003%Q%A\u0005ta\u0006\u00148nQ8oMB\u0011aeJ\u0007\u0002%%\u0011\u0001F\u0005\u0002\n'B\f'o[\"p]\u001aD\u0001B\u000b\u0001\u0003\u0002\u0003\u0006IaK\u0001\u000fG2|W\u000fZ1oiB\u000b'/Y7t!\u00119BF\u0006\f\n\u00055\u0012#aA'ba\")q\u0006\u0001C\u0001a\u00051A(\u001b8jiz\"2!M\u001a5!\t\u0011\u0004!D\u0001\u0003\u0011\u0015!c\u00061\u0001&\u0011\u0015Qc\u00061\u0001,\u0011!1\u0004\u0001#b\u0001\n\u00039\u0014AB2p]\u001aLw-F\u00019!\t\u0011\u0014(\u0003\u0002;\u0005\t)2\t\\8vI\u0006tGo\u00115b]\u001e,7oQ8oM&<\u0007\"\u0002\u001f\u0001\t\u0003i\u0014aB8o'R\f'\u000f\u001e\u000b\u0002}A\u0011q\bQ\u0007\u00029%\u0011\u0011\t\b\u0002\u0005+:LG\u000fC\u0003D\u0001\u0011%Q(A\u0004sK\u000e,\u0017N^3\t\u000b\u0015\u0003A\u0011A\u001f\u0002\r=t7\u000b^8q\u0001")
/* loaded from: input_file:org/apache/bahir/cloudant/CloudantReceiver.class */
public class CloudantReceiver extends Receiver<String> {
    private CloudantChangesConfig config;
    private final SparkConf sparkConf;
    private final Map<String, String> cloudantParams;
    private volatile boolean bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.bahir.cloudant.CloudantReceiver] */
    private CloudantChangesConfig config$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.config = (CloudantChangesConfig) JsonStoreConfigManager$.MODULE$.getConfig(this.sparkConf, this.cloudantParams.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("cloudant.endpoint"), JsonStoreConfigManager$.MODULE$.CHANGES_INDEX())));
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        this.sparkConf = null;
        this.cloudantParams = null;
        return this.config;
    }

    public CloudantChangesConfig config() {
        return !this.bitmap$0 ? config$lzycompute() : this.config;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.bahir.cloudant.CloudantReceiver$$anon$1] */
    public void onStart() {
        new Thread(this) { // from class: org.apache.bahir.cloudant.CloudantReceiver$$anon$1
            private final /* synthetic */ CloudantReceiver $outer;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                this.$outer.org$apache$bahir$cloudant$CloudantReceiver$$receive();
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("Cloudant Receiver");
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }.start();
    }

    public void org$apache$bahir$cloudant$CloudantReceiver$$receive() {
        OkHttpClient build = new OkHttpClient.Builder().connectTimeout(5L, TimeUnit.SECONDS).readTimeout(60L, TimeUnit.SECONDS).build();
        Request.Builder url = new Request.Builder().url(config().getChangesReceiverUrl().toString());
        if (config().username() != null) {
            url.header("Authorization", Credentials.basic(config().username(), config().password()));
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        if (config().getSelector() != null) {
            url.post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), new StringBuilder(13).append("{\"selector\":").append(config().getSelector()).append("}").toString()));
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Response execute = build.newCall(url.build()).execute();
        int code = execute.code();
        if (code != 200) {
            String sb = new StringBuilder(33).append("Error retrieving _changes feed ").append(config().getDbname()).append(": ").append(code).toString();
            reportError(sb, new CloudantException(sb));
            return;
        }
        InputStream byteStream = execute.body().byteStream();
        new ChangesRow();
        if (byteStream == null) {
            return;
        }
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(byteStream));
        while (true) {
            ChangesRow readRowFromReader = ChangesRowScanner.readRowFromReader(bufferedReader);
            if (BoxedUnit.UNIT == null) {
                return;
            }
            if (!isStopped() && readRowFromReader != null && !readRowFromReader.getDoc().has("_deleted")) {
                store(readRowFromReader.getDoc().toString());
            }
        }
    }

    public void onStop() {
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public CloudantReceiver(SparkConf sparkConf, Map<String, String> map) {
        super(StorageLevel$.MODULE$.MEMORY_AND_DISK());
        this.sparkConf = sparkConf;
        this.cloudantParams = map;
    }
}
