/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.sparkreceiver;

import com.google.cloud.Timestamp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.sparkreceiver.RabbitMqReceiverWithOffset;
import org.apache.beam.sdk.io.sparkreceiver.ReceiverBuilder;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(value=JUnit4.class)
public class SparkReceiverIOIT {
    private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class);
    private static final String READ_TIME_METRIC_NAME = "read_time";
    private static final String RUN_TIME_METRIC_NAME = "run_time";
    private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count";
    private static final String NAMESPACE = SparkReceiverIOIT.class.getName();
    private static final String TEST_ID = UUID.randomUUID().toString();
    private static final String TIMESTAMP = Timestamp.now().toString();
    private static final String TEST_MESSAGE_PREFIX = "Test ";
    private static Options options;
    private static SyntheticSourceOptions sourceOptions;
    private static GenericContainer<?> rabbitMqContainer;
    private static InfluxDBSettings settings;
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    @BeforeClass
    public static void setup() throws IOException {
        options = (Options)IOITHelper.readIOTestPipelineOptions(Options.class);
        sourceOptions = (SyntheticSourceOptions)SyntheticOptions.fromJsonString((String)options.getSourceOptions(), SyntheticSourceOptions.class);
        if (options.isWithTestcontainers().booleanValue()) {
            SparkReceiverIOIT.setupRabbitMqContainer();
        } else {
            settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
        }
        SparkReceiverIOIT.clearRabbitMQ();
    }

    @AfterClass
    public static void afterClass() {
        if (rabbitMqContainer != null) {
            rabbitMqContainer.stop();
        }
        SparkReceiverIOIT.clearRabbitMQ();
    }

    private static void setupRabbitMqContainer() {
        rabbitMqContainer = new RabbitMQContainer(DockerImageName.parse((String)"rabbitmq").withTag(options.getRabbitMqContainerVersion())).withExposedPorts(new Integer[]{5672, 15672});
        rabbitMqContainer.start();
        options.setRabbitMqBootstrapServerAddress(SparkReceiverIOIT.getBootstrapServers(rabbitMqContainer.getHost(), rabbitMqContainer.getMappedPort(5672).toString()));
    }

    private static String getBootstrapServers(String host, String port) {
        return String.format("amqp://guest:guest@%s:%s", host, port);
    }

    private void writeToRabbitMq(List<String> messages) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
        HashMap<String, String> arguments = new HashMap<String, String>();
        arguments.put("x-queue-type", "stream");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel();){
            channel.queueDeclare(options.getStreamName(), true, false, false, arguments);
            messages.forEach(message -> {
                try {
                    channel.basicPublish("", options.getStreamName(), MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
        ReceiverBuilder receiverBuilder = new ReceiverBuilder(RabbitMqReceiverWithOffset.class).withConstructorArgs(new Object[]{options.getRabbitMqBootstrapServerAddress(), options.getStreamName(), SparkReceiverIOIT.sourceOptions.numRecords});
        return SparkReceiverIO.read().withGetOffsetFn((SerializableFunction & Serializable)rabbitMqMessage -> Long.valueOf(rabbitMqMessage.substring(TEST_MESSAGE_PREFIX.length()))).withSparkReceiverBuilder(receiverBuilder);
    }

    private static void clearRabbitMQ() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        try {
            connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel();){
                channel.queueDelete(options.getStreamName());
            }
        }
        catch (IOException | URISyntaxException | KeyManagementException | NoSuchAlgorithmException | TimeoutException e) {
            LOG.error("Error during RabbitMQ clean up", (Throwable)e);
        }
    }

    private void cancelIfTimeout(PipelineResult readResult, PipelineResult.State readState) throws IOException {
        if (readState == null) {
            readResult.cancel();
        }
    }

    private long readElementMetric(PipelineResult result) {
        MetricsReader metricsReader = new MetricsReader(result, NAMESPACE);
        return metricsReader.getCounterMetric(READ_ELEMENT_METRIC_NAME);
    }

    private Set<NamedTestResult> readMetrics(PipelineResult readResult) {
        BiFunction<MetricsReader, String, NamedTestResult> supplier = (reader, metricName) -> {
            long start = reader.getStartTimeMetric(metricName);
            long end = reader.getEndTimeMetric(metricName);
            return NamedTestResult.create((String)TEST_ID, (String)TIMESTAMP, (String)metricName, (double)((double)(end - start) / 1000.0));
        };
        NamedTestResult readTime = supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
        NamedTestResult runTime = NamedTestResult.create((String)TEST_ID, (String)TIMESTAMP, (String)RUN_TIME_METRIC_NAME, (double)readTime.getValue());
        return ImmutableSet.of((Object)readTime, (Object)runTime);
    }

    @Test
    public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
        List<String> messages = LongStream.range(0L, SparkReceiverIOIT.sourceOptions.numRecords).mapToObj(number -> TEST_MESSAGE_PREFIX + number).collect(Collectors.toList());
        try {
            this.writeToRabbitMq(messages);
        }
        catch (Exception e) {
            LOG.error("Can not write to rabbit {}", (Object)e.getMessage());
            Assert.fail();
        }
        LOG.info(SparkReceiverIOIT.sourceOptions.numRecords + " records were successfully written to RabbitMQ");
        ((Options)this.readPipeline.getOptions().as(Options.class)).setStreaming(true);
        ((PCollection)((PCollection)this.readPipeline.apply("Read from unbounded RabbitMq", this.readFromRabbitMqWithOffset())).setCoder((Coder)StringUtf8Coder.of()).apply("Measure read time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, READ_TIME_METRIC_NAME)))).apply("Counting element", (PTransform)ParDo.of((DoFn)new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
        PipelineResult readResult = this.readPipeline.run();
        PipelineResult.State readState = readResult.waitUntilFinish(Duration.standardSeconds((long)options.getReadTimeout().intValue()));
        this.cancelIfTimeout(readResult, readState);
        Assert.assertEquals((long)SparkReceiverIOIT.sourceOptions.numRecords, (long)this.readElementMetric(readResult));
        if (!options.isWithTestcontainers().booleanValue()) {
            Set<NamedTestResult> metrics = this.readMetrics(readResult);
            IOITMetrics.publishToInflux((String)TEST_ID, (String)TIMESTAMP, metrics, (InfluxDBSettings)settings);
        }
    }

    private static class CountingFn
    extends DoFn<String, Void> {
        private final Counter elementCounter;

        CountingFn(String namespace, String name) {
            this.elementCounter = Metrics.counter((String)namespace, (String)name);
        }

        @DoFn.ProcessElement
        public void processElement() {
            this.elementCounter.inc(1L);
        }
    }

    public static interface Options
    extends IOTestPipelineOptions,
    StreamingOptions {
        @Description(value="Options for synthetic source.")
        @Validation.Required
        @Default.String(value="{\"numRecords\": \"500\",\"keySizeBytes\": \"1\",\"valueSizeBytes\": \"90\"}")
        public String getSourceOptions();

        public void setSourceOptions(String var1);

        @Description(value="RabbitMQ bootstrap server address")
        @Default.String(value="amqp://guest:guest@localhost:5672")
        public String getRabbitMqBootstrapServerAddress();

        public void setRabbitMqBootstrapServerAddress(String var1);

        @Description(value="RabbitMQ stream")
        @Default.String(value="rabbitMqTestStream")
        public String getStreamName();

        public void setStreamName(String var1);

        @Description(value="Whether to use testcontainers")
        @Default.Boolean(value=false)
        public Boolean isWithTestcontainers();

        public void setWithTestcontainers(Boolean var1);

        @Description(value="RabbitMQ container version. Use when useTestcontainers is true")
        @Default.String(value="3.9-alpine")
        public @Nullable String getRabbitMqContainerVersion();

        public void setRabbitMqContainerVersion(String var1);

        @Description(value="Time to wait for the events to be processed by the read pipeline (in seconds)")
        @Default.Integer(value=50)
        @Validation.Required
        public Integer getReadTimeout();

        public void setReadTimeout(Integer var1);
    }
}

