/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bahir.sql.streaming.mqtt;

import java.io.File;
import java.net.ServerSocket;
import java.net.URI;
import java.nio.charset.Charset;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.bahir.sql.streaming.mqtt.MQTTTestUtils$;
import org.apache.bahir.utils.Logging;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005ed\u0001B\u0001\u0003\u0001=\u0011Q\"T)U)R+7\u000f^+uS2\u001c(BA\u0002\u0005\u0003\u0011i\u0017\u000f\u001e;\u000b\u0005\u00151\u0011!C:ue\u0016\fW.\u001b8h\u0015\t9\u0001\"A\u0002tc2T!!\u0003\u0006\u0002\u000b\t\f\u0007.\u001b:\u000b\u0005-a\u0011AB1qC\u000eDWMC\u0001\u000e\u0003\ry'oZ\u0002\u0001'\r\u0001\u0001C\u0006\t\u0003#Qi\u0011A\u0005\u0006\u0002'\u0005)1oY1mC&\u0011QC\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005]QR\"\u0001\r\u000b\u0005eA\u0011!B;uS2\u001c\u0018BA\u000e\u0019\u0005\u001daunZ4j]\u001eD\u0001\"\b\u0001\u0003\u0002\u0003\u0006IAH\u0001\bi\u0016l\u0007\u000fR5s!\tyB%D\u0001!\u0015\t\t#%\u0001\u0002j_*\t1%\u0001\u0003kCZ\f\u0017BA\u0013!\u0005\u00111\u0015\u000e\\3\t\u0011\u001d\u0002!\u0011!Q\u0001\n!\nA\u0001]8siB\u0011\u0011#K\u0005\u0003UI\u00111!\u00138u\u0011\u0015a\u0003\u0001\"\u0001.\u0003\u0019a\u0014N\\5u}Q\u0019a\u0006M\u0019\u0011\u0005=\u0002Q\"\u0001\u0002\t\u000buY\u0003\u0019\u0001\u0010\t\u000f\u001dZ\u0003\u0013!a\u0001Q!91\u0007\u0001b\u0001\n\u0013!\u0014A\u00032s_.,'\u000fS8tiV\tQ\u0007\u0005\u00027s5\tqG\u0003\u00029E\u0005!A.\u00198h\u0013\tQtG\u0001\u0004TiJLgn\u001a\u0005\u0007y\u0001\u0001\u000b\u0011B\u001b\u0002\u0017\t\u0014xn[3s\u0011>\u001cH\u000f\t\u0005\b}\u0001\u0011\r\u0011\"\u0003@\u0003)\u0011'o\\6feB{'\u000f^\u000b\u0002Q!1\u0011\t\u0001Q\u0001\n!\n1B\u0019:pW\u0016\u0014\bk\u001c:uA!I1\t\u0001a\u0001\u0002\u0004%I\u0001R\u0001\u0007EJ|7.\u001a:\u0016\u0003\u0015\u0003\"A\u0012&\u000e\u0003\u001dS!a\u0011%\u000b\u0005%S\u0011\u0001C1di&4X-\\9\n\u0005-;%!\u0004\"s_.,'oU3sm&\u001cW\rC\u0005N\u0001\u0001\u0007\t\u0019!C\u0005\u001d\u0006Q!M]8lKJ|F%Z9\u0015\u0005=\u0013\u0006CA\tQ\u0013\t\t&C\u0001\u0003V]&$\bbB*M\u0003\u0003\u0005\r!R\u0001\u0004q\u0012\n\u0004BB+\u0001A\u0003&Q)A\u0004ce>\\WM\u001d\u0011\t\u0013]\u0003\u0001\u0019!a\u0001\n\u0013A\u0016!C2p]:,7\r^8s+\u0005I\u0006C\u0001$[\u0013\tYvI\u0001\nUe\u0006t7\u000f]8si\u000e{gN\\3di>\u0014\b\"C/\u0001\u0001\u0004\u0005\r\u0011\"\u0003_\u00035\u0019wN\u001c8fGR|'o\u0018\u0013fcR\u0011qj\u0018\u0005\b'r\u000b\t\u00111\u0001Z\u0011\u0019\t\u0007\u0001)Q\u00053\u0006Q1m\u001c8oK\u000e$xN\u001d\u0011\t\u000b\r\u0004A\u0011\u00013\u0002\u0013\t\u0014xn[3s+JLW#A3\u0011\u0005\u0019LgBA\th\u0013\tA'#\u0001\u0004Qe\u0016$WMZ\u0005\u0003u)T!\u0001\u001b\n\t\u000b1\u0004A\u0011B7\u0002\u0019\u0019Lg\u000e\u001a$sK\u0016\u0004vN\u001d;\u0015\u0003!BQa\u001c\u0001\u0005\u0002A\fQa]3ukB$\u0012a\u0014\u0005\u0006e\u0002!\t\u0001]\u0001\ti\u0016\f'\u000fZ8x]\")A\u000f\u0001C\u0001k\u0006Y\u0001/\u001e2mSNDG)\u0019;b)\u0011ye\u000f\u001f>\t\u000b]\u001c\b\u0019A3\u0002\u000bQ|\u0007/[2\t\u000be\u001c\b\u0019A3\u0002\t\u0011\fG/\u0019\u0005\bwN\u0004\n\u00111\u0001)\u0003\u0005q\u0005\"B?\u0001\t\u0003q\u0018!D:vEN\u001c'/\u001b2f\t\u0006$\u0018\rF\u0003\u0000\u0003/\tI\u0002\u0005\u0003\u0002\u0002\u0005MQBAA\u0002\u0015\u0011\t)!a\u0002\u0002\r5\fH\u000f\u001e<4\u0015\u0011\tI!a\u0003\u0002\r\rd\u0017.\u001a8u\u0015\u0011\ti!a\u0004\u0002\tA\f\u0007n\u001c\u0006\u0004\u0003#a\u0011aB3dY&\u00048/Z\u0005\u0005\u0003+\t\u0019A\u0001\u0006NcR$8\t\\5f]RDQa\u001e?A\u0002\u0015Dq!a\u0007}\u0001\u0004\ti\"\u0001\u0005nKN\u001c\u0018mZ3t!\u0019\ty\"!\u000b)K6\u0011\u0011\u0011\u0005\u0006\u0005\u0003G\t)#A\u0004nkR\f'\r\\3\u000b\u0007\u0005\u001d\"#\u0001\u0006d_2dWm\u0019;j_:LA!a\u000b\u0002\"\t\u0019Q*\u00199\t\u000f\u0005=\u0002\u0001\"\u0001\u00022\u0005Q1\u000f\\3faVsG/\u001b7\u0015\u000b=\u000b\u0019$a\u0011\t\u0013\u0005U\u0012Q\u0006CA\u0002\u0005]\u0012!\u00039sK\u0012L7-\u0019;f!\u0015\t\u0012\u0011HA\u001f\u0013\r\tYD\u0005\u0002\ty\tLh.Y7f}A\u0019\u0011#a\u0010\n\u0007\u0005\u0005#CA\u0004C_>dW-\u00198\t\u0011\u0005\u0015\u0013Q\u0006a\u0001\u0003\u000f\nq\u0001^5nK>,H\u000fE\u0002\u0012\u0003\u0013J1!a\u0013\u0013\u0005\u0011auN\\4\t\u0013\u0005=\u0003!%A\u0005\u0002\u0005E\u0013!\u00069vE2L7\u000f\u001b#bi\u0006$C-\u001a4bk2$HeM\u000b\u0003\u0003'R3\u0001KA+W\t\t9\u0006\u0005\u0003\u0002Z\u0005\rTBAA.\u0015\u0011\ti&a\u0018\u0002\u0013Ut7\r[3dW\u0016$'bAA1%\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005\u0015\u00141\f\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,w!CA5\u0005\u0005\u0005\t\u0012AA6\u00035i\u0015\u000b\u0016+UKN$X\u000b^5mgB\u0019q&!\u001c\u0007\u0011\u0005\u0011\u0011\u0011!E\u0001\u0003_\u001a2!!\u001c\u0011\u0011\u001da\u0013Q\u000eC\u0001\u0003g\"\"!a\u001b\t\u0015\u0005]\u0014QNI\u0001\n\u0003\t\t&A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$HE\r")
public class MQTTTestUtils
implements Logging {
    private final File tempDir;
    private final String brokerHost;
    private final int brokerPort;
    private BrokerService broker;
    private TransportConnector connector;
    private final Logger log;

    public static int $lessinit$greater$default$2() {
        return MQTTTestUtils$.MODULE$.$lessinit$greater$default$2();
    }

    public final Logger log() {
        return this.log;
    }

    public final void org$apache$bahir$utils$Logging$_setter_$log_$eq(Logger x$1) {
        this.log = x$1;
    }

    private String brokerHost() {
        return this.brokerHost;
    }

    private int brokerPort() {
        return this.brokerPort;
    }

    private BrokerService broker() {
        return this.broker;
    }

    private void broker_$eq(BrokerService x$1) {
        this.broker = x$1;
    }

    private TransportConnector connector() {
        return this.connector;
    }

    private void connector_$eq(TransportConnector x$1) {
        this.connector = x$1;
    }

    public String brokerUri() {
        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ":", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.brokerHost(), BoxesRunTime.boxToInteger((int)this.brokerPort())}));
    }

    /*
     * WARNING - void declaration
     */
    private int findFreePort() {
        void var2_2;
        ServerSocket s = new ServerSocket(0);
        int port = s.getLocalPort();
        s.close();
        return (int)var2_2;
    }

    public void setup() {
        this.broker_$eq(new BrokerService());
        this.broker().setDataDirectoryFile(this.tempDir);
        this.connector_$eq(new TransportConnector());
        this.connector().setName("mqtt");
        this.connector().setUri(new URI(new StringBuilder().append((Object)"mqtt://").append((Object)this.brokerUri()).toString()));
        this.broker().addConnector(this.connector());
        this.broker().start();
    }

    public void teardown() {
        if (this.broker() != null) {
            this.broker().stop();
        }
        if (this.connector() != null) {
            this.connector().stop();
            this.connector_$eq(null);
        }
        while (true) {
            if (this.broker().isStopped()) {
                this.broker_$eq(null);
                return;
            }
            Thread.sleep(50L);
        }
    }

    public void publishData(String topic, String data, int N) {
        MqttClient client = null;
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            client = new MqttClient(new StringBuilder().append((Object)"tcp://").append((Object)this.brokerUri()).toString(), MqttClient.generateClientId(), (MqttClientPersistence)persistence);
            client.connect();
            if (client.isConnected()) {
                MqttTopic msgTopic = client.getTopic(topic);
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), N).foreach((Function1)new Serializable(this, data, msgTopic){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ MQTTTestUtils $outer;
                    private final String data$1;
                    private final MqttTopic msgTopic$1;

                    public final Object apply(int i) {
                        MqttDeliveryToken mqttDeliveryToken;
                        try {
                            Thread.sleep(20L);
                            MqttMessage message = new MqttMessage(this.data$1.getBytes());
                            message.setQos(2);
                            mqttDeliveryToken = this.msgTopic$1.publish(message);
                        }
                        catch (Throwable throwable) {
                            this.$outer.log().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"publish failed ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{throwable})));
                            mqttDeliveryToken = BoxedUnit.UNIT;
                        }
                        catch (MqttException mqttException) {
                            Thread.sleep(50L);
                            this.$outer.log().warn(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"publish failed"})).s((Seq)Nil$.MODULE$), (Throwable)mqttException);
                            mqttDeliveryToken = BoxedUnit.UNIT;
                        }
                        return mqttDeliveryToken;
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.data$1 = data$1;
                        this.msgTopic$1 = msgTopic$1;
                    }
                });
            }
            return;
        }
        finally {
            if (client != null) {
                client.disconnect();
                client.close();
                client = null;
            }
        }
    }

    public int publishData$default$3() {
        return 1;
    }

    /*
     * WARNING - void declaration
     */
    public MqttClient subscribeData(String topic, Map<Object, String> messages) {
        void var3_3;
        MqttClient client = new MqttClient(new StringBuilder().append((Object)"tcp://").append((Object)this.brokerUri()).toString(), MqttClient.generateClientId(), null);
        MqttCallbackExtended callback = new MqttCallbackExtended(this, messages){
            private final Map messages$1;

            public void messageArrived(String topic_, MqttMessage message) {
                $anon$1 var3_3 = this;
                synchronized (var3_3) {
                    this.messages$1.put((Object)BoxesRunTime.boxToInteger((int)message.getId()), (Object)new String(message.getPayload(), Charset.defaultCharset()));
                    return;
                }
            }

            public void deliveryComplete(IMqttDeliveryToken token) {
            }

            public void connectionLost(Throwable cause) {
            }

            public void connectComplete(boolean reconnect, String serverURI) {
            }
            {
                this.messages$1 = messages$1;
            }
        };
        client.setCallback((MqttCallback)callback);
        client.connect();
        client.subscribe(topic);
        return var3_3;
    }

    public void sleepUntil(Function0<Object> predicate, long timeout) {
        long deadline = System.currentTimeMillis() + timeout;
        while (System.currentTimeMillis() < deadline) {
            Thread.sleep(1000L);
            if (!predicate.apply$mcZ$sp()) continue;
            return;
        }
    }

    public MQTTTestUtils(File tempDir, int port) {
        this.tempDir = tempDir;
        Logging.class.$init$((Logging)this);
        this.brokerHost = "127.0.0.1";
        this.brokerPort = port == 0 ? this.findFreePort() : port;
    }
}

