/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mqtt;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class MqttIOTest {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);
    private @UnknownKeyFor @NonNull @Initialized BrokerService brokerService;
    private @UnknownKeyFor @NonNull @Initialized int port;
    @Rule
    public final transient @UnknownKeyFor @NonNull @Initialized TestPipeline pipeline = TestPipeline.create();

    @Before
    public void startBroker() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting ActiveMQ brokerService on {}", (Object)this.port);
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.setPersistent(false);
        this.brokerService.addConnector("mqtt://localhost:" + this.port);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
    }

    @Test(timeout=60000L)
    @Ignore(value="https://issues.apache.org/jira/browse/BEAM-3604 Test timeout failure.")
    public void testReadNoClientId() throws @UnknownKeyFor @NonNull @Initialized Exception {
        String topicName = "READ_TOPIC_NO_CLIENT_ID";
        MqttIO.Read mqttReader = MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create((String)("tcp://localhost:" + this.port), (String)"READ_TOPIC_NO_CLIENT_ID")).withMaxNumRecords(10L);
        PCollection output = (PCollection)this.pipeline.apply((PTransform)mqttReader);
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new byte[][]{"This is test 0".getBytes(StandardCharsets.UTF_8), "This is test 1".getBytes(StandardCharsets.UTF_8), "This is test 2".getBytes(StandardCharsets.UTF_8), "This is test 3".getBytes(StandardCharsets.UTF_8), "This is test 4".getBytes(StandardCharsets.UTF_8), "This is test 5".getBytes(StandardCharsets.UTF_8), "This is test 6".getBytes(StandardCharsets.UTF_8), "This is test 7".getBytes(StandardCharsets.UTF_8), "This is test 8".getBytes(StandardCharsets.UTF_8), "This is test 9".getBytes(StandardCharsets.UTF_8)});
        MQTT client = new MQTT();
        client.setHost("tcp://localhost:" + this.port);
        BlockingConnection publishConnection = client.blockingConnection();
        publishConnection.connect();
        Thread publisherThread = new Thread(() -> {
            try {
                LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                boolean pipelineConnected = false;
                while (!pipelineConnected) {
                    Thread.sleep(1000L);
                    for (Connection connection : this.brokerService.getBroker().getClients()) {
                        if (connection.getConnectionId().isEmpty()) continue;
                        pipelineConnected = true;
                    }
                }
                for (int i = 0; i < 10; ++i) {
                    publishConnection.publish("READ_TOPIC_NO_CLIENT_ID", ("This is test " + i).getBytes(StandardCharsets.UTF_8), QoS.EXACTLY_ONCE, false);
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
        publisherThread.start();
        this.pipeline.run();
        publishConnection.disconnect();
        publisherThread.join();
    }

    @Test(timeout=30000L)
    @Ignore(value="https://issues.apache.org/jira/browse/BEAM-5150 Flake Non-deterministic output.")
    public void testRead() throws @UnknownKeyFor @NonNull @Initialized Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create((String)("tcp://localhost:" + this.port), (String)"READ_TOPIC").withClientId("READ_PIPELINE")).withMaxReadTime(Duration.standardSeconds((long)3L)));
        PAssert.that((PCollection)output).containsInAnyOrder((Object[])new byte[][]{"This is test 0".getBytes(StandardCharsets.UTF_8), "This is test 1".getBytes(StandardCharsets.UTF_8), "This is test 2".getBytes(StandardCharsets.UTF_8), "This is test 3".getBytes(StandardCharsets.UTF_8), "This is test 4".getBytes(StandardCharsets.UTF_8), "This is test 5".getBytes(StandardCharsets.UTF_8), "This is test 6".getBytes(StandardCharsets.UTF_8), "This is test 7".getBytes(StandardCharsets.UTF_8), "This is test 8".getBytes(StandardCharsets.UTF_8), "This is test 9".getBytes(StandardCharsets.UTF_8)});
        MQTT client = new MQTT();
        client.setHost("tcp://localhost:" + this.port);
        BlockingConnection publishConnection = client.blockingConnection();
        publishConnection.connect();
        Thread publisherThread = new Thread(() -> {
            try {
                LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
                boolean pipelineConnected = false;
                while (!pipelineConnected) {
                    Thread.sleep(1000L);
                    for (Connection connection : this.brokerService.getBroker().getClients()) {
                        if (!connection.getConnectionId().startsWith("READ_PIPELINE")) continue;
                        pipelineConnected = true;
                    }
                }
                for (int i = 0; i < 10; ++i) {
                    publishConnection.publish("READ_TOPIC", ("This is test " + i).getBytes(StandardCharsets.UTF_8), QoS.EXACTLY_ONCE, false);
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
        publisherThread.start();
        this.pipeline.run();
        publisherThread.join();
        publishConnection.disconnect();
    }

    @Test(timeout=30000L)
    public void testReceiveWithTimeoutAndNoData() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.pipeline.apply((PTransform)MqttIO.read().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create((String)("tcp://localhost:" + this.port), (String)"READ_TOPIC").withClientId("READ_PIPELINE")).withMaxReadTime(Duration.standardSeconds((long)2L)));
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws @UnknownKeyFor @NonNull @Initialized Exception {
        int i;
        int numberOfTestMessages = 200;
        MQTT client = new MQTT();
        client.setHost("tcp://localhost:" + this.port);
        BlockingConnection connection = client.blockingConnection();
        connection.connect();
        connection.subscribe(new Topic[]{new Topic(Buffer.utf8((String)"WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
        ConcurrentSkipListSet messages = new ConcurrentSkipListSet();
        Thread subscriber = new Thread(() -> {
            try {
                for (int i = 0; i < 200; ++i) {
                    Message message = connection.receive();
                    messages.add(new String(message.getPayload(), StandardCharsets.UTF_8));
                    message.ack();
                }
            }
            catch (Exception e) {
                LOG.error("Can't receive message", (Throwable)e);
            }
        });
        subscriber.start();
        ArrayList<byte[]> data = new ArrayList<byte[]>();
        for (i = 0; i < 200; ++i) {
            data.add(("Test " + i).getBytes(StandardCharsets.UTF_8));
        }
        ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply((PTransform)MqttIO.write().withConnectionConfiguration(MqttIO.ConnectionConfiguration.create((String)("tcp://localhost:" + this.port), (String)"WRITE_TOPIC")));
        this.pipeline.run();
        subscriber.join();
        connection.disconnect();
        Assert.assertEquals((long)200L, (long)messages.size());
        for (i = 0; i < 200; ++i) {
            Assert.assertTrue((boolean)messages.contains("Test " + i));
        }
    }

    @Test
    public void testReadObject() throws @UnknownKeyFor @NonNull @Initialized Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        MqttIO.MqttCheckpointMark cp1 = new MqttIO.MqttCheckpointMark(UUID.randomUUID().toString());
        out.writeObject(cp1);
        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
        ObjectInputStream in = new ObjectInputStream(bis);
        MqttIO.MqttCheckpointMark cp2 = (MqttIO.MqttCheckpointMark)in.readObject();
        Assert.assertEquals((long)0L, (long)in.available());
        Assert.assertEquals((long)0L, (long)cp2.messages.size());
        Assert.assertEquals((Object)cp1.clientId, (Object)cp2.clientId);
        Assert.assertEquals((Object)cp1.oldestMessageTimestamp, (Object)cp2.oldestMessageTimestamp);
    }

    @After
    public void stopBroker() throws @UnknownKeyFor @NonNull @Initialized Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
            this.brokerService = null;
        }
    }
}

