package org.apache.beam.sdk.io.solace.it;

import com.solacesystems.jcsmp.DeliveryMode;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory;
import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.apache.beam.sdk.io.solace.data.SolaceDataUtils;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.MethodSorters;

/**
 * Integration test for {@link SolaceIO} that runs against a Solace broker started through
 * {@link SolaceContainerManager}.
 *
 * <p>Test methods execute in ascending name order (see {@link FixMethodOrder}), so
 * {@code test01WriteStreaming} runs before {@code test02Read}. The read test expects exactly
 * {@code PUBLISH_MESSAGE_COUNT} messages on the queue, which presumably are the ones published
 * by the preceding streaming write test (the queue is created with a topic subscription in
 * {@link #setup()}).
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class SolaceIOIT {
    /** Metrics namespace shared by all counters created in this test. */
    private static final String NAMESPACE = SolaceIOIT.class.getName();

    private static final String READ_COUNT = "read_count";
    private static final String WRITE_COUNT = "write_count";
    private static final String QUEUE_NAME = "test_queue";

    /** Number of messages each write pipeline publishes and the read pipeline expects. */
    private static final long PUBLISH_MESSAGE_COUNT = 20;

    private static final TestPipelineOptions pipelineOptions =
            PipelineOptionsFactory.create().as(TestPipelineOptions.class);

    static {
        pipelineOptions.as(StreamingOptions.class).setStreaming(true);
        // run() must not block: the read test waits with an explicit timeout instead.
        pipelineOptions.setBlockOnRun(false);
    }

    private static SolaceContainerManager solaceContainerManager;

    @Rule
    public final TestPipeline pipeline = TestPipeline.fromOptions(pipelineOptions);

    /**
     * Pass-through {@link DoFn} that increments a counter metric once per element and then
     * re-emits the element unchanged.
     *
     * @param <T> element type
     */
    public static class CountingFn<T> extends DoFn<T, T> {
        private final Counter elementCounter;

        /**
         * @param namespace metrics namespace of the counter
         * @param name name of the counter
         */
        CountingFn(String namespace, String name) {
            elementCounter = Metrics.counter(namespace, name);
        }

        /** Counts the element and forwards it to the output. */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element T element, DoFn.OutputReceiver<T> receiver) {
            elementCounter.inc(1L);
            receiver.output(element);
        }
    }

    /**
     * Starts the Solace container and creates the test queue with its topic subscription.
     *
     * @throws IOException propagated from the container manager
     */
    @BeforeClass
    public static void setup() throws IOException {
        solaceContainerManager = new SolaceContainerManager();
        solaceContainerManager.start();
        solaceContainerManager.createQueueWithSubscriptionTopic(QUEUE_NAME);
    }

    /** Stops the Solace container if {@link #setup()} managed to create it. */
    @AfterClass
    public static void afterClass() {
        if (solaceContainerManager != null) {
            solaceContainerManager.stop();
        }
    }

    /** Publishes the test messages with the streaming writer. */
    @Test
    public void test01WriteStreaming() {
        testWriteConnector(SolaceIO.WriterType.STREAMING);
    }

    /** Reads from the test queue and checks that the expected number of messages was counted. */
    @Test
    public void test02Read() {
        pipeline
                .apply(
                        "Read from Solace",
                        SolaceIO.read()
                                .from(Solace.Queue.fromName(QUEUE_NAME))
                                .withDeduplicateRecords(true)
                                .withMaxNumConnections(1)
                                .withSempClientFactory(
                                        BasicAuthSempClientFactory.builder()
                                                .host("http://localhost:" + solaceContainerManager.sempPortMapped)
                                                .username("admin")
                                                .password("admin")
                                                .vpnName(SolaceContainerManager.VPN_NAME)
                                                .build())
                                .withSessionServiceFactory(
                                        BasicAuthJcsmpSessionServiceFactory.builder()
                                                .host("localhost:" + solaceContainerManager.jcsmpPortMapped)
                                                .username(SolaceContainerManager.USERNAME)
                                                .password(SolaceContainerManager.PASSWORD)
                                                .vpnName(SolaceContainerManager.VPN_NAME)
                                                .build()))
                .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT)));

        PipelineResult result = pipeline.run();
        // Bounded wait: give the pipeline up to 15 seconds before inspecting the counter.
        result.waitUntilFinish(Duration.standardSeconds(15L));

        MetricsReader metricsReader = new MetricsReader(result, NAMESPACE);
        Assert.assertEquals(PUBLISH_MESSAGE_COUNT, metricsReader.getCounterMetric(READ_COUNT));
    }

    /** Publishes the test messages with the batched writer. */
    @Test
    public void test03WriteBatched() {
        testWriteConnector(SolaceIO.WriterType.BATCHED);
    }

    /**
     * Runs the writer pipeline to completion and asserts that every message was reported as
     * successfully published.
     *
     * @param writerType writer implementation to exercise
     */
    private void testWriteConnector(SolaceIO.WriterType writerType) {
        PipelineResult result = createWriterPipeline(writerType).run();
        result.waitUntilFinish();

        MetricsReader metricsReader = new MetricsReader(result, NAMESPACE);
        Assert.assertEquals(PUBLISH_MESSAGE_COUNT, metricsReader.getCounterMetric(WRITE_COUNT));
    }

    /**
     * Builds (on the rule-managed {@link #pipeline}) a pipeline that feeds
     * {@code PUBLISH_MESSAGE_COUNT} key/payload pairs from a {@link TestStream} into
     * {@code SolaceIO.write()} and counts the successful publish results.
     *
     * @param writerType writer implementation to configure on the sink
     * @return the configured pipeline, ready to run
     */
    private Pipeline createWriterPipeline(SolaceIO.WriterType writerType) {
        TestStream.Builder<KV<String, String>> streamBuilder =
                TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class)))
                        .advanceWatermarkTo(Instant.EPOCH);

        for (int i = 0; i < PUBLISH_MESSAGE_COUNT; i++) {
            String messageId = "Solace-Message-ID:m" + i;
            String payload = String.format("{\"field_str\":\"value\",\"field_int\":123%d}", i);
            streamBuilder =
                    streamBuilder
                            .addElements(KV.of(messageId, payload))
                            .advanceProcessingTime(Duration.standardSeconds(60L));
        }

        pipeline
                .apply(
                        String.format("Test stream %s", writerType),
                        streamBuilder.advanceWatermarkToInfinity())
                .apply(
                        String.format("To Record %s", writerType),
                        MapElements.into(TypeDescriptor.of(Solace.Record.class))
                                // The KV key carries the message id, the value carries the payload.
                                .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey())))
                .apply(
                        String.format("Write to Solace %s", writerType),
                        SolaceIO.write()
                                .to(Solace.Topic.fromName(SolaceContainerManager.TOPIC_NAME))
                                .withSubmissionMode(SolaceIO.SubmissionMode.TESTING)
                                .withWriterType(writerType)
                                .withDeliveryMode(DeliveryMode.PERSISTENT)
                                .withNumberOfClientsPerWorker(1)
                                .withNumShards(1)
                                .withSessionServiceFactory(
                                        BasicAuthJcsmpSessionServiceFactory.builder()
                                                .host("localhost:" + solaceContainerManager.jcsmpPortMapped)
                                                .username(SolaceContainerManager.USERNAME)
                                                .password(SolaceContainerManager.PASSWORD)
                                                .vpnName(SolaceContainerManager.VPN_NAME)
                                                .build()))
                .getSuccessfulPublish()
                .apply(
                        String.format("Get ids %s", writerType),
                        MapElements.into(TypeDescriptors.strings()).via(Solace.PublishResult::getMessageId))
                .apply(
                        String.format("Count %s", writerType),
                        ParDo.of(new CountingFn<>(NAMESPACE, WRITE_COUNT)));

        return pipeline;
    }
}
