package org.apache.beam.sdk.io.mqtt;

import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/mqtt/MqttIOTest.class */
public class MqttIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);
    private BrokerService brokerService;
    private int port;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Before
    public void startBroker() throws Exception {
        LOG.info("Finding free network port");
        ServerSocket serverSocket = new ServerSocket(0);
        Throwable th = null;
        try {
            this.port = serverSocket.getLocalPort();
            if (0 != 0) {
                try {
                    serverSocket.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            } else {
                serverSocket.close();
            }
            LOG.info("Starting ActiveMQ brokerService on {}", Integer.valueOf(this.port));
            this.brokerService = new BrokerService();
            this.brokerService.setDeleteAllMessagesOnStartup(true);
            this.brokerService.setPersistent(false);
            this.brokerService.addConnector("mqtt://localhost:" + this.port);
            this.brokerService.start();
            this.brokerService.waitUntilStarted();
        } catch (Throwable th3) {
            if (0 != 0) {
                try {
                    serverSocket.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                serverSocket.close();
            }
            throw th3;
        }
    }

    /* JADX WARN: Type inference failed for: r1v8, types: [byte[], java.lang.Object[]] */
    @Test(timeout = 60000)
    @Ignore("https://issues.apache.org/jira/browse/BEAM-3604 Test timeout failure.")
    public void testReadNoClientId() throws Exception {
        PAssert.that(this.pipeline.apply(MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("tcp://localhost:" + this.port, "READ_TOPIC_NO_CLIENT_ID")).withMaxNumRecords(10L))).containsInAnyOrder((Object[]) new byte[]{"This is test 0".getBytes(StandardCharsets.UTF_8), "This is test 1".getBytes(StandardCharsets.UTF_8), "This is test 2".getBytes(StandardCharsets.UTF_8), "This is test 3".getBytes(StandardCharsets.UTF_8), "This is test 4".getBytes(StandardCharsets.UTF_8), "This is test 5".getBytes(StandardCharsets.UTF_8), "This is test 6".getBytes(StandardCharsets.UTF_8), "This is test 7".getBytes(StandardCharsets.UTF_8), "This is test 8".getBytes(StandardCharsets.UTF_8), "This is test 9".getBytes(StandardCharsets.UTF_8)});
        MQTT mqtt = new MQTT();
        mqtt.setHost("tcp://localhost:" + this.port);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        Thread thread = new Thread(() -> {
            try {
                LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                boolean z = false;
                while (!z) {
                    Thread.sleep(1000L);
                    for (Connection connection : this.brokerService.getBroker().getClients()) {
                        if (!connection.getConnectionId().isEmpty()) {
                            z = true;
                        }
                    }
                }
                for (int i = 0; i < 10; i++) {
                    blockingConnection.publish("READ_TOPIC_NO_CLIENT_ID", ("This is test " + i).getBytes(StandardCharsets.UTF_8), QoS.EXACTLY_ONCE, false);
                }
            } catch (Exception e) {
            }
        });
        thread.start();
        this.pipeline.run();
        blockingConnection.disconnect();
        thread.join();
    }

    /* JADX WARN: Type inference failed for: r1v4, types: [byte[], java.lang.Object[]] */
    @Test(timeout = 30000)
    @Ignore("https://issues.apache.org/jira/browse/BEAM-5150 Flake Non-deterministic output.")
    public void testRead() throws Exception {
        PAssert.that(this.pipeline.apply(MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("tcp://localhost:" + this.port, "READ_TOPIC", "READ_PIPELINE")).withMaxReadTime(Duration.standardSeconds(3L)))).containsInAnyOrder((Object[]) new byte[]{"This is test 0".getBytes(StandardCharsets.UTF_8), "This is test 1".getBytes(StandardCharsets.UTF_8), "This is test 2".getBytes(StandardCharsets.UTF_8), "This is test 3".getBytes(StandardCharsets.UTF_8), "This is test 4".getBytes(StandardCharsets.UTF_8), "This is test 5".getBytes(StandardCharsets.UTF_8), "This is test 6".getBytes(StandardCharsets.UTF_8), "This is test 7".getBytes(StandardCharsets.UTF_8), "This is test 8".getBytes(StandardCharsets.UTF_8), "This is test 9".getBytes(StandardCharsets.UTF_8)});
        MQTT mqtt = new MQTT();
        mqtt.setHost("tcp://localhost:" + this.port);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        Thread thread = new Thread(() -> {
            try {
                LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                boolean z = false;
                while (!z) {
                    Thread.sleep(1000L);
                    for (Connection connection : this.brokerService.getBroker().getClients()) {
                        if (connection.getConnectionId().startsWith("READ_PIPELINE")) {
                            z = true;
                        }
                    }
                }
                for (int i = 0; i < 10; i++) {
                    blockingConnection.publish("READ_TOPIC", ("This is test " + i).getBytes(StandardCharsets.UTF_8), QoS.EXACTLY_ONCE, false);
                }
            } catch (Exception e) {
            }
        });
        thread.start();
        this.pipeline.run();
        thread.join();
        blockingConnection.disconnect();
    }

    @Test(timeout = 30000)
    public void testReceiveWithTimeoutAndNoData() throws Exception {
        this.pipeline.apply(MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("tcp://localhost:" + this.port, "READ_TOPIC", "READ_PIPELINE")).withMaxReadTime(Duration.standardSeconds(2L)));
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost("tcp://localhost:" + this.port);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        blockingConnection.subscribe(new Topic[]{new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
        ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 200; i++) {
                try {
                    Message receive = blockingConnection.receive();
                    concurrentSkipListSet.add(new String(receive.getPayload(), StandardCharsets.UTF_8));
                    receive.ack();
                } catch (Exception e) {
                    LOG.error("Can't receive message", e);
                    return;
                }
            }
        });
        thread.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 200; i++) {
            arrayList.add(("Test " + i).getBytes(StandardCharsets.UTF_8));
        }
        this.pipeline.apply(Create.of(arrayList)).apply(MqttIO.write().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("tcp://localhost:" + this.port, "WRITE_TOPIC")));
        this.pipeline.run();
        thread.join();
        blockingConnection.disconnect();
        Assert.assertEquals(200L, concurrentSkipListSet.size());
        for (int i2 = 0; i2 < 200; i2++) {
            Assert.assertTrue(concurrentSkipListSet.contains("Test " + i2));
        }
    }

    @After
    public void stopBroker() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
            this.brokerService = null;
        }
    }
}
