package org.apache.bahir.cloudant.internal;

import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.bahir.cloudant.CloudantChangesConfig;
import org.apache.bahir.cloudant.CloudantChangesConfig$;
import org.apache.bahir.cloudant.common.ChangesRow;
import org.apache.bahir.cloudant.common.ChangesRowScanner;
import org.apache.bahir.cloudant.common.CloudantException;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.receiver.Receiver;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: ChangesReceiver.scala */
@ScalaSignature(bytes = "\u0006\u0001q2A!\u0001\u0002\u0001\u001b\ty1\t[1oO\u0016\u001c(+Z2fSZ,'O\u0003\u0002\u0004\t\u0005A\u0011N\u001c;fe:\fGN\u0003\u0002\u0006\r\u0005A1\r\\8vI\u0006tGO\u0003\u0002\b\u0011\u0005)!-\u00195je*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\b\u0011\u0007=1\u0002$D\u0001\u0011\u0015\t\t\"#\u0001\u0005sK\u000e,\u0017N^3s\u0015\t\u0019B#A\u0005tiJ,\u0017-\\5oO*\u0011Q\u0003C\u0001\u0006gB\f'o[\u0005\u0003/A\u0011\u0001BU3dK&4XM\u001d\t\u00033\tr!A\u0007\u0011\u0011\u0005mqR\"\u0001\u000f\u000b\u0005ua\u0011A\u0002\u001fs_>$hHC\u0001 \u0003\u0015\u00198-\u00197b\u0013\t\tc$\u0001\u0004Qe\u0016$WMZ\u0005\u0003G\u0011\u0012aa\u0015;sS:<'BA\u0011\u001f\u0011!1\u0003A!A!\u0002\u00139\u0013AB2p]\u001aLw\r\u0005\u0002)S5\tA!\u0003\u0002+\t\t)2\t\\8vI\u0006tGo\u00115b]\u001e,7oQ8oM&<\u0007\"\u0002\u0017\u0001\t\u0003i\u0013A\u0002\u001fj]&$h\b\u0006\u0002/aA\u0011q\u0006A\u0007\u0002\u0005!)ae\u000ba\u0001O!)!\u0007\u0001C\u0001g\u00059qN\\*uCJ$H#\u0001\u001b\u0011\u0005U2T\"\u0001\u0010\n\u0005]r\"\u0001B+oSRDQ!\u000f\u0001\u0005\nM\nqA]3dK&4X\rC\u0003<\u0001\u0011\u00053'\u0001\u0004p]N#x\u000e\u001d")
/* loaded from: input_file:org/apache/bahir/cloudant/internal/ChangesReceiver.class */
public class ChangesReceiver extends Receiver<String> {
    private final CloudantChangesConfig config;

    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.bahir.cloudant.internal.ChangesReceiver$$anon$1] */
    public void onStart() {
        new Thread(this) { // from class: org.apache.bahir.cloudant.internal.ChangesReceiver$$anon$1
            private final /* synthetic */ ChangesReceiver $outer;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                this.$outer.org$apache$bahir$cloudant$internal$ChangesReceiver$$receive();
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("Cloudant Receiver");
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }.start();
    }

    public void org$apache$bahir$cloudant$internal$ChangesReceiver$$receive() {
        OkHttpClient build = new OkHttpClient.Builder().connectTimeout(5L, TimeUnit.SECONDS).readTimeout(60L, TimeUnit.SECONDS).build();
        Request.Builder url = new Request.Builder().url(this.config.getChangesReceiverUrl().toString());
        if (this.config.username() != null) {
            url.header("Authorization", Credentials.basic(this.config.username(), this.config.password()));
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        if (this.config.getSelector() != null) {
            url.post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), new StringBuilder(13).append("{\"selector\":").append(this.config.getSelector()).append("}").toString()));
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Response execute = build.newCall(url.build()).execute();
        int code = execute.code();
        if (code != 200) {
            String sb = new StringBuilder(74).append("Error retrieving _changes feed data from database '").append(this.config.getDbname()).append("' with response code ").append(code).append(": ").append(new JsonParser().parse(execute.body().string()).toString()).toString();
            reportError(sb, new CloudantException(sb));
            CloudantChangesConfig$.MODULE$.receiverErrorMsg_$eq(sb);
            return;
        }
        InputStream byteStream = execute.body().byteStream();
        new ChangesRow();
        if (byteStream == null) {
            return;
        }
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(byteStream));
        while (true) {
            ChangesRow readRowFromReader = ChangesRowScanner.readRowFromReader(bufferedReader);
            if (BoxedUnit.UNIT == null) {
                return;
            }
            if (!isStopped() && readRowFromReader != null && !readRowFromReader.getDoc().has("_deleted")) {
                store(readRowFromReader.getDoc().toString());
            }
        }
    }

    public void onStop() {
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ChangesReceiver(CloudantChangesConfig cloudantChangesConfig) {
        super(StorageLevel$.MODULE$.MEMORY_AND_DISK());
        this.config = cloudantChangesConfig;
    }
}
