package org.apache.spark.streaming.mqtt;

import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.lang3.RandomUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logging;
import org.apache.spark.util.Utils$;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.StringContext;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: MQTTTestUtils.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00055a!B\u0001\u0003\u0001\ta!!D'R)R#Vm\u001d;Vi&d7O\u0003\u0002\u0004\t\u0005!Q.\u001d;u\u0015\t)a!A\u0005tiJ,\u0017-\\5oO*\u0011q\u0001C\u0001\u0006gB\f'o\u001b\u0006\u0003\u0013)\ta!\u00199bG\",'\"A\u0006\u0002\u0007=\u0014xmE\u0002\u0001\u001bM\u0001\"AD\t\u000e\u0003=Q\u0011\u0001E\u0001\u0006g\u000e\fG.Y\u0005\u0003%=\u0011a!\u00118z%\u00164\u0007C\u0001\u000b\u0018\u001b\u0005)\"B\u0001\f\u0007\u0003!Ig\u000e^3s]\u0006d\u0017B\u0001\r\u0016\u0005\u001daunZ4j]\u001eDQA\u0007\u0001\u0005\u0002q\ta\u0001P5oSRt4\u0001\u0001\u000b\u0002;A\u0011a\u0004A\u0007\u0002\u0005!9\u0001\u0005\u0001b\u0001\n\u0013\t\u0013A\u00049feNL7\u000f^3oG\u0016$\u0015N]\u000b\u0002EA\u00111\u0005K\u0007\u0002I)\u0011QEJ\u0001\u0003S>T\u0011aJ\u0001\u0005U\u00064\u0018-\u0003\u0002*I\t!a)\u001b7f\u0011\u0019Y\u0003\u0001)A\u0005E\u0005y\u0001/\u001a:tSN$XM\\2f\t&\u0014\b\u0005C\u0004.\u0001\t\u0007I\u0011\u0002\u0018\u0002\u0015\t\u0014xn[3s\u0011>\u001cH/F\u00010!\t\u00014'D\u00012\u0015\t\u0011d%\u0001\u0003mC:<\u0017B\u0001\u001b2\u0005\u0019\u0019FO]5oO\"1a\u0007\u0001Q\u0001\n=\n1B\u0019:pW\u0016\u0014\bj\\:uA!9\u0001\b\u0001b\u0001\n\u0013I\u0014A\u00032s_.,'\u000fU8siV\t!\b\u0005\u0002\u000fw%\u0011Ah\u0004\u0002\u0004\u0013:$\bB\u0002 \u0001A\u0003%!(A\u0006ce>\\WM\u001d)peR\u0004\u0003\"\u0003!\u0001\u0001\u0004\u0005\r\u0011\"\u0003B\u0003\u0019\u0011'o\\6feV\t!\t\u0005\u0002D\u000f6\tAI\u0003\u0002A\u000b*\u0011a\tC\u0001\tC\u000e$\u0018N^3nc&\u0011\u0001\n\u0012\u0002\u000e\u0005J|7.\u001a:TKJ4\u0018nY3\t\u0013)\u0003\u0001\u0019!a\u0001\n\u0013Y\u0015A\u00032s_.,'o\u0018\u0013fcR\u0011Aj\u0014\t\u0003\u001d5K!AT\b\u0003\tUs\u0017\u000e\u001e\u0005\b!&\u000b\t\u00111\u0001C\u0003\rAH%\r\u0005\u0007%\u0002\u0001\u000b\u0015\u0002\"\u0002\u000f\t\u0014xn[3sA!IA\u000b\u0001a\u0001\u0002\u0004%I!V\u0001\fgf\u001cH/Z7Vg\u0006<W-F\u0001W!\t9&,D\u0001Y\u0015\tIV)A\u0003vg\u0006<W-\u0003\u0002\\1\nY1+_:uK6,6/Y4f\u0011%i\u0006\u00011AA\u0002\u0013%a,A\btsN$X-\\+tC\u001e,w\fJ3r)\tau\fC\u0004Q9\u0006\u0005\t\u0019\u0001,\t\r\u0005\u0004\u0001\u0015)\u0003W\u00031\u0019\u0018p\u001d;f[V\u001b\u0018mZ3!\u0011%\u0019\u0007\u00011AA\u0002\u0013%A-A\u0005d_:tWm\u0019;peV\tQ\r\u0005\u0002DM&\u0011q\r\u0012\u0002\u0013)J\fgn\u001d9peR\u001cuN\u001c8fGR|'\u000fC\u0005j\u0001\u0001\u0007\t\u0019!C\u0005U\u0006i1m\u001c8oK\u000e$xN]0%KF$\"\u0001T6\t\u000fAC\u0017\u0011!a\u0001K\"1Q\u000e\u0001Q!\n\u0015\f!bY8o]\u0016\u001cGo\u001c:!\u0011\u0015y\u0007\u0001\"\u0001q\u0003%\u0011'o\\6feV\u0013\u0018.F\u0001r!\t\u0011XO\u0004\u0002\u000fg&\u0011AoD\u0001\u0007!J,G-\u001a4\n\u0005Q2(B\u0001;\u0010\u0011\u0015A\b\u0001\"\u0001z\u0003\u0015\u0019X\r^;q)\u0005a\u0005\"B>\u0001\t\u0003I\u0018\u0001\u0003;fCJ$wn\u001e8\t\u000bu\u0004A\u0011\u0002@\u0002\u0019\u0019Lg\u000e\u001a$sK\u0016\u0004vN\u001d;\u0015\u0003iBq!!\u0001\u0001\t\u0003\t\u0019!A\u0006qk\nd\u0017n\u001d5ECR\fG#\u0002'\u0002\u0006\u0005%\u0001BBA\u0004\u007f\u0002\u0007\u0011/A\u0003u_BL7\r\u0003\u0004\u0002\f}\u0004\r!]\u0001\u0005I\u0006$\u0018\r")
/* loaded from: input_file:org/apache/spark/streaming/mqtt/MQTTTestUtils.class */
public class MQTTTestUtils implements Logging {
    private final File persistenceDir;
    private final String brokerHost;
    private final int brokerPort;
    private BrokerService broker;
    private SystemUsage systemUsage;
    private TransportConnector connector;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    private File persistenceDir() {
        return this.persistenceDir;
    }

    private String brokerHost() {
        return this.brokerHost;
    }

    private int brokerPort() {
        return this.brokerPort;
    }

    private BrokerService broker() {
        return this.broker;
    }

    private void broker_$eq(BrokerService brokerService) {
        this.broker = brokerService;
    }

    private SystemUsage systemUsage() {
        return this.systemUsage;
    }

    private void systemUsage_$eq(SystemUsage systemUsage) {
        this.systemUsage = systemUsage;
    }

    private TransportConnector connector() {
        return this.connector;
    }

    private void connector_$eq(TransportConnector transportConnector) {
        this.connector = transportConnector;
    }

    public String brokerUri() {
        return new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ":", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{brokerHost(), BoxesRunTime.boxToInteger(brokerPort())}));
    }

    public void setup() {
        broker_$eq(new BrokerService());
        broker().setDataDirectoryFile(Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2()));
        broker().getSystemUsage().setSendFailIfNoSpace(false);
        systemUsage_$eq(broker().getSystemUsage());
        systemUsage().getStoreUsage().setLimit(268435456L);
        systemUsage().getTempUsage().setLimit(134217728L);
        connector_$eq(new TransportConnector());
        connector().setName("mqtt");
        connector().setUri(new URI(new StringBuilder().append("mqtt://").append(brokerUri()).toString()));
        broker().addConnector(connector());
        broker().start();
    }

    public void teardown() {
        if (broker() != null) {
            broker().stop();
            broker_$eq(null);
        }
        if (connector() != null) {
            connector().stop();
            connector_$eq(null);
        }
        Utils$.MODULE$.deleteRecursively(persistenceDir());
    }

    private int findFreePort() {
        return Utils$.MODULE$.startServiceOnPort(RandomUtils.nextInt(1024, 65536), new MQTTTestUtils$$anonfun$findFreePort$1(this), new SparkConf(), Utils$.MODULE$.startServiceOnPort$default$4())._2$mcI$sp();
    }

    public void publishData(String str, String str2) {
        MqttClient mqttClient = null;
        try {
            mqttClient = new MqttClient(new StringBuilder().append("tcp://").append(brokerUri()).toString(), MqttClient.generateClientId(), new MqttDefaultFilePersistence(persistenceDir().getAbsolutePath()));
            mqttClient.connect();
            if (mqttClient.isConnected()) {
                MqttTopic topic = mqttClient.getTopic(str);
                MqttMessage mqttMessage = new MqttMessage(str2.getBytes(StandardCharsets.UTF_8));
                mqttMessage.setQos(1);
                mqttMessage.setRetained(true);
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach(new MQTTTestUtils$$anonfun$publishData$1(this, topic, mqttMessage));
            }
            if (mqttClient != null) {
                mqttClient.disconnect();
                mqttClient.close();
            }
        } catch (Throwable th) {
            if (mqttClient != null) {
                mqttClient.disconnect();
                mqttClient.close();
            }
            throw th;
        }
    }

    public MQTTTestUtils() {
        Logging.class.$init$(this);
        this.persistenceDir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), Utils$.MODULE$.createTempDir$default$2());
        this.brokerHost = "localhost";
        this.brokerPort = findFreePort();
    }
}
