package org.apache.beam.sdk.io.jms;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import javax.jms.ConnectionFactory;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOIT.class */
public class JmsIOIT implements Serializable {
    private static final String READ_TIME_METRIC = "read_time";
    private static final String WRITE_TIME_METRIC = "write_time";
    private static final String READ_ELEMENT_METRIC_NAME = "jms_read_element_count";

    @Rule
    public transient TestPipeline pipelineWrite = TestPipeline.create();

    @Rule
    public transient TestPipeline pipelineRead = TestPipeline.create();
    private final CommonJms commonJms;
    private ConnectionFactory connectionFactory;
    private Class<? extends ConnectionFactory> connectionFactoryClass;
    private static final String NAMESPACE = JmsIOIT.class.getName();
    private static final JmsIOITOptions OPTIONS = IOITHelper.readIOTestPipelineOptions(JmsIOITOptions.class);
    private static final InfluxDBSettings settings = InfluxDBSettings.builder().withHost(OPTIONS.getInfluxHost()).withDatabase(OPTIONS.getInfluxDatabase()).withMeasurement(OPTIONS.getInfluxMeasurement()).get();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOIT$CountingFn.class */
    public static class CountingFn extends DoFn<String, Void> {
        private final Counter elementCounter;

        CountingFn(String str, String str2) {
            this.elementCounter = Metrics.counter(str, str2);
        }

        @DoFn.ProcessElement
        public void processElement() {
            this.elementCounter.inc(1L);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOIT$JmsIOITOptions.class */
    public interface JmsIOITOptions extends IOTestPipelineOptions, StreamingOptions {
        @Default.String("amqp://localhost")
        @Description("Host name for Jms Broker. By default, 'amqp://localhost'")
        String getJmsBrokerHost();

        void setJmsBrokerHost(String str);

        @Description("Port for Jms Broker")
        @Default.Integer(5672)
        Integer getJmsBrokerPort();

        void setJmsBrokerPort(Integer num);

        @Description("Enabling Jms Broker locally")
        @Default.Boolean(true)
        boolean isLocalJmsBrokerEnabled();

        void setLocalJmsBrokerEnabled(boolean z);

        @Description("JMS Read Timeout in seconds")
        @Default.Integer(30)
        Integer getReadTimeout();

        void setReadTimeout(Integer num);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/jms/JmsIOIT$ToString.class */
    public static class ToString extends DoFn<Long, String> {
        ToString() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Long l, DoFn.OutputReceiver<String> outputReceiver) {
            outputReceiver.output(String.format("Message %d", l));
        }
    }

    @Parameterized.Parameters(name = "with client class {3}")
    public static Collection<Object[]> connectionFactories() {
        return Collections.singletonList(new Object[]{"vm://localhost", 5672, "jms.sendAcksAsync=false", ActiveMQConnectionFactory.class});
    }

    public JmsIOIT(String str, Integer num, String str2, Class<? extends ConnectionFactory> cls) {
        this.commonJms = new CommonJms(OPTIONS.isLocalJmsBrokerEnabled() ? str : OPTIONS.getJmsBrokerHost(), OPTIONS.isLocalJmsBrokerEnabled() ? num : OPTIONS.getJmsBrokerPort(), str2, cls);
    }

    @Before
    public void setup() throws Exception {
        if (OPTIONS.isLocalJmsBrokerEnabled()) {
            this.commonJms.startBroker();
            this.connectionFactory = this.commonJms.getConnectionFactory();
            this.connectionFactoryClass = this.commonJms.getConnectionFactoryClass();
            OPTIONS.setNumberOfRecords(10000);
        }
    }

    @After
    public void tearDown() throws Exception {
        if (OPTIONS.isLocalJmsBrokerEnabled()) {
            this.commonJms.stopBroker();
            this.connectionFactory = null;
            this.connectionFactoryClass = null;
        }
    }

    @Test
    public void testPublishingThenReadingAll() throws IOException {
        PipelineResult publishingMessages = publishingMessages();
        Assert.assertNotEquals(PipelineResult.State.FAILED, publishingMessages.waitUntilFinish());
        PipelineResult readMessages = readMessages();
        PipelineResult.State waitUntilFinish = readMessages.waitUntilFinish(Duration.standardSeconds(OPTIONS.getReadTimeout().intValue()));
        cancelIfTimeouted(readMessages, waitUntilFinish);
        Assert.assertNotEquals(PipelineResult.State.FAILED, waitUntilFinish);
        long counterMetric = new MetricsReader(readMessages, NAMESPACE).getCounterMetric(READ_ELEMENT_METRIC_NAME);
        Assert.assertTrue(String.format("actual number of records %d smaller than expected: %d.", Long.valueOf(counterMetric), OPTIONS.getNumberOfRecords()), ((long) OPTIONS.getNumberOfRecords().intValue()) <= counterMetric);
        collectAndPublishMetrics(publishingMessages, readMessages);
    }

    private void cancelIfTimeouted(PipelineResult pipelineResult, PipelineResult.State state) throws IOException {
        if (state == null) {
            pipelineResult.cancel();
        }
    }

    private PipelineResult readMessages() {
        this.pipelineRead.getOptions().as(JmsIOITOptions.class).setStreaming(true);
        this.pipelineRead.getOptions().as(JmsIOITOptions.class).setBlockOnRun(false);
        this.pipelineRead.apply("Read Messages", JmsIO.readMessage().withQueue("test_queue").withUsername("test_user").withPassword("test_password").withCoder(SerializableCoder.of(String.class)).withConnectionFactory(this.connectionFactory).withMessageMapper(getJmsMessageMapper())).apply(ParDo.of(new TimeMonitor(NAMESPACE, READ_TIME_METRIC))).apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
        return this.pipelineRead.run();
    }

    private PipelineResult publishingMessages() {
        this.pipelineWrite.apply("Generate Sequence Data", GenerateSequence.from(0L).to(OPTIONS.getNumberOfRecords().intValue())).apply("Convert to String", ParDo.of(new ToString())).apply("Collect write time", ParDo.of(new TimeMonitor(NAMESPACE, WRITE_TIME_METRIC))).apply("Publish to Jms Broker", JmsIO.write().withQueue("test_queue").withUsername("test_user").withPassword("test_password").withValueMapper(new TextMessageMapper()).withConnectionFactory(this.connectionFactory));
        return this.pipelineWrite.run();
    }

    private void collectAndPublishMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        String uuid = UUID.randomUUID().toString();
        String instant = Instant.now().toString();
        Set<Function<MetricsReader, NamedTestResult>> metricsSuppliers = getMetricsSuppliers(uuid, instant, READ_TIME_METRIC);
        Set<Function<MetricsReader, NamedTestResult>> metricsSuppliers2 = getMetricsSuppliers(uuid, instant, WRITE_TIME_METRIC);
        IOITMetrics iOITMetrics = new IOITMetrics(metricsSuppliers, pipelineResult2, NAMESPACE, uuid, instant);
        IOITMetrics iOITMetrics2 = new IOITMetrics(metricsSuppliers2, pipelineResult, NAMESPACE, uuid, instant);
        iOITMetrics.publishToInflux(settings);
        iOITMetrics2.publishToInflux(settings);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getMetricsSuppliers(String str, String str2, String str3) {
        HashSet hashSet = new HashSet();
        hashSet.add(getTimeMetric(str, str2, str3));
        return hashSet;
    }

    private Function<MetricsReader, NamedTestResult> getTimeMetric(String str, String str2, String str3) {
        return metricsReader -> {
            return NamedTestResult.create(str, str2, str3, (metricsReader.getEndTimeMetric(str3) - metricsReader.getStartTimeMetric(str3)) / 1000.0d);
        };
    }

    private JmsIO.MessageMapper<String> getJmsMessageMapper() {
        return message -> {
            return this.connectionFactoryClass == JmsConnectionFactory.class ? ((TextMessage) message).getText() : ((ActiveMQTextMessage) message).getText();
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -135244292:
                if (implMethodName.equals("lambda$getJmsMessageMapper$203ece9c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/jms/JmsIO$MessageMapper") && serializedLambda.getFunctionalInterfaceMethodName().equals("mapMessage") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljavax/jms/Message;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jms/JmsIOIT") && serializedLambda.getImplMethodSignature().equals("(Ljavax/jms/Message;)Ljava/lang/String;")) {
                    JmsIOIT jmsIOIT = (JmsIOIT) serializedLambda.getCapturedArg(0);
                    return message -> {
                        return this.connectionFactoryClass == JmsConnectionFactory.class ? ((TextMessage) message).getText() : ((ActiveMQTextMessage) message).getText();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
