/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mqtt;

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);
    private static transient BrokerService brokerService;
    private static int port;

    @Before
    public void startBroker() throws Exception {
        LOG.info("Finding free network port");
        ServerSocket socket = new ServerSocket(0);
        port = socket.getLocalPort();
        socket.close();
        LOG.info("Starting ActiveMQ brokerService on {}", (Object)port);
        brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setPersistent(false);
        brokerService.addConnector("mqtt://localhost:" + port);
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    @Test(timeout=60000L)
    @Category(value={RunnableOnService.class})
    public void testRead() throws Exception {
        TestPipeline pipeline = TestPipeline.create();
        PCollection output = (PCollection)pipeline.apply((PTransform)MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create((String)("tcp://localhost:" + port), (String)"READ_TOPIC", (String)"READ_PIPELINE")).withMaxNumRecords(10L));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new byte[][]{"This is test 0".getBytes(), "This is test 1".getBytes(), "This is test 2".getBytes(), "This is test 3".getBytes(), "This is test 4".getBytes(), "This is test 5".getBytes(), "This is test 6".getBytes(), "This is test 7".getBytes(), "This is test 8".getBytes(), "This is test 9".getBytes()});
        MQTT client = new MQTT();
        client.setHost("tcp://localhost:" + port);
        final BlockingConnection publishConnection = client.blockingConnection();
        publishConnection.connect();
        Thread publisherThread = new Thread(){

            @Override
            public void run() {
                try {
                    LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                    boolean pipelineConnected = false;
                    while (!pipelineConnected) {
                        Thread.sleep(1000L);
                        for (Connection connection : brokerService.getBroker().getClients()) {
                            if (!connection.getConnectionId().startsWith("READ_PIPELINE")) continue;
                            pipelineConnected = true;
                        }
                    }
                    for (int i = 0; i < 10; ++i) {
                        publishConnection.publish("READ_TOPIC", ("This is test " + i).getBytes(), QoS.AT_LEAST_ONCE, false);
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        };
        publisherThread.start();
        pipeline.run();
        publishConnection.disconnect();
        publisherThread.join();
    }

    @Test
    @Category(value={RunnableOnService.class})
    public void testWrite() throws Exception {
        int i;
        MQTT client = new MQTT();
        client.setHost("tcp://localhost:" + port);
        final BlockingConnection connection = client.blockingConnection();
        connection.connect();
        connection.subscribe(new Topic[]{new Topic(Buffer.utf8((String)"WRITE_TOPIC"), QoS.AT_LEAST_ONCE)});
        final HashSet messages = new HashSet();
        Thread subscriber = new Thread(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < 200; ++i) {
                        Message message = connection.receive();
                        messages.add(new String(message.getPayload()));
                        message.ack();
                    }
                }
                catch (Exception e) {
                    LOG.error("Can't receive message", (Throwable)e);
                }
            }
        };
        subscriber.start();
        TestPipeline pipeline = TestPipeline.create();
        ArrayList<byte[]> data = new ArrayList<byte[]>();
        for (i = 0; i < 200; ++i) {
            data.add(("Test " + i).getBytes());
        }
        ((PCollection)pipeline.apply((PTransform)Create.of(data))).apply((PTransform)MqttIO.write().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create((String)("tcp://localhost:" + port), (String)"WRITE_TOPIC")));
        pipeline.run();
        subscriber.join();
        connection.disconnect();
        Assert.assertEquals((long)200L, (long)messages.size());
        for (i = 0; i < 200; ++i) {
            Assert.assertTrue((boolean)messages.contains("Test " + i));
        }
    }

    @After
    public void stopBroker() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
            brokerService = null;
        }
    }
}

