package org.apache.beam.sdk.io.jms;

import java.util.ArrayList;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdks.java.io.jms.repackaged.com.google.common.base.Throwables;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest.class */
public class JmsIOTest {
    private static final String BROKER_URL = "vm://localhost";
    private static final String USERNAME = "test_user";
    private static final String PASSWORD = "test_password";
    private static final String QUEUE = "test_queue";
    private static final String TOPIC = "test_topic";
    private BrokerService broker;
    private ConnectionFactory connectionFactory;
    private ConnectionFactory connectionFactoryWithoutPrefetch;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest$BytesMessageToStringMessageMapper.class */
    public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper<String> {
        /* renamed from: mapMessage, reason: merged with bridge method [inline-methods] */
        public String m0mapMessage(Message message) throws Exception {
            return new String(new byte[(int) ((BytesMessage) message).getBodyLength()]);
        }
    }

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
        this.broker.addConnector(BROKER_URL);
        this.broker.setBrokerName("localhost");
        this.broker.setPopulateJMSXUserID(true);
        this.broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
        this.broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(arrayList)});
        this.broker.start();
        this.connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        this.connectionFactoryWithoutPrefetch = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=0");
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
    }

    private void runPipelineExpectingJmsConnectException(String str) {
        try {
            this.pipeline.run();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertThat(Throwables.getRootCause(e).getMessage(), Matchers.containsString(str));
        }
    }

    @Test
    public void testAuthenticationRequired() {
        this.pipeline.apply(JmsIO.read().withConnectionFactory(this.connectionFactory).withQueue(QUEUE));
        runPipelineExpectingJmsConnectException("User name [null] or password is invalid.");
    }

    @Test
    public void testAuthenticationWithBadPassword() {
        this.pipeline.apply(JmsIO.read().withConnectionFactory(this.connectionFactory).withQueue(QUEUE).withUsername(USERNAME).withPassword("BAD"));
        runPipelineExpectingJmsConnectException("User name [test_user] or password is invalid.");
    }

    @Test
    public void testReadMessages() throws Exception {
        Connection createConnection = this.connectionFactory.createConnection(USERNAME, PASSWORD);
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE));
        TextMessage createTextMessage = createSession.createTextMessage("This Is A Test");
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.send(createTextMessage);
        createProducer.close();
        createSession.close();
        createConnection.close();
        PAssert.thatSingleton(this.pipeline.apply(JmsIO.read().withConnectionFactory(this.connectionFactory).withQueue(QUEUE).withUsername(USERNAME).withPassword(PASSWORD).withMaxNumRecords(5L)).apply("Count", Count.globally())).isEqualTo(5L);
        this.pipeline.run();
        Session createSession2 = this.connectionFactory.createConnection(USERNAME, PASSWORD).createSession(false, 1);
        Assert.assertNull(createSession2.createConsumer(createSession2.createQueue(QUEUE)).receiveNoWait());
    }

    @Test
    public void testReadBytesMessages() throws Exception {
        Connection createConnection = this.connectionFactory.createConnection(USERNAME, PASSWORD);
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE));
        BytesMessage createBytesMessage = createSession.createBytesMessage();
        createBytesMessage.writeBytes("This Is A Test".getBytes());
        createProducer.send(createBytesMessage);
        createProducer.close();
        createSession.close();
        createConnection.close();
        PAssert.thatSingleton(this.pipeline.apply(JmsIO.readMessage().withConnectionFactory(this.connectionFactory).withQueue(QUEUE).withUsername(USERNAME).withPassword(PASSWORD).withMaxNumRecords(1L).withCoder(SerializableCoder.of(String.class)).withMessageMapper(new BytesMessageToStringMessageMapper())).apply("Count", Count.globally())).isEqualTo(1L);
        this.pipeline.run();
        Session createSession2 = this.connectionFactory.createConnection(USERNAME, PASSWORD).createSession(false, 1);
        Assert.assertNull(createSession2.createConsumer(createSession2.createQueue(QUEUE)).receiveNoWait());
    }

    @Test
    public void testWriteMessage() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            arrayList.add("Message " + i);
        }
        this.pipeline.apply(Create.of(arrayList)).apply(JmsIO.write().withConnectionFactory(this.connectionFactory).withQueue(QUEUE).withUsername(USERNAME).withPassword(PASSWORD));
        this.pipeline.run();
        Connection createConnection = this.connectionFactory.createConnection(USERNAME, PASSWORD);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        int i2 = 0;
        while (createSession.createConsumer(createSession.createQueue(QUEUE)).receive(1000L) != null) {
            i2++;
        }
        Assert.assertEquals(100L, i2);
    }

    @Test
    public void testSplitForQueue() throws Exception {
        JmsIO.Read withQueue = JmsIO.read().withQueue(QUEUE);
        Assert.assertEquals(5, new JmsIO.UnboundedJmsSource(withQueue).split(5, PipelineOptionsFactory.create()).size());
    }

    @Test
    public void testSplitForTopic() throws Exception {
        JmsIO.Read withTopic = JmsIO.read().withTopic(TOPIC);
        Assert.assertEquals(1L, new JmsIO.UnboundedJmsSource(withTopic).split(5, PipelineOptionsFactory.create()).size());
    }

    @Test
    public void testCheckpointMark() throws Exception {
        Connection createConnection = this.connectionFactoryWithoutPrefetch.createConnection(USERNAME, PASSWORD);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(QUEUE));
        for (int i = 0; i < 10; i++) {
            createProducer.send(createSession.createTextMessage("test " + i));
        }
        createProducer.close();
        createSession.close();
        createConnection.close();
        JmsIO.UnboundedJmsReader createReader = new JmsIO.UnboundedJmsSource(JmsIO.read().withConnectionFactory(this.connectionFactoryWithoutPrefetch).withUsername(USERNAME).withPassword(PASSWORD).withQueue(QUEUE)).createReader((PipelineOptions) null, (JmsCheckpointMark) null);
        Assert.assertTrue(createReader.start());
        for (int i2 = 0; i2 < 3; i2++) {
            Assert.assertTrue(createReader.advance());
        }
        Assert.assertEquals(10L, count(QUEUE));
        createReader.getCheckpointMark().finalizeCheckpoint();
        Assert.assertEquals(6L, count(QUEUE));
        for (int i3 = 0; i3 < 6; i3++) {
            Assert.assertTrue(createReader.advance());
        }
        Assert.assertEquals(6L, count(QUEUE));
        createReader.getCheckpointMark().finalizeCheckpoint();
        Assert.assertEquals(0L, count(QUEUE));
    }

    private int count(String str) throws Exception {
        Connection createConnection = this.connectionFactory.createConnection(USERNAME, PASSWORD);
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Enumeration enumeration = createSession.createBrowser(createSession.createQueue(str)).getEnumeration();
        int i = 0;
        while (enumeration.hasMoreElements()) {
            enumeration.nextElement();
            i++;
        }
        return i;
    }
}
