package org.apache.beam.sdk.io.jms;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.Callback;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.jms.CommonJms;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.qpid.jms.JmsAcknowledgeCallback;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.hamcrest.object.HasToString;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest.class */
public class JmsIOTest {
    // Class-wide logger; used by the acknowledgement-delay helper when stubbing fails.
    private static final Logger LOG = LoggerFactory.getLogger(JmsIOTest.class);
    // Retry configuration shared by several JmsIO.write() tests.
    // NOTE(review): presumably (maxAttempts=1, initial backoff=1s, no max duration) - confirm against RetryConfiguration.create.
    private final RetryConfiguration retryConfiguration = RetryConfiguration.create(1, Duration.standardSeconds(1), (Duration) null);

    // Pipeline under test, managed as a JUnit rule.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    // Broker/connection-factory fixture built from the parameterized constructor arguments.
    private final CommonJms commonJms;
    // The three fields below are assigned in the @Before method and cleared in the @After method.
    private ConnectionFactory connectionFactory;
    private Class<? extends ConnectionFactory> connectionFactoryClass;
    private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest$TestEvent.class */
    /**
     * Serializable pipeline element used by the dynamic-topic write test: it pairs a payload with
     * the name of the topic the payload should be published to.
     */
    public static class TestEvent implements Serializable {
        private final String topicName;
        private final String value;

        private TestEvent(String topicName, String value) {
            this.topicName = topicName;
            this.value = value;
        }

        /** Returns the topic name given at construction time. */
        public String getTopicName() {
            return topicName;
        }

        /** Returns the payload text given at construction time. */
        public String getValue() {
            return value;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest$TextMessageMapperWithError.class */
    /**
     * Value mapper that always fails for the payloads {@code "Message 1"} and {@code "Message 2"}
     * and builds a {@link TextMessage} for every other payload.
     */
    private static class TextMessageMapperWithError implements SerializableBiFunction<String, Session, Message> {
        private TextMessageMapperWithError() {
        }

        /**
         * Maps {@code value} to a text message created on {@code session}.
         *
         * @throws JmsIOException wrapping a {@link JMSException} for the two rejected payloads or
         *     when the session fails to create or populate the message
         */
        public Message apply(String value, Session session) {
            boolean rejected = value.equals("Message 1") || value.equals("Message 2");
            try {
                if (rejected) {
                    throw new JMSException("Error!!");
                }
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(value);
                return textMessage;
            } catch (JMSException e) {
                throw new JmsIOException("Error creating TextMessage", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest$TextMessageMapperWithErrorAndCounter.class */
    /**
     * Value mapper with a class-wide failure counter: {@code "Message 2"} always fails, while
     * {@code "Message 1"} fails only as long as no failure has been counted yet. All other
     * payloads are mapped to a {@link TextMessage}.
     */
    private static class TextMessageMapperWithErrorAndCounter implements SerializableBiFunction<String, Session, Message> {
        private static int errorCounter = 0;

        private TextMessageMapperWithErrorAndCounter() {
        }

        /**
         * Maps {@code value} to a text message created on {@code session}.
         *
         * @throws JmsIOException wrapping the simulated or real {@link JMSException}
         */
        public Message apply(String value, Session session) {
            boolean failing = value.equals("Message 1") || value.equals("Message 2");
            // "Message 1" is let through once at least one failure has been recorded.
            boolean recovered = errorCounter != 0 && value.equals("Message 1");
            try {
                if (failing && !recovered) {
                    errorCounter++;
                    throw new JMSException("Error!!");
                }
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(value);
                return textMessage;
            } catch (JMSException e) {
                throw new JmsIOException("Error creating TextMessage", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOTest$TextMessageMapperWithErrorCounter.class */
    /**
     * Value mapper that fails on its first invocation after construction and succeeds afterwards.
     * The counter is static and is reset to zero each time an instance is created.
     */
    private static class TextMessageMapperWithErrorCounter implements SerializableBiFunction<String, Session, Message> {
        private static int errorCounter;

        TextMessageMapperWithErrorCounter() {
            errorCounter = 0;
        }

        /**
         * Maps {@code value} to a text message created on {@code session}.
         *
         * @throws JmsIOException wrapping the simulated first-call failure or a real
         *     {@link JMSException} from the session
         */
        public Message apply(String value, Session session) {
            try {
                if (errorCounter != 0) {
                    TextMessage textMessage = session.createTextMessage();
                    textMessage.setText(value);
                    return textMessage;
                }
                errorCounter++;
                throw new JMSException("Error!!");
            } catch (JMSException e) {
                throw new JmsIOException("Error creating TextMessage", e);
            }
        }
    }

    /**
     * Supplies the constructor arguments for each parameterized run: one row for the ActiveMQ
     * client and one for the Qpid JMS client.
     */
    @Parameterized.Parameters(name = "with client class {3}")
    public static Collection<Object[]> connectionFactories() {
        Object[] activeMqRow = {"vm://localhost", 5672, "jms.sendAcksAsync=false", ActiveMQConnectionFactory.class};
        Object[] qpidRow = {"amqp://localhost", 5672, "jms.forceAsyncAcks=false", JmsConnectionFactory.class};
        return Arrays.asList(activeMqRow, qpidRow);
    }

    /**
     * Builds the shared {@link CommonJms} fixture from one row of {@link #connectionFactories()}.
     * Parameter names reflect the values in those rows (broker URL, port, acknowledgement option,
     * client factory class) - TODO confirm against the CommonJms constructor.
     */
    public JmsIOTest(String brokerUrl, Integer brokerPort, String forceAsyncAcksParam, Class<? extends ConnectionFactory> factoryClass) {
        commonJms = new CommonJms(brokerUrl, brokerPort, forceAsyncAcksParam, factoryClass);
    }

    /** Starts the broker and caches the connection factories handed out by the fixture. */
    @Before
    public void beforeEeach() throws Exception {
        commonJms.startBroker();
        connectionFactory = commonJms.getConnectionFactory();
        connectionFactoryClass = commonJms.getConnectionFactoryClass();
        connectionFactoryWithSyncAcksAndWithoutPrefetch = commonJms.getConnectionFactoryWithSyncAcksAndWithoutPrefetch();
    }

    /** Stops the broker, then drops the cached connection factories. */
    @After
    public void tearDown() throws Exception {
        commonJms.stopBroker();
        connectionFactory = null;
        connectionFactoryClass = null;
        connectionFactoryWithSyncAcksAndWithoutPrefetch = null;
    }

    /**
     * Runs the pipeline and requires it to fail with an exception whose root-cause message
     * contains {@code expectedFragment}. A run that completes normally fails the test.
     */
    private void runPipelineExpectingJmsConnectException(String expectedFragment) {
        Exception thrown = null;
        try {
            pipeline.run();
        } catch (Exception e) {
            thrown = e;
        }
        if (thrown == null) {
            Assert.fail();
        }
        String rootCauseMessage = Throwables.getRootCause(thrown).getMessage();
        MatcherAssert.assertThat(rootCauseMessage, StringContains.containsString(expectedFragment));
    }

    /** Reading without credentials must be rejected by the broker with a client-specific message. */
    @Test
    public void testAuthenticationRequired() {
        pipeline.apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("test_queue"));
        String expectedMessage;
        if (connectionFactoryClass == ActiveMQConnectionFactory.class) {
            expectedMessage = "User name [null] or password is invalid.";
        } else {
            expectedMessage = "Client failed to authenticate using SASL: ANONYMOUS";
        }
        runPipelineExpectingJmsConnectException(expectedMessage);
    }

    /** Reading with a wrong password must be rejected by the broker with a client-specific message. */
    @Test
    public void testAuthenticationWithBadPassword() {
        pipeline.apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("test_queue").withUsername("test_user").withPassword("BAD"));
        String expectedMessage;
        if (connectionFactoryClass == ActiveMQConnectionFactory.class) {
            expectedMessage = "User name [test_user] or password is invalid.";
        } else {
            expectedMessage = "Client failed to authenticate using SASL: PLAIN";
        }
        runPipelineExpectingJmsConnectException(expectedMessage);
    }

    /**
     * Publishes six text messages, reads at most five of them through {@code JmsIO.read()} and
     * checks the element count produced by the pipeline.
     *
     * <p>Both JMS connections opened here are closed in {@code finally} blocks so a failing
     * assertion or send does not leak them.
     */
    @Test
    public void testReadMessages() throws Exception {
        Connection producerConnection = connectionFactory.createConnection("test_user", "test_password");
        try {
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(producerSession.createQueue("test_queue"));
            TextMessage message = producerSession.createTextMessage("This Is A Test");
            for (int i = 0; i < 6; i++) {
                producer.send(message);
            }
            producer.close();
            producerSession.close();
        } finally {
            producerConnection.close();
        }

        PAssert.thatSingleton(pipeline.apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue("test_queue").withUsername("test_user").withPassword("test_password").withMaxNumRecords(5L)).apply("Count", Count.globally())).isEqualTo(5L);
        pipeline.run();

        Connection verificationConnection = connectionFactory.createConnection("test_user", "test_password");
        try {
            Session verificationSession = verificationConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // NOTE(review): this connection is not started before receiveNoWait(); confirm
            // whether the check can actually observe a message left on the queue.
            Assert.assertNull(verificationSession.createConsumer(verificationSession.createQueue("test_queue")).receiveNoWait());
        } finally {
            verificationConnection.close();
        }
    }

    /**
     * Publishes one {@link BytesMessage} and reads it back through {@code JmsIO.readMessage()}
     * with a custom message mapper and a {@code String} coder.
     *
     * <p>The explicit {@code <String>} type witness lets the coder and mapper type-check, and both
     * JMS connections are closed in {@code finally} blocks.
     */
    @Test
    public void testReadBytesMessages() throws Exception {
        Connection producerConnection = connectionFactory.createConnection("test_user", "test_password");
        try {
            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(producerSession.createQueue("test_queue"));
            BytesMessage message = producerSession.createBytesMessage();
            message.writeBytes("This Is A Test".getBytes(StandardCharsets.UTF_8));
            producer.send(message);
            producer.close();
            producerSession.close();
        } finally {
            producerConnection.close();
        }

        PAssert.thatSingleton(pipeline.apply(JmsIO.<String>readMessage().withConnectionFactory(connectionFactory).withQueue("test_queue").withUsername("test_user").withPassword("test_password").withMaxNumRecords(1L).withCoder(SerializableCoder.of(String.class)).withMessageMapper(new CommonJms.BytesMessageToStringMessageMapper())).apply("Count", Count.globally())).isEqualTo(1L);
        pipeline.run();

        Connection verificationConnection = connectionFactory.createConnection("test_user", "test_password");
        try {
            Session verificationSession = verificationConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // NOTE(review): this connection is not started before receiveNoWait(); confirm
            // whether the check can actually observe a message left on the queue.
            Assert.assertNull(verificationSession.createConsumer(verificationSession.createQueue("test_queue")).receiveNoWait());
        } finally {
            verificationConnection.close();
        }
    }

    /**
     * Writes 100 text messages to the queue through {@code JmsIO.write()} and drains the queue to
     * check that all of them arrived.
     *
     * <p>The queue is drained with a single consumer. Creating a new consumer per iteration would
     * leak consumers and can leave messages buffered in an earlier consumer.
     */
    @Test
    public void testWriteMessage() throws Exception {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add("Message " + i);
        }
        pipeline.apply(Create.of(data)).apply(JmsIO.<String>write().withConnectionFactory(connectionFactory).withValueMapper(new TextMessageMapper()).withRetryConfiguration(retryConfiguration).withQueue("test_queue").withUsername("test_user").withPassword("test_password"));
        pipeline.run();

        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue("test_queue"));
            int received = 0;
            while (consumer.receive(1000L) != null) {
                received++;
            }
            Assert.assertEquals(100L, received);
        } finally {
            connection.close();
        }
    }

    /**
     * Writes 100 messages with a mapper that rejects {@code "Message 1"} and {@code "Message 2"};
     * those two must be reported as failed and the other 98 must reach the queue.
     *
     * <p>The queue is drained with a single consumer. Creating a new consumer per iteration would
     * leak consumers and can leave messages buffered in an earlier consumer.
     */
    @Test
    public void testWriteMessageWithError() throws Exception {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add("Message " + i);
        }
        PAssert.that(pipeline.apply(Create.of(data)).apply(JmsIO.<String>write().withConnectionFactory(connectionFactory).withValueMapper(new TextMessageMapperWithError()).withRetryConfiguration(retryConfiguration).withQueue("test_queue").withUsername("test_user").withPassword("test_password")).getFailedMessages()).containsInAnyOrder("Message 1", "Message 2");
        pipeline.run();

        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue("test_queue"));
            int received = 0;
            while (consumer.receive(1000L) != null) {
                received++;
            }
            Assert.assertEquals(98L, received);
        } finally {
            connection.close();
        }
    }

    /**
     * Writes events to two topics chosen per element by a topic-name mapper and checks that each
     * topic subscriber receives exactly its own events (50 and 100).
     *
     * <p>The subscribers are created before the pipeline runs so they are already registered when
     * the messages are published. The element list and the {@code write()} call are explicitly
     * typed as {@code TestEvent} so the mapper lambdas type-check; the connection is closed in a
     * {@code finally} block.
     */
    @Test
    public void testWriteDynamicMessage() throws Exception {
        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumerOne = session.createConsumer(session.createTopic("Topic_One"));
            MessageConsumer consumerTwo = session.createConsumer(session.createTopic("Topic_Two"));

            List<TestEvent> events = new ArrayList<>();
            for (int i = 0; i < 50; i++) {
                events.add(new TestEvent("Topic_One", "Message One " + i));
            }
            for (int i = 0; i < 100; i++) {
                events.add(new TestEvent("Topic_Two", "Message Two " + i));
            }

            pipeline.apply(Create.of(events)).apply(JmsIO.<TestEvent>write().withConnectionFactory(connectionFactory).withUsername("test_user").withPassword("test_password").withRetryConfiguration(retryConfiguration).withTopicNameMapper(event -> event.getTopicName()).withValueMapper((event, jmsSession) -> {
                try {
                    TextMessage textMessage = jmsSession.createTextMessage();
                    textMessage.setText(event.getValue());
                    return textMessage;
                } catch (JMSException e) {
                    throw new JmsIOException("Error writing TextMessage", e);
                }
            }));
            pipeline.run();

            int receivedOnTopicOne = 0;
            while (consumerOne.receive(1000L) != null) {
                receivedOnTopicOne++;
            }
            Assert.assertEquals(50L, receivedOnTopicOne);

            int receivedOnTopicTwo = 0;
            while (consumerTwo.receive(1000L) != null) {
                receivedOnTopicTwo++;
            }
            Assert.assertEquals(100L, receivedOnTopicTwo);
        } finally {
            connection.close();
        }
    }

    /** A queue source asked for five splits must produce five splits. */
    @Test
    public void testSplitForQueue() throws Exception {
        JmsIO.Read read = JmsIO.read().withQueue("test_queue");
        JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(read);
        int desiredNumSplits = 5;
        Assert.assertEquals(5, source.split(desiredNumSplits, PipelineOptionsFactory.create()).size());
    }

    /** A topic source asked for five splits must still produce a single split. */
    @Test
    public void testSplitForTopic() throws Exception {
        JmsIO.Read read = JmsIO.read().withTopic("test_topic");
        JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(read);
        int desiredNumSplits = 5;
        Assert.assertEquals(1L, source.split(desiredNumSplits, PipelineOptionsFactory.create()).size());
    }

    /**
     * Checks that messages stay on the queue until the checkpoint mark is finalized: ten messages
     * are published, four are consumed (start plus three advances) and finalized, then the
     * remaining six are consumed and finalized.
     */
    @Test
    public void testCheckpointMark() throws Exception {
        Connection connection = connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection("test_user", "test_password");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue("test_queue"));
        for (int i = 0; i < 10; i++) {
            producer.send(session.createTextMessage("test " + i));
        }
        producer.close();
        session.close();
        connection.close();

        JmsIO.Read spec = JmsIO.read().withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch).withUsername("test_user").withPassword("test_password").withQueue("test_queue");
        JmsIO.UnboundedJmsReader reader = new JmsIO.UnboundedJmsSource(spec).createReader((PipelineOptions) null, (JmsCheckpointMark) null);

        // First batch: start() plus three advances.
        Assert.assertTrue(reader.start());
        for (int i = 0; i < 3; i++) {
            Assert.assertTrue(reader.advance());
        }
        // Nothing has been acknowledged yet, so the queue is still full.
        Assert.assertEquals(10L, count("test_queue"));
        reader.getCheckpointMark().finalizeCheckpoint();
        Assert.assertEquals(6L, count("test_queue"));

        // Second batch: the remaining six messages.
        for (int i = 0; i < 6; i++) {
            Assert.assertTrue(reader.advance());
        }
        Assert.assertEquals(6L, count("test_queue"));
        reader.getCheckpointMark().finalizeCheckpoint();
        Assert.assertEquals(0L, count("test_queue"));
    }

    /**
     * Returns a message transformer that replaces the message's acknowledge callback with one
     * that sleeps for 10 ms before delegating to the original callback.
     *
     * <p>The Qpid branch (selected when {@code cls} is {@code JmsConnectionFactory}) installs a
     * Mockito mock whose {@code acknowledge()} is stubbed with the delay; the other branch installs
     * a lambda callback that sleeps and then calls {@code execute()} on the original.
     *
     * @param cls the connection factory class in use
     * @return a function that mutates and returns the message it is given
     */
    private Function<?, ?> getJmsMessageAck(Class cls) {
        return cls == JmsConnectionFactory.class ? jmsTextMessage -> {
            // Keep the original callback so the delayed mock can still acknowledge for real.
            JmsAcknowledgeCallback acknowledgeCallback = jmsTextMessage.getAcknowledgeCallback();
            JmsAcknowledgeCallback jmsAcknowledgeCallback = (JmsAcknowledgeCallback) Mockito.mock(JmsAcknowledgeCallback.class);
            try {
                ((JmsAcknowledgeCallback) Mockito.doAnswer(invocationOnMock -> {
                    Thread.sleep(10L);
                    acknowledgeCallback.acknowledge();
                    return null;
                }).when(jmsAcknowledgeCallback)).acknowledge();
            } catch (JMSException e) {
                // NOTE(review): the message says "10s" but the delay above is 10 milliseconds.
                LOG.error("An exception occurred while adding 10s delay", e);
            }
            jmsTextMessage.setAcknowledgeCallback(jmsAcknowledgeCallback);
            return jmsTextMessage;
        } : activeMQMessage -> {
            // Wrap the original callback: sleep first, then run it.
            Callback acknowledgeCallback = activeMQMessage.getAcknowledgeCallback();
            activeMQMessage.setAcknowledgeCallback(() -> {
                Thread.sleep(10L);
                acknowledgeCallback.execute();
            });
            return activeMQMessage;
        };
    }

    /**
     * Checks that finalizing a checkpoint is safe while another thread keeps advancing the same
     * reader. Acknowledgements are slowed down so finalization overlaps with the concurrent
     * {@code advance()} calls.
     *
     * <p>Anything thrown in the background thread (including assertion failures) is captured and
     * rethrown on the test thread after {@code join()}. Otherwise it would only terminate the
     * background thread and the test would pass.
     */
    @Test
    public void testCheckpointMarkSafety() throws Exception {
        Connection connection = connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection("test_user", "test_password");
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue("test_queue"));
        for (int i = 0; i < 100; i++) {
            producer.send(session.createTextMessage("test " + i));
        }
        producer.close();
        session.close();
        connection.close();

        JmsIO.UnboundedJmsReader reader = new JmsIO.UnboundedJmsSource(JmsIO.read().withConnectionFactory(withSlowAcks(connectionFactoryWithSyncAcksAndWithoutPrefetch, getJmsMessageAck(connectionFactoryClass))).withUsername("test_user").withPassword("test_password").withQueue("test_queue")).createReader((PipelineOptions) null, (JmsCheckpointMark) null);

        // Consume the first 50 messages (start plus 49 advances) on the test thread.
        Assert.assertTrue(reader.start());
        for (int i = 0; i < 49; i++) {
            Assert.assertTrue(reader.advance());
        }
        Assert.assertEquals(100L, count("test_queue"));

        // Single-slot holder for a background failure; the write is visible after join().
        Throwable[] backgroundFailure = new Throwable[1];
        Thread runner = new Thread(() -> {
            try {
                for (int i = 0; i < 50; i++) {
                    Assert.assertTrue(reader.advance());
                }
            } catch (Throwable t) {
                backgroundFailure[0] = t;
            }
        });
        runner.start();
        reader.getCheckpointMark().finalizeCheckpoint();
        runner.join();

        if (backgroundFailure[0] != null) {
            throw new AssertionError("Background reader thread failed while the checkpoint was being finalized", backgroundFailure[0]);
        }
    }

    /** The source's checkpoint-mark coder must be serializable and round-trip an empty mark. */
    @Test
    public void testCheckpointMarkDefaultCoder() throws Exception {
        JmsCheckpointMark emptyMark = new JmsCheckpointMark();
        JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource((JmsIO.Read) null);
        Coder coder = source.getCheckpointMarkCoder();
        CoderProperties.coderSerializable(coder);
        CoderProperties.coderDecodeEncodeEqual(coder, emptyMark);
    }

    /** Without a custom autoscaler the reader reports -1 for both backlog sizes. */
    @Test
    public void testDefaultAutoscaler() throws IOException {
        JmsIO.Read spec = JmsIO.read().withConnectionFactory(connectionFactory).withUsername("test_user").withPassword("test_password").withQueue("test_queue");
        JmsIO.UnboundedJmsReader reader = new JmsIO.UnboundedJmsSource(spec).createReader((PipelineOptions) null, (JmsCheckpointMark) null);
        reader.start();
        Assert.assertEquals(-1L, reader.getSplitBacklogBytes());
        Assert.assertEquals(-1L, reader.getTotalBacklogBytes());
        reader.close();
    }

    /**
     * A custom autoscaler must be started with the reader, consulted for the total backlog and
     * stopped when the reader is closed, exactly once each.
     */
    @Test
    public void testCustomAutoscaler() throws IOException {
        long expectedBacklog = 1111L;
        AutoScaler autoScaler = (AutoScaler) Mockito.mock(DefaultAutoscaler.class);
        Mockito.when(autoScaler.getTotalBacklogBytes()).thenReturn(expectedBacklog);

        JmsIO.Read spec = JmsIO.read().withConnectionFactory(connectionFactory).withUsername("test_user").withPassword("test_password").withQueue("test_queue").withAutoScaler(autoScaler);
        JmsIO.UnboundedJmsReader reader = new JmsIO.UnboundedJmsSource(spec).createReader((PipelineOptions) null, (JmsCheckpointMark) null);

        reader.start();
        Mockito.verify(autoScaler, Mockito.times(1)).start();

        Assert.assertEquals(expectedBacklog, reader.getTotalBacklogBytes());
        Mockito.verify(autoScaler, Mockito.times(1)).getTotalBacklogBytes();

        reader.close();
        Mockito.verify(autoScaler, Mockito.times(1)).stop();
    }

    /**
     * With a close timeout configured, closing the reader must only schedule a task on the
     * pipeline's scheduled executor; the checkpoint mark is flagged as discarded once that task
     * actually runs.
     */
    @Test
    public void testCloseWithTimeout() throws IOException {
        Duration closeTimeout = Duration.millis(2000L);
        JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(JmsIO.read().withConnectionFactory(connectionFactory).withUsername("test_user").withPassword("test_password").withQueue("test_queue").withCloseTimeout(closeTimeout));

        // Mock executor that records the scheduled task instead of running it.
        ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
        ExecutorOptions options = PipelineOptionsFactory.as(ExecutorOptions.class);
        options.setScheduledExecutorService(executor);
        ArgumentCaptor<Runnable> scheduledTask = ArgumentCaptor.forClass(Runnable.class);
        Mockito.when(executor.schedule(scheduledTask.capture(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(TimeUnit.class))).thenReturn(null);

        JmsIO.UnboundedJmsReader reader = source.createReader(options, (JmsCheckpointMark) null);
        reader.start();
        Assert.assertFalse(getDiscardedValue(reader));

        // Closing only schedules the discard; nothing is discarded yet.
        reader.close();
        Assert.assertFalse(getDiscardedValue(reader));
        Mockito.verify(executor).schedule(ArgumentMatchers.any(Runnable.class), ArgumentMatchers.eq(closeTimeout.getMillis()), ArgumentMatchers.eq(TimeUnit.MILLISECONDS));

        // Running the captured task performs the discard.
        scheduledTask.getValue().run();
        Assert.assertTrue(getDiscardedValue(reader));
        Mockito.verifyNoMoreInteractions(executor);
    }

    /**
     * Reads the {@code discarded} flag of the reader's checkpoint mark while holding the mark's
     * read lock.
     */
    private boolean getDiscardedValue(JmsIO.UnboundedJmsReader unboundedJmsReader) {
        JmsCheckpointMark mark = unboundedJmsReader.getCheckpointMark();
        mark.lock.readLock().lock();
        try {
            return mark.discarded;
        } finally {
            mark.lock.readLock().unlock();
        }
    }

    /**
     * Checks that finalizing a discarded checkpoint mark acknowledges nothing: after the first
     * four messages are finalized, the next six are consumed, the mark is discarded, and the queue
     * must still hold those six messages after finalization.
     */
    @Test
    public void testDiscardCheckpointMark() throws Exception {
        Connection connection = connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection("test_user", "test_password");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue("test_queue"));
        for (int i = 0; i < 10; i++) {
            producer.send(session.createTextMessage("test " + i));
        }
        producer.close();
        session.close();
        connection.close();

        JmsIO.Read spec = JmsIO.read().withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch).withUsername("test_user").withPassword("test_password").withQueue("test_queue");
        JmsIO.UnboundedJmsReader reader = new JmsIO.UnboundedJmsSource(spec).createReader((PipelineOptions) null, (JmsCheckpointMark) null);

        // First batch: start() plus three advances, then a regular finalization.
        Assert.assertTrue(reader.start());
        for (int i = 0; i < 3; i++) {
            Assert.assertTrue(reader.advance());
        }
        Assert.assertEquals(10L, count("test_queue"));
        reader.getCheckpointMark().finalizeCheckpoint();
        Assert.assertEquals(6L, count("test_queue"));

        // Second batch: consumed, but the mark is discarded before it is finalized.
        for (int i = 0; i < 6; i++) {
            Assert.assertTrue(reader.advance());
        }
        Assert.assertEquals(6L, count("test_queue"));
        reader.getCheckpointMark().discard();
        reader.getCheckpointMark().finalizeCheckpoint();
        Assert.assertEquals(6L, count("test_queue"));
    }

    /**
     * The retry configuration passed to {@code withRetryConfiguration} must be returned unchanged
     * by {@code getRetryConfiguration()}.
     *
     * <p>Arguments are passed to {@code assertEquals} in JUnit's (expected, actual) order so a
     * failure message labels the two values correctly.
     */
    @Test
    public void testPublisherWithRetryConfiguration() {
        RetryConfiguration expected = RetryConfiguration.create(5, Duration.standardSeconds(15L), (Duration) null);
        Assert.assertEquals(expected, JmsIO.write().withConnectionFactory(connectionFactory).withRetryConfiguration(RetryConfiguration.create(5, Duration.standardSeconds(15L), (Duration) null)).withQueue("test_queue").withUsername("test_user").withPassword("test_password").getRetryConfiguration());
    }

    /**
     * Writes one message with a mapper that fails on its first call. With retries enabled the
     * message must not be reported as failed, must arrive exactly once, and must carry a JMS
     * timestamp inside the expected retry window.
     *
     * <p>{@code write()} is explicitly typed as {@code String} so the mapper type-checks, and the
     * verification connection is closed in a {@code finally} block.
     */
    @Test
    public void testWriteMessageWithRetryPolicy() throws Exception {
        int waitingSeconds = 5;
        Instant now = Instant.now();
        PAssert.that(pipeline.apply(Create.of(Collections.singletonList(now.toString()))).apply(JmsIO.<String>write().withConnectionFactory(connectionFactory).withValueMapper(new TextMessageMapperWithErrorCounter()).withRetryConfiguration(RetryConfiguration.create(3, Duration.standardSeconds(waitingSeconds), Duration.standardDays(10L))).withQueue("test_queue").withUsername("test_user").withPassword("test_password")).getFailedMessages()).empty();
        pipeline.run();

        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue("test_queue"));
            Message received = consumer.receive(1000L);
            Assert.assertNotNull(received);
            // Upper bound is the retry wait plus five seconds of slack.
            long upperBoundMillis = now.plus(java.time.Duration.ofSeconds(waitingSeconds + 5)).toEpochMilli();
            MatcherAssert.assertThat(received.getJMSTimestamp(), CoreMatchers.allOf(Matchers.greaterThanOrEqualTo(now.toEpochMilli()), Matchers.lessThan(upperBoundMillis)));
            Assert.assertNull(consumer.receiveNoWait());
        } finally {
            connection.close();
        }
    }

    /**
     * Writes one message with a mapper that always fails. The message must be reported as failed,
     * the {@code publicationRetries} counter must show two attempts, and nothing may reach the
     * queue.
     *
     * <p>{@code write()} is explicitly typed as {@code String} so the mapper lambda and the
     * failed-message assertion type-check, and the verification connection is closed in a
     * {@code finally} block.
     */
    @Test
    public void testWriteMessageWithRetryPolicyReachesLimit() throws Exception {
        PAssert.that(pipeline.apply(Create.of(Collections.singletonList("text"))).apply(JmsIO.<String>write().withConnectionFactory(connectionFactory).withValueMapper((value, jmsSession) -> {
            throw new JmsIOException("Error!!");
        }).withRetryConfiguration(RetryConfiguration.create(2, (Duration) null, (Duration) null)).withQueue("test_queue").withUsername("test_user").withPassword("test_password")).getFailedMessages()).containsInAnyOrder("text");

        MatcherAssert.assertThat(pipeline.run().metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named(JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME, "publicationRetries")).build()).getCounters(), Matchers.contains(CoreMatchers.allOf(Matchers.hasProperty("attempted", Matchers.is(2L)), Matchers.hasProperty("key", HasToString.hasToString(StringContains.containsString(String.format("%s:%s", JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME, "publicationRetries")))))));

        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Assert.assertNull(session.createConsumer(session.createQueue("test_queue")).receiveNoWait());
        } finally {
            connection.close();
        }
    }

    /**
     * Writes four messages with a mapper that permanently rejects {@code "Message 2"} and rejects
     * {@code "Message 1"} only until a failure has been counted. Only {@code "Message 2"} may be
     * reported as failed and three messages must reach the queue.
     *
     * <p>The queue is drained with a single consumer. Creating a new consumer per iteration would
     * leak consumers and can leave messages buffered in an earlier consumer.
     */
    @Test
    public void testWriteMessagesWithErrors() throws Exception {
        PAssert.that(pipeline.apply(Create.of(Arrays.asList("Message 1", "Message 2", "Message 3", "Message 4"))).apply(JmsIO.<String>write().withConnectionFactory(connectionFactory).withValueMapper(new TextMessageMapperWithErrorAndCounter()).withRetryConfiguration(RetryConfiguration.create(2, (Duration) null, (Duration) null)).withQueue("test_queue").withUsername("test_user").withPassword("test_password")).getFailedMessages()).containsInAnyOrder("Message 2");
        pipeline.run();

        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue("test_queue"));
            int received = 0;
            while (consumer.receive(1000L) != null) {
                received++;
            }
            Assert.assertEquals(3L, received);
        } finally {
            connection.close();
        }
    }

    /**
     * Writes a single message to a fixed topic without configuring retries. No message may be
     * reported as failed and the subscriber must receive exactly one message.
     *
     * <p>The subscriber is created before the pipeline runs so it is already registered when the
     * message is published. The input list and {@code write()} are typed as {@code String}, and the
     * connection is closed in a {@code finally} block.
     */
    @Test
    public void testWriteMessageToStaticTopicWithoutRetryPolicy() throws Exception {
        List<String> data = Collections.singletonList(Instant.now().toString());
        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createTopic("test_topic"));

            PAssert.that(pipeline.apply(Create.of(data)).apply(JmsIO.<String>write().withConnectionFactory(connectionFactory).withValueMapper(new TextMessageMapper()).withTopic("test_topic").withUsername("test_user").withPassword("test_password")).getFailedMessages()).empty();
            pipeline.run();

            Assert.assertNotNull(consumer.receive(1000L));
            Assert.assertNull(consumer.receiveNoWait());
        } finally {
            connection.close();
        }
    }

    /**
     * Counts the messages currently on a queue by browsing it, without consuming them.
     *
     * <p>The browsing connection is closed before returning. This helper is called several times
     * per test, so leaving it open would accumulate connections until the broker is stopped.
     *
     * @param str name of the queue to browse
     * @return number of messages the browser enumerated
     * @throws Exception if the connection, session or browser cannot be created
     */
    private int count(String str) throws Exception {
        Connection connection = connectionFactory.createConnection("test_user", "test_password");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Enumeration<?> pending = session.createBrowser(session.createQueue(str)).getEnumeration();
            int total = 0;
            while (pending.hasMoreElements()) {
                pending.nextElement();
                total++;
            }
            return total;
        } finally {
            connection.close();
        }
    }

    /**
     * Wraps a connection factory in nested dynamic proxies so that every message returned by
     * {@code receiveNoWait} on a consumer obtained through it is passed through {@code function}.
     *
     * <p>The proxy chain follows the JMS object graph: factory {@code createConnection} ->
     * connection {@code createSession} -> session {@code createConsumer} -> consumer
     * {@code receiveNoWait}. All other methods are delegated unchanged by {@code proxyMethod}.
     *
     * @param connectionFactory the factory to wrap (shadows the field of the same name)
     * @param function transformer applied to each result of {@code receiveNoWait}
     * @return a proxy implementing {@link ConnectionFactory}
     */
    private <T extends Message> ConnectionFactory withSlowAcks(ConnectionFactory connectionFactory, Function<T, T> function) {
        return (ConnectionFactory) proxyMethod(connectionFactory, ConnectionFactory.class, "createConnection", connection -> {
            return (Connection) proxyMethod(connection, Connection.class, "createSession", session -> {
                return (Session) proxyMethod(session, Session.class, "createConsumer", messageConsumer -> {
                    // Innermost level: hand each received message to the caller's transformer.
                    return (MessageConsumer) proxyMethod(messageConsumer, MessageConsumer.class, "receiveNoWait", function);
                });
            });
        });
    }

    /**
     * Creates a dynamic proxy for {@code t} that delegates every call to it and, for methods whose
     * name equals {@code str}, replaces the returned value with {@code function.apply(result)}.
     *
     * <p>Matching is by method name only, so all overloads of {@code str} are intercepted.
     * Exceptions thrown by the delegate are rethrown as-is rather than wrapped in the reflective
     * {@code InvocationTargetException}.
     *
     * @param t the object to delegate to
     * @param cls the interface the proxy implements; must be an interface implemented by {@code t}
     * @param str name of the method whose result is transformed
     * @param function transformation applied to the result of the intercepted method
     * @return a proxy implementing {@code cls}
     */
    @SuppressWarnings("unchecked")
    private <T, MethodArgT, MethodResultT> T proxyMethod(T t, Class<? super T> cls, String str, Function<MethodArgT, MethodResultT> function) {
        return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[] {cls}, (proxy, method, args) -> {
            Object result;
            try {
                result = method.invoke(t, args);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Surface the delegate's own exception; otherwise callers would see an
                // UndeclaredThrowableException instead of what the interface method declares.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
            if (method.getName().equals(str)) {
                // Unchecked: the caller guarantees the intercepted method returns a MethodArgT.
                result = function.apply((MethodArgT) result);
            }
            return result;
        });
    }

    /**
     * Lambda deserialization hook: given a {@link SerializedLambda}, returns the matching lambda
     * of this class or throws {@link IllegalArgumentException} if none matches.
     *
     * <p>Dispatch is two-stage: the implementation method name is first mapped to a selector via
     * its {@code hashCode()} plus an {@code equals} check, then the selected branch verifies the
     * method kind, functional interface class/method/signature and the implementing
     * class/signature before returning the lambda.
     *
     * <p>NOTE(review): this looks like decompiler output of a compiler-generated (synthetic)
     * method rather than hand-written code. As written it is not valid Java: {@code z} is declared
     * {@code boolean} but assigned {@code -1} and {@code 2}, the second switch has two
     * {@code case true} labels, and lambdas are returned as {@code Object} without a target type.
     * Confirm whether this method should be present in the source at all before editing it.
     *
     * @param serializedLambda serialized form describing the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if {@code serializedLambda} matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; -1 means "no known lambda matched".
        // NOTE(review): presumably an int in the original bytecode (values -1, 0, 1, 2) — confirm.
        boolean z = -1;
        // Stage 1: switch on the name's hash, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1687718888:
                if (implMethodName.equals("lambda$testWriteDynamicMessage$43268ee4$1")) {
                    z = 2;
                    break;
                }
                break;
            case -80755473:
                if (implMethodName.equals("lambda$testWriteDynamicMessage$eba53d0$1")) {
                    z = false;
                    break;
                }
                break;
            case 800372134:
                if (implMethodName.equals("lambda$testWriteMessageWithRetryPolicyReachesLimit$f298c98c$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Stage 2: validate the full lambda description and rebuild the lambda.
        // NOTE(review): the literal 6 compared against getImplMethodKind() presumably is
        // MethodHandleInfo.REF_invokeStatic — confirm.
        switch (z) {
            case false:
                // Value mapper: TestEvent + Session -> TextMessage carrying the event's value.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jms/JmsIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/jms/JmsIOTest$TestEvent;Ljavax/jms/Session;)Ljavax/jms/Message;")) {
                    return (testEvent2, session) -> {
                        try {
                            TextMessage createTextMessage = session.createTextMessage();
                            createTextMessage.setText(testEvent2.getValue());
                            return createTextMessage;
                        } catch (JMSException e) {
                            throw new JmsIOException("Error writing TextMessage", e);
                        }
                    };
                }
                break;
            case true:
                // Value mapper that always fails: String + Session -> throws JmsIOException.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jms/JmsIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljavax/jms/Session;)Ljavax/jms/Message;")) {
                    return (str, session2) -> {
                        throw new JmsIOException("Error!!");
                    };
                }
                break;
            // NOTE(review): duplicate label; presumably the selector value 2 set above — confirm.
            case true:
                // Topic-name function: TestEvent -> its topic name.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jms/JmsIOTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/jms/JmsIOTest$TestEvent;)Ljava/lang/String;")) {
                    return testEvent -> {
                        return testEvent.getTopicName();
                    };
                }
                break;
        }
        // Reached when the name was unknown or the detailed description did not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
