/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.amqp;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.amqp.AmqpIO;
import org.apache.beam.sdk.io.amqp.AmqpMessageCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class AmqpIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);
    @Rule
    public TestPipeline pipeline = TestPipeline.create();
    @Rule
    public EmbeddedAmqpBroker broker = new EmbeddedAmqpBroker();

    @Test
    public void testRead() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)AmqpIO.read().withMaxNumRecords(100L).withAddresses(Collections.singletonList(this.broker.getQueueUri("testRead"))));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply(Count.globally()))).isEqualTo((Object)100L);
        Messenger sender = Messenger.Factory.create();
        sender.start();
        for (int i = 0; i < 100; ++i) {
            Message message = Message.Factory.create();
            message.setAddress(this.broker.getQueueUri("testRead"));
            message.setBody((Section)new AmqpValue((Object)("Test " + i)));
            sender.put(message);
            sender.send();
        }
        sender.stop();
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws Exception {
        ArrayList<Message> data = new ArrayList<Message>();
        for (int i = 0; i < 100; ++i) {
            Message message = Message.Factory.create();
            message.setBody((Section)new AmqpValue((Object)("Test " + i)));
            message.setAddress(this.broker.getQueueUri("testWrite"));
            message.setSubject("test");
            data.add(message);
        }
        ((PCollection)this.pipeline.apply((PTransform)Create.of(data).withCoder((Coder)AmqpMessageCoder.of()))).apply((PTransform)AmqpIO.write());
        this.pipeline.run().waitUntilFinish();
        ArrayList<String> received = new ArrayList<String>();
        Messenger messenger = Messenger.Factory.create();
        messenger.start();
        messenger.subscribe(this.broker.getQueueUri("testWrite"));
        while (received.size() < 100) {
            messenger.recv();
            while (messenger.incoming() > 0) {
                Message message = messenger.get();
                LOG.info("Received: " + message.getBody().toString());
                received.add(message.getBody().toString());
            }
        }
        messenger.stop();
        Assert.assertEquals((long)100L, (long)received.size());
        for (int i = 0; i < 100; ++i) {
            Assert.assertTrue((boolean)received.contains("AmqpValue{Test " + i + "}"));
        }
    }

    private static class EmbeddedAmqpBroker
    extends EmbeddedActiveMQBroker {
        private EmbeddedAmqpBroker() {
        }

        protected void configure() {
            try {
                this.getBrokerService().addConnector("amqp://localhost:0");
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public String getQueueUri(String queueName) {
            return this.getBrokerService().getDefaultSocketURIString() + "/" + queueName;
        }
    }
}

