package org.apache.bahir.sql.streaming.mqtt;

import java.util.List;
import org.apache.bahir.utils.Logging;
import org.apache.spark.SparkEnv$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.execution.streaming.sources.PackedRowWriterFactory$;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.types.StructType;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Array$;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple6;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.HashMap$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: MQTTStreamSink.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mb\u0001B\u0001\u0003\u0001=\u0011\u0001#T)U)N#(/Z1n/JLG/\u001a:\u000b\u0005\r!\u0011\u0001B7riRT!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u0013)\tQAY1iSJT!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sO\u000e\u00011\u0003\u0002\u0001\u00111\u0019\u0002\"!\u0005\f\u000e\u0003IQ!a\u0005\u000b\u0002\t1\fgn\u001a\u0006\u0002+\u0005!!.\u0019<b\u0013\t9\"C\u0001\u0004PE*,7\r\u001e\t\u00033\u0011j\u0011A\u0007\u0006\u0003\u000bmQ!\u0001H\u000f\u0002\r]\u0014\u0018\u000e^3s\u0015\tqr$\u0001\u0002we)\u0011\u0001%I\u0001\bg>,(oY3t\u0015\t9!E\u0003\u0002$\u0015\u0005)1\u000f]1sW&\u0011QE\u0007\u0002\r'R\u0014X-Y7Xe&$XM\u001d\t\u0003O)j\u0011\u0001\u000b\u0006\u0003S!\tQ!\u001e;jYNL!a\u000b\u0015\u0003\u000f1{wmZ5oO\"AQ\u0006\u0001B\u0001B\u0003%a&\u0001\u0004tG\",W.\u0019\t\u0003_Ij\u0011\u0001\r\u0006\u0003c\u0005\nQ\u0001^=qKNL!a\r\u0019\u0003\u0015M#(/^2u)f\u0004X\r\u0003\u00056\u0001\t\u0005\t\u0015!\u00037\u0003)\u0001\u0018M]1nKR,'o\u001d\t\u0003oaj\u0011!H\u0005\u0003su\u0011\u0011\u0003R1uCN{WO]2f\u001fB$\u0018n\u001c8t\u0011\u0015Y\u0004\u0001\"\u0001=\u0003\u0019a\u0014N\\5u}Q\u0019Qh\u0010!\u0011\u0005y\u0002Q\"\u0001\u0002\t\u000b5R\u0004\u0019\u0001\u0018\t\u000bUR\u0004\u0019\u0001\u001c\t\u0011\t\u0003\u0001R1A\u0005\n\r\u000bq\u0002];cY&\u001c\b.\u0011;uK6\u0004Ho]\u000b\u0002\tB\u0011Q\tS\u0007\u0002\r*\tq)A\u0003tG\u0006d\u0017-\u0003\u0002J\r\n\u0019\u0011J\u001c;\t\u0011-\u0003\u0001\u0012!Q!\n\u0011\u000b\u0001\u0003];cY&\u001c\b.\u0011;uK6\u0004Ho\u001d\u0011\t\u00115\u0003\u0001R1A\u0005\n9\u000ba\u0002];cY&\u001c\bNQ1dW>4g-F\u0001P!\t)\u0005+\u0003\u0002R\r\n!Aj\u001c8h\u0011!\u0019\u0006\u0001#A!B\u0013y\u0015a\u00049vE2L7\u000f\u001b\"bG.|gM\u001a\u0011\t\u000f\r\u0002!\u0019!C\u0005+V\ta\u000b\u0005\u0002X16\t\u0011%\u0003\u0002ZC\ta1\u000b]1sWN+7o]5p]\"11\f\u0001Q\u0001\nY\u000baa\u001d9be.\u0004\u0003\"C/\u0001\u0001\u0004\u0005\r\u0011\"\u0003_\u0003\u0015!x\u000e]5d+\u0005y\u0006C\u00011d\u001d\t)\u0015-\u0003\u0002c\r\u00061\u0001K]3eK\u001aL!\u0001Z3\u0003\rM#(/\u001b8h\u0015\t\u0011g\tC\u0005h\u0001\u0001\u0007\t\u0019!C\u0005Q\u0006IAo\u001c9jG~#S-\u001d\u000b\u0003S2\u0004\"!\u00126\n\u0005-4%\u0001B+oSRDq!\u001c4\u0002\u0002\u0003\u0007q,A\u0002yIEBaa\u001c\u0001!B\u0013y\u0016A\u0002;pa&\u001c\u0007\u0005C\u0004r\u0001\u0001\u0007I\u0011B\"\u0002\u0007E|7\u000fC\u0004t\u0001\u0001\u0007I\u0011\u0002;\u0002\u000fE|7o\u0018\u0013fcR\u0011\u0011.\u001e\u0005\b[J\f\t\u00111\u0001E\u0011\u00199\b\u0001)Q\u0005\t\u0006!\u0011o\\:!\u0011\u0015I\b\u0001\"\u0003{\u0003)Ig.\u001b;jC2L'0\u001a\u000b\u0002S\")A\u0010\u0001C!{\u0006\u00192M]3bi\u0016<&/\u001b;fe\u001a\u000b7\r^8ssR\ta\u0010E\u0003��\u0003\u0003\t)!D\u0001\u001c\u0013\r\t\u0019a\u0007\u0002\u0012\t\u0006$\u0018m\u0016:ji\u0016\u0014h)Y2u_JL\bcA,\u0002\b%\u0019\u0011\u0011B\u0011\u0003\u0007I{w\u000fC\u0004\u0002\u000e\u0001!\t%a\u0004\u0002\r\r|W.\\5u)\u0015I\u0017\u0011CA\u000b\u0011\u001d\t\u0019\"a\u0003A\u0002=\u000bq!\u001a9pG\"LE\r\u0003\u0005\u0002\u0018\u0005-\u0001\u0019AA\r\u0003!iWm]:bO\u0016\u001c\b#B#\u0002\u001c\u0005}\u0011bAA\u000f\r\n)\u0011I\u001d:bsB\u0019q0!\t\n\u0007\u0005\r2DA\nXe&$XM]\"p[6LG/T3tg\u0006<W\rC\u0004\u0002\u000e\u0001!\t%a\n\u0015\u0007%\fI\u0003\u0003\u0005\u0002\u0018\u0005\u0015\u0002\u0019AA\r\u0011\u001d\ti\u0003\u0001C!\u0003_\tQ!\u00192peR$R![A\u0019\u0003gAq!a\u0005\u0002,\u0001\u0007q\n\u0003\u0005\u0002\u0018\u0005-\u0002\u0019AA\r\u0011\u001d\ti\u0003\u0001C!\u0003o!2![A\u001d\u0011!\t9\"!\u000eA\u0002\u0005e\u0001")
/* loaded from: input_file:org/apache/bahir/sql/streaming/mqtt/MQTTStreamWriter.class */
public class MQTTStreamWriter implements StreamWriter, Logging {
    private final StructType schema;
    private final DataSourceOptions parameters;
    private int publishAttempts;
    private long publishBackoff;
    private final SparkSession spark;
    private String topic;
    private int qos;
    private final Logger log;
    private volatile byte bitmap$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private int publishAttempts$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.publishAttempts = SparkEnv$.MODULE$.get().conf().getInt("spark.mqtt.client.publish.attempts", -1);
                this.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.publishAttempts;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private long publishBackoff$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.publishBackoff = SparkEnv$.MODULE$.get().conf().getTimeAsMs("spark.mqtt.client.publish.backoff", "5s");
                this.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.publishBackoff;
        }
    }

    @Override // org.apache.bahir.utils.Logging
    public final Logger log() {
        return this.log;
    }

    @Override // org.apache.bahir.utils.Logging
    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger logger) {
        this.log = logger;
    }

    private int publishAttempts() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? publishAttempts$lzycompute() : this.publishAttempts;
    }

    private long publishBackoff() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? publishBackoff$lzycompute() : this.publishBackoff;
    }

    private SparkSession spark() {
        return this.spark;
    }

    private String topic() {
        return this.topic;
    }

    private void topic_$eq(String str) {
        this.topic = str;
    }

    private int qos() {
        return this.qos;
    }

    private void qos_$eq(int i) {
        this.qos = i;
    }

    private void initialize() {
        Tuple6<String, String, String, MqttClientPersistence, MqttConnectOptions, Object> parseConfigParams = MQTTUtils$.MODULE$.parseConfigParams(HashMap$.MODULE$.apply(Nil$.MODULE$).$plus$plus((GenTraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(this.parameters.asMap()).asScala()));
        if (parseConfigParams == null) {
            throw new MatchError(parseConfigParams);
        }
        Tuple2 tuple2 = new Tuple2((String) parseConfigParams._3(), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(parseConfigParams._6())));
        String str = (String) tuple2._1();
        int _2$mcI$sp = tuple2._2$mcI$sp();
        topic_$eq(str);
        qos_$eq(_2$mcI$sp);
    }

    public DataWriterFactory<Row> createWriterFactory() {
        return PackedRowWriterFactory$.MODULE$;
    }

    public void commit(long j, WriterCommitMessage[] writerCommitMessageArr) {
        commit(writerCommitMessageArr);
    }

    public void commit(WriterCommitMessage[] writerCommitMessageArr) {
        spark().createDataFrame((List) JavaConverters$.MODULE$.seqAsJavaListConverter(Predef$.MODULE$.refArrayOps((Row[]) Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(writerCommitMessageArr).collect(new MQTTStreamWriter$$anonfun$1(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Row.class))))).flatten(new MQTTStreamWriter$$anonfun$2(this), ClassTag$.MODULE$.apply(Row.class))).toList()).asJava(), this.schema).foreachPartition(new MQTTStreamWriter$$anonfun$commit$1(this, (Seq) Seq$.MODULE$.apply(Nil$.MODULE$).$plus$plus((GenTraversableOnce) ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(this.parameters.asMap()).asScala()).toSeq().filterNot(new MQTTStreamWriter$$anonfun$3(this)), Seq$.MODULE$.canBuildFrom()), topic(), qos(), publishAttempts(), publishBackoff()));
    }

    public void abort(long j, WriterCommitMessage[] writerCommitMessageArr) {
    }

    public void abort(WriterCommitMessage[] writerCommitMessageArr) {
    }

    public MQTTStreamWriter(StructType structType, DataSourceOptions dataSourceOptions) {
        this.schema = structType;
        this.parameters = dataSourceOptions;
        org$apache$bahir$utils$Logging$_setter_$log_$eq(LoggerFactory.getLogger(new StringOps(Predef$.MODULE$.augmentString(getClass().getName())).stripSuffix("$")));
        Predef$.MODULE$.assert(SparkSession$.MODULE$.getActiveSession().isDefined());
        this.spark = (SparkSession) SparkSession$.MODULE$.getActiveSession().get();
        this.qos = -1;
        initialize();
    }
}
