package org.apache.beam.sdk.io.solace.it;

import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory;
import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/io/solace/it/SolaceIOIT.class */
public class SolaceIOIT {
    private static final String READ_COUNT = "read_count";
    private static SolaceContainerManager solaceContainerManager;

    @Rule
    public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions);
    private static final String NAMESPACE = SolaceIOIT.class.getName();
    private static final TestPipelineOptions readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class);

    /* loaded from: input_file:org/apache/beam/sdk/io/solace/it/SolaceIOIT$CountingFn.class */
    private static class CountingFn<T> extends DoFn<T, T> {
        private final Counter elementCounter;

        CountingFn(String str, String str2) {
            this.elementCounter = Metrics.counter(str, str2);
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element T t, DoFn.OutputReceiver<T> outputReceiver) {
            this.elementCounter.inc(1L);
            outputReceiver.output(t);
        }
    }

    @BeforeClass
    public static void setup() throws IOException {
        solaceContainerManager = new SolaceContainerManager();
        solaceContainerManager.start();
    }

    @AfterClass
    public static void afterClass() {
        if (solaceContainerManager != null) {
            solaceContainerManager.stop();
        }
    }

    @Test
    public void testRead() {
        solaceContainerManager.createQueueWithSubscriptionTopic("test_queue");
        for (int i = 0; i < 20; i++) {
            solaceContainerManager.sendToTopic("{\"field_str\":\"value\",\"field_int\":123}", ImmutableList.of("Solace-Message-ID:m" + i));
        }
        this.readPipeline.apply("Read from Solace", SolaceIO.read().from(Solace.Queue.fromName("test_queue")).withMaxNumConnections(1).withSempClientFactory(BasicAuthSempClientFactory.builder().host("http://localhost:" + solaceContainerManager.sempPortMapped).username("admin").password("admin").vpnName(SolaceContainerManager.VPN_NAME).build()).withSessionServiceFactory(BasicAuthJcsmpSessionServiceFactory.builder().host("localhost:" + solaceContainerManager.jcsmpPortMapped).username(SolaceContainerManager.USERNAME).password(SolaceContainerManager.PASSWORD).vpnName(SolaceContainerManager.VPN_NAME).build())).apply("Count", ParDo.of(new CountingFn(NAMESPACE, READ_COUNT)));
        PipelineResult run = this.readPipeline.run();
        run.waitUntilFinish(Duration.standardSeconds(15L));
        Assert.assertEquals(20, new MetricsReader(run, NAMESPACE).getCounterMetric(READ_COUNT));
    }

    static {
        readPipelineOptions.setBlockOnRun(false);
        readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
        readPipelineOptions.as(StreamingOptions.class).setStreaming(false);
    }
}
