package org.apache.beam.sdk.io.mqtt;

import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIOTest.class */
public class MqttIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);
    private static transient BrokerService brokerService;
    private static int port;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Before
    public void startBroker() throws Exception {
        LOG.info("Finding free network port");
        ServerSocket serverSocket = new ServerSocket(0);
        port = serverSocket.getLocalPort();
        serverSocket.close();
        LOG.info("Starting ActiveMQ brokerService on {}", Integer.valueOf(port));
        brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setPersistent(false);
        brokerService.addConnector("mqtt://localhost:" + port);
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    /* JADX WARN: Type inference failed for: r1v4, types: [byte[], java.lang.Object[]] */
    @Test(timeout = 60000)
    @Category({RunnableOnService.class})
    public void testRead() throws Exception {
        PAssert.that(this.pipeline.apply(MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC", "READ_PIPELINE")).withMaxNumRecords(10L))).containsInAnyOrder((Object[]) new byte[]{"This is test 0".getBytes(), "This is test 1".getBytes(), "This is test 2".getBytes(), "This is test 3".getBytes(), "This is test 4".getBytes(), "This is test 5".getBytes(), "This is test 6".getBytes(), "This is test 7".getBytes(), "This is test 8".getBytes(), "This is test 9".getBytes()});
        MQTT mqtt = new MQTT();
        mqtt.setHost("tcp://localhost:" + port);
        final BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        Thread thread = new Thread() { // from class: org.apache.beam.sdk.io.mqtt.MqttIOTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    MqttIOTest.LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                    boolean z = false;
                    while (!z) {
                        Thread.sleep(1000L);
                        for (Connection connection : MqttIOTest.brokerService.getBroker().getClients()) {
                            if (connection.getConnectionId().startsWith("READ_PIPELINE")) {
                                z = true;
                            }
                        }
                    }
                    for (int i = 0; i < 10; i++) {
                        blockingConnection.publish("READ_TOPIC", ("This is test " + i).getBytes(), QoS.AT_LEAST_ONCE, false);
                    }
                } catch (Exception e) {
                }
            }
        };
        thread.start();
        this.pipeline.run();
        blockingConnection.disconnect();
        thread.join();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testWrite() throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost("tcp://localhost:" + port);
        final BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        blockingConnection.subscribe(new Topic[]{new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.AT_LEAST_ONCE)});
        final HashSet hashSet = new HashSet();
        Thread thread = new Thread() { // from class: org.apache.beam.sdk.io.mqtt.MqttIOTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                for (int i = 0; i < 200; i++) {
                    try {
                        Message receive = blockingConnection.receive();
                        hashSet.add(new String(receive.getPayload()));
                        receive.ack();
                    } catch (Exception e) {
                        MqttIOTest.LOG.error("Can't receive message", e);
                        return;
                    }
                }
            }
        };
        thread.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 200; i++) {
            arrayList.add(("Test " + i).getBytes());
        }
        this.pipeline.apply(Create.of(arrayList)).apply(MqttIO.write().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "WRITE_TOPIC")));
        this.pipeline.run();
        thread.join();
        blockingConnection.disconnect();
        Assert.assertEquals(200L, hashSet.size());
        for (int i2 = 0; i2 < 200; i2++) {
            Assert.assertTrue(hashSet.contains("Test " + i2));
        }
    }

    @After
    public void stopBroker() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
            brokerService = null;
        }
    }
}
