package org.apache.bahir.examples.sql.streaming.mqtt;

import java.util.Arrays;
import java.util.Iterator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

/* loaded from: input_file:org/apache/bahir/examples/sql/streaming/mqtt/JavaMQTTStreamWordCount.class */
public final class JavaMQTTStreamWordCount {
    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 2) {
            System.err.println("Usage: JavaMQTTStreamWordCount <brokerUrl> <topic>");
            System.exit(1);
        }
        if (!Logger.getRootLogger().getAllAppenders().hasMoreElements()) {
            Logger.getRootLogger().setLevel(Level.WARN);
        }
        String str = strArr[0];
        String str2 = strArr[1];
        SparkConf appName = new SparkConf().setAppName("JavaMQTTStreamWordCount");
        if (!appName.contains("spark.master")) {
            appName.setMaster("local[4]");
        }
        SparkSession.builder().config(appName).getOrCreate().readStream().format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider").option("topic", str2).load(str).selectExpr(new String[]{"CAST(payload AS STRING)"}).as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() { // from class: org.apache.bahir.examples.sql.streaming.mqtt.JavaMQTTStreamWordCount.1
            public Iterator<String> call(String str3) {
                return Arrays.asList(str3.split(" ")).iterator();
            }
        }, Encoders.STRING()).groupBy("value", new String[0]).count().writeStream().outputMode("complete").format("console").start().awaitTermination();
    }
}
