/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bahir.sql.streaming.mqtt;

import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Properties;
import javax.net.ssl.KeyManagerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslBrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.bahir.sql.streaming.mqtt.MQTTTestUtils$;
import org.apache.bahir.utils.Logging;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.mutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005-g\u0001B\u0001\u0003\u0001=\u0011Q\"T)U)R+7\u000f^+uS2\u001c(BA\u0002\u0005\u0003\u0011i\u0017\u000f\u001e;\u000b\u0005\u00151\u0011!C:ue\u0016\fW.\u001b8h\u0015\t9\u0001\"A\u0002tc2T!!\u0003\u0006\u0002\u000b\t\f\u0007.\u001b:\u000b\u0005-a\u0011AB1qC\u000eDWMC\u0001\u000e\u0003\ry'oZ\u0002\u0001'\r\u0001\u0001C\u0006\t\u0003#Qi\u0011A\u0005\u0006\u0002'\u0005)1oY1mC&\u0011QC\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005]QR\"\u0001\r\u000b\u0005eA\u0011!B;uS2\u001c\u0018BA\u000e\u0019\u0005\u001daunZ4j]\u001eD\u0001\"\b\u0001\u0003\u0002\u0003\u0006IAH\u0001\bi\u0016l\u0007\u000fR5s!\tyB%D\u0001!\u0015\t\t#%\u0001\u0002j_*\t1%\u0001\u0003kCZ\f\u0017BA\u0013!\u0005\u00111\u0015\u000e\\3\t\u0011\u001d\u0002!\u0011!Q\u0001\n!\nA\u0001]8siB\u0011\u0011#K\u0005\u0003UI\u00111!\u00138u\u0011!a\u0003A!A!\u0002\u0013i\u0013aA:tYB\u0011\u0011CL\u0005\u0003_I\u0011qAQ8pY\u0016\fg\u000eC\u00032\u0001\u0011\u0005!'\u0001\u0004=S:LGO\u0010\u000b\u0005gU2t\u0007\u0005\u00025\u00015\t!\u0001C\u0003\u001ea\u0001\u0007a\u0004C\u0004(aA\u0005\t\u0019\u0001\u0015\t\u000f1\u0002\u0004\u0013!a\u0001[!9\u0011\b\u0001b\u0001\n\u0013Q\u0014A\u00032s_.,'\u000fS8tiV\t1\b\u0005\u0002=\u007f5\tQH\u0003\u0002?E\u0005!A.\u00198h\u0013\t\u0001UH\u0001\u0004TiJLgn\u001a\u0005\u0007\u0005\u0002\u0001\u000b\u0011B\u001e\u0002\u0017\t\u0014xn[3s\u0011>\u001cH\u000f\t\u0005\b\t\u0002\u0011\r\u0011\"\u0003F\u0003)\u0011'o\\6feB{'\u000f^\u000b\u0002Q!1q\t\u0001Q\u0001\n!\n1B\u0019:pW\u0016\u0014\bk\u001c:uA!9\u0011\n\u0001b\u0001\n\u0003Q\u0015AD:feZ,'oS3z'R|'/Z\u000b\u0002=!1A\n\u0001Q\u0001\ny\tqb]3sm\u0016\u00148*Z=Ti>\u0014X\r\t\u0005\b\u001d\u0002\u0011\r\u0011\"\u0001;\u0003Y\u0019XM\u001d<fe.+\u0017p\u0015;pe\u0016\u0004\u0016m]:x_J$\u0007B\u0002)\u0001A\u0003%1(A\ftKJ4XM]&fsN#xN]3QCN\u001cxo\u001c:eA!9!\u000b\u0001b\u0001\n\u0003Q\u0015\u0001E2mS\u0016tG\u000f\u0016:vgR\u001cFo\u001c:f\u0011\u0019!\u0006\u0001)A\u0005=\u0005\t2\r\\5f]R$&/^:u'R|'/\u001a
\u0011\t\u000fY\u0003!\u0019!C\u0001u\u0005A2\r\\5f]R$&/^:u'R|'/\u001a)bgN<xN\u001d3\t\ra\u0003\u0001\u0015!\u0003<\u0003e\u0019G.[3oiR\u0013Xo\u001d;Ti>\u0014X\rU1tg^|'\u000f\u001a\u0011\t\u0013i\u0003\u0001\u0019!a\u0001\n\u0013Y\u0016A\u00022s_.,'/F\u0001]!\ti\u0016-D\u0001_\u0015\tQvL\u0003\u0002a\u0015\u0005A\u0011m\u0019;jm\u0016l\u0017/\u0003\u0002c=\ni!I]8lKJ\u001cVM\u001d<jG\u0016D\u0011\u0002\u001a\u0001A\u0002\u0003\u0007I\u0011B3\u0002\u0015\t\u0014xn[3s?\u0012*\u0017\u000f\u0006\u0002gSB\u0011\u0011cZ\u0005\u0003QJ\u0011A!\u00168ji\"9!nYA\u0001\u0002\u0004a\u0016a\u0001=%c!IA\u000e\u0001a\u0001\u0002\u0003\u0006K\u0001X\u0001\bEJ|7.\u001a:!\u0011%q\u0007\u00011AA\u0002\u0013%q.A\u0005d_:tWm\u0019;peV\t\u0001\u000f\u0005\u0002^c&\u0011!O\u0018\u0002\u0013)J\fgn\u001d9peR\u001cuN\u001c8fGR|'\u000fC\u0005u\u0001\u0001\u0007\t\u0019!C\u0005k\u0006i1m\u001c8oK\u000e$xN]0%KF$\"A\u001a<\t\u000f)\u001c\u0018\u0011!a\u0001a\"I\u0001\u0010\u0001a\u0001\u0002\u0003\u0006K\u0001]\u0001\u000bG>tg.Z2u_J\u0004\u0003\"\u0002>\u0001\t\u0003Y\u0018!\u00032s_.,'/\u0016:j+\u0005a\bcA?\u0002\n9\u0019a0!\u0002\u0011\u0005}\u0014RBAA\u0001\u0015\r\t\u0019AD\u0001\u0007yI|w\u000e\u001e 
\n\u0007\u0005\u001d!#\u0001\u0004Qe\u0016$WMZ\u0005\u0004\u0001\u0006-!bAA\u0004%!9\u0011q\u0002\u0001\u0005\n\u0005E\u0011\u0001\u00044j]\u00124%/Z3Q_J$H#\u0001\u0015\t\u000f\u0005U\u0001\u0001\"\u0001\u0002\u0018\u0005)1/\u001a;vaR\ta\rC\u0004\u0002\u001c\u0001!\t!a\u0006\u0002\u0011Q,\u0017M\u001d3po:Dq!a\b\u0001\t\u0003\t\t#A\u0006qk\nd\u0017n\u001d5ECR\fGc\u00024\u0002$\u0005\u001d\u00121\u0006\u0005\b\u0003K\ti\u00021\u0001}\u0003\u0015!x\u000e]5d\u0011\u001d\tI#!\bA\u0002q\fA\u0001Z1uC\"I\u0011QFA\u000f!\u0003\u0005\r\u0001K\u0001\u0002\u001d\"9\u0011\u0011\u0007\u0001\u0005\u0002\u0005M\u0012!D:vEN\u001c'/\u001b2f\t\u0006$\u0018\r\u0006\u0004\u00026\u00055\u0013q\n\t\u0005\u0003o\tI%\u0004\u0002\u0002:)!\u00111HA\u001f\u0003\u0019i\u0017\u000f\u001e;wg)!\u0011qHA!\u0003\u0019\u0019G.[3oi*!\u00111IA#\u0003\u0011\u0001\u0018\r[8\u000b\u0007\u0005\u001dC\"A\u0004fG2L\u0007o]3\n\t\u0005-\u0013\u0011\b\u0002\u000b\u001bF$Ho\u00117jK:$\bbBA\u0013\u0003_\u0001\r\u0001 \u0005\t\u0003#\ny\u00031\u0001\u0002T\u0005AQ.Z:tC\u001e,7\u000f\u0005\u0004\u0002V\u0005}\u0003\u0006`\u0007\u0003\u0003/RA!!\u0017\u0002\\\u00059Q.\u001e;bE2,'bAA/%\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\t\u0005\u0005\u0014q\u000b\u0002\u0004\u001b\u0006\u0004\bbBA3\u0001\u0011\u0005\u0011qM\u0001\u0010G>tg.Z2u)>\u001cVM\u001d<feR1\u0011QGA5\u0003gB\u0001\"a\u001b\u0002d\u0001\u0007\u0011QN\u0001\fa\u0016\u00148/[:uK:\u001cW\r\u0005\u0003\u00028\u0005=\u0014\u0002BA9\u0003s\u0011Q#T9ui\u000ec\u0017.\u001a8u!\u0016\u00148/[:uK:\u001cW\r\u0003\u0005\u0002v\u0005\r\u0004\u0019AA<\u0003!\u0019\u0017\r\u001c7cC\u000e\\\u0007\u0003BA\u001c\u0003sJA!a\u001f\u0002:\t!R*\u001d;u\u0007\u0006dGNY1dW\u0016CH/\u001a8eK\u0012Dq!a 
\u0001\t\u0003\t\t)\u0001\u0006tY\u0016,\u0007/\u00168uS2$RAZAB\u0003\u001bC\u0011\"!\"\u0002~\u0011\u0005\r!a\"\u0002\u0013A\u0014X\rZ5dCR,\u0007\u0003B\t\u0002\n6J1!a#\u0013\u0005!a$-\u001f8b[\u0016t\u0004\u0002CAH\u0003{\u0002\r!!%\u0002\u000fQLW.Z8viB\u0019\u0011#a%\n\u0007\u0005U%C\u0001\u0003M_:<\u0007\"CAM\u0001E\u0005I\u0011AAN\u0003U\u0001XO\u00197jg\"$\u0015\r^1%I\u00164\u0017-\u001e7uIM*\"!!(+\u0007!\nyj\u000b\u0002\u0002\"B!\u00111UAW\u001b\t\t)K\u0003\u0003\u0002(\u0006%\u0016!C;oG\",7m[3e\u0015\r\tYKE\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAX\u0003K\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u000f%\t\u0019LAA\u0001\u0012\u0003\t),A\u0007N#R#F+Z:u+RLGn\u001d\t\u0004i\u0005]f\u0001C\u0001\u0003\u0003\u0003E\t!!/\u0014\u0007\u0005]\u0006\u0003C\u00042\u0003o#\t!!0\u0015\u0005\u0005U\u0006BCAa\u0003o\u000b\n\u0011\"\u0001\u0002\u001c\u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIIB!\"!2\u00028F\u0005I\u0011AAd\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011\u0011\u001a\u0016\u0004[\u0005}\u0005")
public class MQTTTestUtils
implements Logging {
    private final File tempDir;
    private final boolean ssl;
    private final String brokerHost;
    private final int brokerPort;
    private final File serverKeyStore;
    private final String serverKeyStorePassword;
    private final File clientTrustStore;
    private final String clientTrustStorePassword;
    private BrokerService broker;
    private TransportConnector connector;
    private final Logger log;

    public static boolean $lessinit$greater$default$3() {
        return MQTTTestUtils$.MODULE$.$lessinit$greater$default$3();
    }

    public static int $lessinit$greater$default$2() {
        return MQTTTestUtils$.MODULE$.$lessinit$greater$default$2();
    }

    public final Logger log() {
        return this.log;
    }

    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger x$1) {
        this.log = x$1;
    }

    private String brokerHost() {
        return this.brokerHost;
    }

    private int brokerPort() {
        return this.brokerPort;
    }

    public File serverKeyStore() {
        return this.serverKeyStore;
    }

    public String serverKeyStorePassword() {
        return this.serverKeyStorePassword;
    }

    public File clientTrustStore() {
        return this.clientTrustStore;
    }

    public String clientTrustStorePassword() {
        return this.clientTrustStorePassword;
    }

    private BrokerService broker() {
        return this.broker;
    }

    private void broker_$eq(BrokerService x$1) {
        this.broker = x$1;
    }

    private TransportConnector connector() {
        return this.connector;
    }

    private void connector_$eq(TransportConnector x$1) {
        this.connector = x$1;
    }

    public String brokerUri() {
        return new StringBuilder(1).append(this.brokerHost()).append(":").append(this.brokerPort()).toString();
    }

    /*
     * WARNING - void declaration
     */
    private int findFreePort() {
        void var2_2;
        ServerSocket s = new ServerSocket(0);
        int port = s.getLocalPort();
        s.close();
        return (int)var2_2;
    }

    public void setup() {
        String protocol;
        this.broker_$eq((BrokerService)(this.ssl ? new SslBrokerService() : new BrokerService()));
        this.broker().setDataDirectoryFile(this.tempDir);
        String string = protocol = this.ssl ? "mqtt+ssl" : "mqtt";
        if (this.ssl) {
            KeyStore keyStore = KeyStore.getInstance("JKS");
            keyStore.load(Files.newInputStream(this.serverKeyStore().toPath(), new OpenOption[0]), this.serverKeyStorePassword().toCharArray());
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(keyStore, this.serverKeyStorePassword().toCharArray());
            this.broker().setSslContext(new SslContext(keyManagerFactory.getKeyManagers(), null, new SecureRandom()));
        }
        this.connector_$eq(new TransportConnector());
        this.connector().setName("mqtt");
        this.connector().setUri(new URI(new StringBuilder(3).append(protocol).append("://").append(this.brokerUri()).toString()));
        this.broker().addConnector(this.connector());
        this.broker().start();
    }

    public void teardown() {
        if (this.broker() != null) {
            this.broker().stop();
        }
        if (this.connector() != null) {
            this.connector().stop();
            this.connector_$eq(null);
        }
        while (!this.broker().isStopped()) {
            Thread.sleep(50L);
        }
        this.broker_$eq(null);
    }

    public void publishData(String topic, String data, int N) {
        MqttClient client = null;
        try {
            client = this.connectToServer((MqttClientPersistence)new MemoryPersistence(), null);
            if (client.isConnected()) {
                MqttTopic msgTopic = client.getTopic(topic);
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), N).foreach((Function1 & Serializable & scala.Serializable)_ -> MQTTTestUtils.$anonfun$publishData$1(this, data, msgTopic, BoxesRunTime.unboxToInt((Object)_)));
            }
        }
        finally {
            if (client != null) {
                client.disconnect();
                client.close();
                client = null;
            }
        }
    }

    public int publishData$default$3() {
        return 1;
    }

    public MqttClient subscribeData(String topic, Map<Object, String> messages) {
        MqttCallbackExtended callback = new MqttCallbackExtended(null, messages){
            private final Map messages$1;

            public void messageArrived(String topic_, MqttMessage message) {
                $anon$1 var3_3 = this;
                synchronized (var3_3) {
                    this.messages$1.put((Object)BoxesRunTime.boxToInteger((int)message.getId()), (Object)new String(message.getPayload(), Charset.defaultCharset()));
                }
            }

            public void deliveryComplete(IMqttDeliveryToken token) {
            }

            public void connectionLost(Throwable cause) {
            }

            public void connectComplete(boolean reconnect, String serverURI) {
            }
            {
                this.messages$1 = messages$1;
            }
        };
        MqttClient client = this.connectToServer(null, callback);
        client.subscribe(topic);
        return client;
    }

    public MqttClient connectToServer(MqttClientPersistence persistence, MqttCallbackExtended callback) {
        String protocol = this.ssl ? "ssl" : "tcp";
        MqttClient client = new MqttClient(new StringBuilder(3).append(protocol).append("://").append(this.brokerUri()).toString(), MqttClient.generateClientId(), persistence);
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        if (this.ssl) {
            Properties sslProperties = new Properties();
            sslProperties.setProperty("com.ibm.ssl.trustStore", this.clientTrustStore().getAbsolutePath());
            sslProperties.setProperty("com.ibm.ssl.trustStoreType", "JKS");
            sslProperties.setProperty("com.ibm.ssl.trustStorePassword", this.clientTrustStorePassword());
            connectOptions.setSSLProperties(sslProperties);
        }
        if (callback != null) {
            client.setCallback((MqttCallback)callback);
        }
        client.connect(connectOptions);
        return client;
    }

    public void sleepUntil(Function0<Object> predicate, long timeout) {
        long deadline = System.currentTimeMillis() + timeout;
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(1000L);
            if (!predicate.apply$mcZ$sp()) continue;
            return;
        }
    }

    public static final /* synthetic */ Object $anonfun$publishData$1(MQTTTestUtils $this, String data$1, MqttTopic msgTopic$1, int _) {
        BoxedUnit boxedUnit;
        try {
            Thread.sleep(20L);
            MqttMessage message = new MqttMessage(data$1.getBytes());
            message.setQos(2);
            boxedUnit = msgTopic$1.publish(message);
        }
        catch (MqttException e) {
            Thread.sleep(50L);
            $this.log().warn("publish failed", (Throwable)e);
            boxedUnit = BoxedUnit.UNIT;
        }
        catch (Throwable x) {
            $this.log().warn(new StringBuilder(15).append("publish failed ").append(x).toString());
            boxedUnit = BoxedUnit.UNIT;
        }
        return boxedUnit;
    }

    public MQTTTestUtils(File tempDir, int port, boolean ssl) {
        this.tempDir = tempDir;
        this.ssl = ssl;
        Logging.$init$((Logging)this);
        this.brokerHost = "127.0.0.1";
        this.brokerPort = port == 0 ? this.findFreePort() : port;
        this.serverKeyStore = new File("src/test/resources/keystore.jks");
        this.serverKeyStorePassword = "changeit";
        this.clientTrustStore = new File("src/test/resources/truststore.jks");
        this.clientTrustStorePassword = "changeit";
    }
}

