/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mqtt;

import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Predicate;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests of {@link MqttIO} read and write transforms against an embedded ActiveMQ broker that
 * exposes an MQTT connector.
 *
 * <p>A fresh, non-persistent broker is started on a free local port before each test and stopped
 * afterwards (see {@link #startBroker()} and {@link #stopBroker()}).
 */
public class MqttIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);

    /** Number of messages published (and expected) by the read tests. */
    private static final int READ_MESSAGE_COUNT = 10;

    private BrokerService brokerService;
    private int port;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Picks a free local port and starts a non-persistent broker with an MQTT connector on it.
     *
     * @throws Exception if the port lookup or the broker start fails
     */
    @Before
    public void startBroker() throws Exception {
        LOG.info("Finding free network port");
        // Bind to port 0 so the OS chooses a free port, then release it for the broker.
        // There is an unavoidable window in which another process could grab the port.
        try (ServerSocket socket = new ServerSocket(0)) {
            port = socket.getLocalPort();
        }
        LOG.info("Starting ActiveMQ brokerService on {}", port);
        brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setPersistent(false);
        brokerService.addConnector("mqtt://localhost:" + port);
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    /**
     * Reads {@value #READ_MESSAGE_COUNT} messages with a connection configuration that carries no
     * client id, bounding the read by record count.
     *
     * @throws Exception if connecting, publishing or running the pipeline fails
     */
    @Test(timeout = 60000L)
    public void testReadNoClientId() throws Exception {
        final String topicName = "READ_TOPIC_NO_CLIENT_ID";
        PCollection<byte[]> output =
            pipeline.apply(
                MqttIO.read()
                    .withConnectionConfiguration(
                        MqttIO.ConnectionConfiguration.create(brokerUri(), topicName))
                    .withMaxNumRecords(10L));
        PAssert.that(output).containsInAnyOrder(expectedReadPayloads());

        BlockingConnection publishConnection = connectClient();
        // Publishing starts as soon as any broker client reports a non-empty connection id.
        // NOTE(review): the publisher's own connection may already satisfy this condition,
        // in which case this does not really wait for the pipeline - confirm against the broker.
        Thread publisherThread =
            startPublisher(publishConnection, topicName, connectionId -> !connectionId.isEmpty());

        pipeline.run();

        // Let the publisher finish its last publish handshake before closing its connection.
        publisherThread.join();
        publishConnection.disconnect();
    }

    /**
     * Reads {@value #READ_MESSAGE_COUNT} messages using the client id {@code READ_PIPELINE},
     * bounding the read by a maximum read time of three seconds.
     *
     * @throws Exception if connecting, publishing or running the pipeline fails
     */
    @Test(timeout = 30000L)
    public void testRead() throws Exception {
        PCollection<byte[]> output =
            pipeline.apply(
                MqttIO.read()
                    .withConnectionConfiguration(
                        MqttIO.ConnectionConfiguration.create(
                            brokerUri(), "READ_TOPIC", "READ_PIPELINE"))
                    .withMaxReadTime(Duration.standardSeconds(3L)));
        PAssert.that(output).containsInAnyOrder(expectedReadPayloads());

        BlockingConnection publishConnection = connectClient();
        // Publishing starts once a broker client whose id starts with the pipeline's client id
        // is connected.
        Thread publisherThread =
            startPublisher(
                publishConnection,
                "READ_TOPIC",
                connectionId -> connectionId.startsWith("READ_PIPELINE"));

        pipeline.run();

        publisherThread.join();
        publishConnection.disconnect();
    }

    /**
     * Runs a time-bounded read on a topic nobody publishes to; the pipeline must simply complete.
     *
     * @throws Exception if running the pipeline fails
     */
    @Test(timeout = 30000L)
    public void testReceiveWithTimeoutAndNoData() throws Exception {
        pipeline.apply(
            MqttIO.read()
                .withConnectionConfiguration(
                    MqttIO.ConnectionConfiguration.create(
                        brokerUri(), "READ_TOPIC", "READ_PIPELINE"))
                .withMaxReadTime(Duration.standardSeconds(2L)));
        pipeline.run();
    }

    /**
     * Writes 200 messages through {@code MqttIO.write()} and checks that a subscriber on the same
     * topic received every one of them.
     *
     * @throws Exception if connecting, subscribing or running the pipeline fails
     */
    @Test
    public void testWrite() throws Exception {
        final int numberOfTestMessages = 200;

        BlockingConnection connection = connectClient();
        connection.subscribe(
            new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});

        // Filled by the subscriber thread and read by the test thread after join().
        Set<String> messages = new ConcurrentSkipListSet<>();
        Thread subscriber =
            new Thread(
                () -> {
                    try {
                        for (int received = 0; received < numberOfTestMessages; received++) {
                            Message message = connection.receive();
                            messages.add(
                                new String(message.getPayload(), StandardCharsets.UTF_8));
                            message.ack();
                        }
                    } catch (Exception e) {
                        LOG.error("Can't receive message", e);
                    }
                });
        subscriber.start();

        List<byte[]> data = new ArrayList<>(numberOfTestMessages);
        for (int i = 0; i < numberOfTestMessages; i++) {
            data.add(("Test " + i).getBytes(StandardCharsets.UTF_8));
        }
        pipeline
            .apply(Create.of(data))
            .apply(
                MqttIO.write()
                    .withConnectionConfiguration(
                        MqttIO.ConnectionConfiguration.create(brokerUri(), "WRITE_TOPIC")));
        pipeline.run();

        subscriber.join();
        connection.disconnect();

        Assert.assertEquals(numberOfTestMessages, messages.size());
        for (int i = 0; i < numberOfTestMessages; i++) {
            Assert.assertTrue(messages.contains("Test " + i));
        }
    }

    /**
     * Stops the broker started by {@link #startBroker()}, if any.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
            brokerService = null;
        }
    }

    /** Returns the TCP URI that MQTT clients use to reach the test broker. */
    private String brokerUri() {
        return "tcp://localhost:" + port;
    }

    /** Opens and connects a blocking MQTT client connection to the test broker. */
    private BlockingConnection connectClient() throws Exception {
        MQTT client = new MQTT();
        client.setHost(brokerUri());
        BlockingConnection connection = client.blockingConnection();
        connection.connect();
        return connection;
    }

    /** Returns the payload of the {@code index}-th message used by the read tests. */
    private static byte[] readPayload(int index) {
        return ("This is test " + index).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the payloads the read tests expect to find in the pipeline output. */
    private static byte[][] expectedReadPayloads() {
        byte[][] payloads = new byte[READ_MESSAGE_COUNT][];
        for (int i = 0; i < READ_MESSAGE_COUNT; i++) {
            payloads[i] = readPayload(i);
        }
        return payloads;
    }

    /**
     * Starts a thread that polls the broker once per second until a connected client's id matches
     * {@code isPipelineConnectionId}, then publishes the read-test messages on {@code topic}.
     *
     * <p>Failures are logged rather than swallowed, so a test that times out or fails its
     * assertion because nothing was published leaves a diagnostic behind.
     *
     * @param publishConnection already-connected connection used to publish
     * @param topic topic to publish on
     * @param isPipelineConnectionId predicate applied to each broker client's connection id
     * @return the started publisher thread
     */
    private Thread startPublisher(
            BlockingConnection publishConnection,
            String topic,
            Predicate<String> isPipelineConnectionId) {
        Thread publisherThread =
            new Thread(
                () -> {
                    try {
                        LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                        boolean pipelineConnected = false;
                        while (!pipelineConnected) {
                            Thread.sleep(1000L);
                            for (Connection connection : brokerService.getBroker().getClients()) {
                                if (isPipelineConnectionId.test(connection.getConnectionId())) {
                                    pipelineConnected = true;
                                    break;
                                }
                            }
                        }
                        for (int i = 0; i < READ_MESSAGE_COUNT; i++) {
                            publishConnection.publish(
                                topic, readPayload(i), QoS.EXACTLY_ONCE, false);
                        }
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the thread's owner can observe it.
                        Thread.currentThread().interrupt();
                        LOG.error("Interrupted before all test messages were published on {}", topic, e);
                    } catch (Exception e) {
                        LOG.error("Can't publish test messages on {}", topic, e);
                    }
                });
        publisherThread.start();
        return publisherThread;
    }
}

