package org.apache.beam.sdk.io.amqp;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.messenger.Messenger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIOTest.class */
public class AmqpIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpIOTest.class);

    @Rule
    public TestPipeline pipeline = TestPipeline.create();

    @Rule
    public EmbeddedAmqpBroker broker = new EmbeddedAmqpBroker();

    /* loaded from: input_file:org/apache/beam/sdk/io/amqp/AmqpIOTest$EmbeddedAmqpBroker.class */
    private static class EmbeddedAmqpBroker extends EmbeddedActiveMQBroker {
        private EmbeddedAmqpBroker() {
        }

        protected void configure() {
            try {
                getBrokerService().addConnector("amqp://localhost:0");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public String getQueueUri(String str) {
            return getBrokerService().getDefaultSocketURIString() + "/" + str;
        }
    }

    @Test
    public void testRead() throws Exception {
        PAssert.thatSingleton(this.pipeline.apply(AmqpIO.read().withMaxNumRecords(100L).withAddresses(Collections.singletonList(this.broker.getQueueUri("testRead")))).apply(Count.globally())).isEqualTo(100L);
        Messenger create = Messenger.Factory.create();
        create.start();
        for (int i = 0; i < 100; i++) {
            Message create2 = Message.Factory.create();
            create2.setAddress(this.broker.getQueueUri("testRead"));
            create2.setBody(new AmqpValue("Test " + i));
            create.put(create2);
            create.send();
        }
        create.stop();
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            Message create = Message.Factory.create();
            create.setBody(new AmqpValue("Test " + i));
            create.setAddress(this.broker.getQueueUri("testWrite"));
            create.setSubject("test");
            arrayList.add(create);
        }
        this.pipeline.apply(Create.of(arrayList).withCoder(AmqpMessageCoder.of())).apply(AmqpIO.write());
        this.pipeline.run().waitUntilFinish();
        ArrayList arrayList2 = new ArrayList();
        Messenger create2 = Messenger.Factory.create();
        create2.start();
        create2.subscribe(this.broker.getQueueUri("testWrite"));
        while (arrayList2.size() < 100) {
            create2.recv();
            while (create2.incoming() > 0) {
                Message message = create2.get();
                LOG.info("Received: " + message.getBody().toString());
                arrayList2.add(message.getBody().toString());
            }
        }
        create2.stop();
        Assert.assertEquals(100L, arrayList2.size());
        for (int i2 = 0; i2 < 100; i2++) {
            Assert.assertTrue(arrayList2.contains("AmqpValue{Test " + i2 + "}"));
        }
    }
}
