package org.apache.bahir.cloudant.internal;

import com.google.gson.JsonParser;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.bahir.cloudant.CloudantChangesConfig;
import org.apache.bahir.cloudant.CloudantChangesConfig$;
import org.apache.bahir.cloudant.common.ChangesRow;
import org.apache.bahir.cloudant.common.ChangesRowScanner;
import org.apache.bahir.cloudant.common.CloudantException;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.receiver.Receiver;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ChangesReceiver.scala */
@ScalaSignature(bytes = "\u0006\u0001a2A!\u0001\u0002\u0001\u001b\ty1\t[1oO\u0016\u001c(+Z2fSZ,'O\u0003\u0002\u0004\t\u0005A\u0011N\u001c;fe:\fGN\u0003\u0002\u0006\r\u0005A1\r\\8vI\u0006tGO\u0003\u0002\b\u0011\u0005)!-\u00195je*\u0011\u0011BC\u0001\u0007CB\f7\r[3\u000b\u0003-\t1a\u001c:h\u0007\u0001\u0019\"\u0001\u0001\b\u0011\u0007=1\u0002$D\u0001\u0011\u0015\t\t\"#\u0001\u0005sK\u000e,\u0017N^3s\u0015\t\u0019B#A\u0005tiJ,\u0017-\\5oO*\u0011Q\u0003C\u0001\u0006gB\f'o[\u0005\u0003/A\u0011\u0001BU3dK&4XM\u001d\t\u00033}q!AG\u000f\u000e\u0003mQ\u0011\u0001H\u0001\u0006g\u000e\fG.Y\u0005\u0003=m\ta\u0001\u0015:fI\u00164\u0017B\u0001\u0011\"\u0005\u0019\u0019FO]5oO*\u0011ad\u0007\u0005\tG\u0001\u0011\t\u0011)A\u0005I\u000511m\u001c8gS\u001e\u0004\"!\n\u0014\u000e\u0003\u0011I!a\n\u0003\u0003+\rcw.\u001e3b]R\u001c\u0005.\u00198hKN\u001cuN\u001c4jO\")\u0011\u0006\u0001C\u0001U\u00051A(\u001b8jiz\"\"aK\u0017\u0011\u00051\u0002Q\"\u0001\u0002\t\u000b\rB\u0003\u0019\u0001\u0013\t\u000b=\u0002A\u0011\u0001\u0019\u0002\u000f=t7\u000b^1siR\t\u0011\u0007\u0005\u0002\u001be%\u00111g\u0007\u0002\u0005+:LG\u000fC\u00036\u0001\u0011%\u0001'A\u0004sK\u000e,\u0017N^3\t\u000b]\u0002A\u0011\t\u0019\u0002\r=t7\u000b^8q\u0001")
/* loaded from: input_file:org/apache/bahir/cloudant/internal/ChangesReceiver.class */
public class ChangesReceiver extends Receiver<String> {
    private final CloudantChangesConfig config;

    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.bahir.cloudant.internal.ChangesReceiver$$anon$1] */
    public void onStart() {
        new Thread(this) { // from class: org.apache.bahir.cloudant.internal.ChangesReceiver$$anon$1
            private final /* synthetic */ ChangesReceiver $outer;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                this.$outer.org$apache$bahir$cloudant$internal$ChangesReceiver$$receive();
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("Cloudant Receiver");
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        }.start();
    }

    public void org$apache$bahir$cloudant$internal$ChangesReceiver$$receive() {
        OkHttpClient build = new OkHttpClient.Builder().connectTimeout(5L, TimeUnit.SECONDS).readTimeout(60L, TimeUnit.SECONDS).build();
        Request.Builder url = new Request.Builder().url(this.config.getChangesReceiverUrl().toString());
        if (this.config.username() == null) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            url.header("Authorization", Credentials.basic(this.config.username(), this.config.password()));
        }
        if (this.config.getSelector() == null) {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            url.post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), new StringBuilder().append("{\"selector\":").append(this.config.getSelector()).append("}").toString()));
        }
        Response execute = build.newCall(url.build()).execute();
        int code = execute.code();
        if (code != 200) {
            String stringBuilder = new StringBuilder().append("Error retrieving _changes feed data from database '").append(this.config.getDbname()).append("' with response code ").append(BoxesRunTime.boxToInteger(code)).append(": ").append(new JsonParser().parse(execute.body().string()).toString()).toString();
            reportError(stringBuilder, new CloudantException(stringBuilder));
            CloudantChangesConfig$.MODULE$.receiverErrorMsg_$eq(stringBuilder);
            return;
        }
        InputStream byteStream = execute.body().byteStream();
        new ChangesRow();
        if (byteStream == null) {
            return;
        }
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(byteStream));
        while (true) {
            ChangesRow readRowFromReader = ChangesRowScanner.readRowFromReader(bufferedReader);
            if (BoxedUnit.UNIT == null) {
                return;
            }
            if (!isStopped() && readRowFromReader != null && !readRowFromReader.getDoc().has("_deleted")) {
                store(readRowFromReader.getDoc().toString());
            }
        }
    }

    public void onStop() {
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ChangesReceiver(CloudantChangesConfig cloudantChangesConfig) {
        super(StorageLevel$.MODULE$.MEMORY_AND_DISK());
        this.config = cloudantChangesConfig;
    }
}
