package org.apache.beam.sdk.io.sparkreceiver;

import com.google.cloud.Timestamp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

/**
 * Integration test for {@link SparkReceiverIO} that reads from a RabbitMQ stream queue.
 *
 * <p>The test publishes {@code numRecords} messages of the form {@code "Test <n>"} to RabbitMQ,
 * reads them back through a streaming pipeline built on {@link SparkReceiverIO}, and asserts that
 * the number of elements counted by the pipeline equals the number of messages written.
 *
 * <p>The broker is either an externally provided RabbitMQ instance (addressed by
 * {@link Options#getRabbitMqBootstrapServerAddress()}) or a Testcontainers-managed one when
 * {@link Options#isWithTestcontainers()} is true. Timing metrics are published to InfluxDB only
 * when Testcontainers is not used.
 */
@RunWith(JUnit4.class)
public class SparkReceiverIOIT {
    private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class);

    private static final String READ_TIME_METRIC_NAME = "read_time";
    private static final String RUN_TIME_METRIC_NAME = "run_time";
    private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count";
    private static final String TEST_MESSAGE_PREFIX = "Test ";

    private static final String NAMESPACE = SparkReceiverIOIT.class.getName();
    private static final String TEST_ID = UUID.randomUUID().toString();
    private static final String TIMESTAMP = Timestamp.now().toString();

    /** AMQP port the broker listens on inside the container. */
    private static final int RABBIT_MQ_AMQP_PORT = 5672;

    /** Second port exposed from the container alongside the AMQP port. */
    private static final int RABBIT_MQ_SECOND_EXPOSED_PORT = 15672;

    private static Options options;
    private static SyntheticSourceOptions sourceOptions;
    private static GenericContainer<?> rabbitMqContainer;
    /** Only initialised when the test runs against an external broker (no Testcontainers). */
    private static InfluxDBSettings settings;

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    /** Counts every element that reaches it using a Beam {@link Counter} metric. */
    private static class CountingFn extends DoFn<String, Void> {
        private final Counter elementCounter;

        /**
         * @param namespace metric namespace the counter is registered under
         * @param name metric name of the counter
         */
        CountingFn(String namespace, String name) {
            this.elementCounter = Metrics.counter(namespace, name);
        }

        /** Increments the counter by one for each processed element. */
        @DoFn.ProcessElement
        public void processElement() {
            elementCounter.inc(1L);
        }
    }

    /** Pipeline options specific to this integration test. */
    public interface Options extends IOTestPipelineOptions, StreamingOptions {
        @Default.String("{\"numRecords\": \"500\",\"keySizeBytes\": \"1\",\"valueSizeBytes\": \"90\"}")
        @Description("Options for synthetic source.")
        @Validation.Required
        String getSourceOptions();

        void setSourceOptions(String sourceOptions);

        @Default.String("amqp://guest:guest@localhost:5672")
        @Description("RabbitMQ bootstrap server address")
        String getRabbitMqBootstrapServerAddress();

        void setRabbitMqBootstrapServerAddress(String address);

        @Default.String("rabbitMqTestStream")
        @Description("RabbitMQ stream")
        String getStreamName();

        void setStreamName(String streamName);

        @Description("Whether to use testcontainers")
        @Default.Boolean(false)
        Boolean isWithTestcontainers();

        void setWithTestcontainers(Boolean withTestcontainers);

        @Default.String("3.9-alpine")
        @Description("RabbitMQ container version. Use when useTestcontainers is true")
        String getRabbitMqContainerVersion();

        void setRabbitMqContainerVersion(String version);

        @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
        @Default.Integer(50)
        @Validation.Required
        Integer getReadTimeout();

        void setReadTimeout(Integer readTimeoutSeconds);
    }

    /**
     * Reads the test options, starts the RabbitMQ container or prepares the InfluxDB settings
     * (depending on {@link Options#isWithTestcontainers()}), and removes any leftover test queue.
     *
     * @throws IOException if the synthetic source options cannot be parsed
     */
    @BeforeClass
    public static void setup() throws IOException {
        options = IOITHelper.readIOTestPipelineOptions(Options.class);
        sourceOptions =
                SyntheticOptions.fromJsonString(
                        options.getSourceOptions(), SyntheticSourceOptions.class);
        if (options.isWithTestcontainers()) {
            setupRabbitMqContainer();
        } else {
            settings =
                    InfluxDBSettings.builder()
                            .withHost(options.getInfluxHost())
                            .withDatabase(options.getInfluxDatabase())
                            .withMeasurement(options.getInfluxMeasurement())
                            .get();
        }
        clearRabbitMQ();
    }

    /**
     * Deletes the test queue and then stops the RabbitMQ container, if one was started.
     *
     * <p>The queue is deleted first because the deletion needs a live broker; stopping the
     * container beforehand would make the clean-up fail with a connection error.
     */
    @AfterClass
    public static void afterClass() {
        try {
            clearRabbitMQ();
        } finally {
            if (rabbitMqContainer != null) {
                rabbitMqContainer.stop();
            }
        }
    }

    /** Starts a RabbitMQ container and points the test options at its mapped AMQP port. */
    private static void setupRabbitMqContainer() {
        rabbitMqContainer =
                new RabbitMQContainer(
                                DockerImageName.parse("rabbitmq")
                                        .withTag(options.getRabbitMqContainerVersion()))
                        .withExposedPorts(RABBIT_MQ_AMQP_PORT, RABBIT_MQ_SECOND_EXPOSED_PORT);
        rabbitMqContainer.start();
        options.setRabbitMqBootstrapServerAddress(
                getBootstrapServers(
                        rabbitMqContainer.getHost(),
                        rabbitMqContainer.getMappedPort(RABBIT_MQ_AMQP_PORT).toString()));
    }

    /**
     * Builds an AMQP URI for the default {@code guest} account.
     *
     * @param host broker host name
     * @param port broker AMQP port
     * @return URI of the form {@code amqp://guest:guest@host:port}
     */
    private static String getBootstrapServers(String host, String port) {
        return String.format("amqp://guest:guest@%s:%s", host, port);
    }

    /**
     * Declares the test queue with {@code x-queue-type=stream} and publishes every message to it
     * as UTF-8 persistent plain text.
     *
     * @param list messages to publish, in order
     * @throws IOException if declaring the queue, publishing, or closing the connection fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     * @throws URISyntaxException if the configured broker address is not a valid URI
     * @throws NoSuchAlgorithmException declared by {@code ConnectionFactory.setUri}
     * @throws KeyManagementException declared by {@code ConnectionFactory.setUri}
     */
    private void writeToRabbitMq(List<String> list)
            throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException,
                    IOException, TimeoutException {
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());

        final HashMap<String, Object> queueArguments = new HashMap<>();
        queueArguments.put("x-queue-type", "stream");

        final String streamName = options.getStreamName();
        // Resources are closed in reverse order: channel first, then connection.
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            channel.queueDeclare(streamName, true, false, false, queueArguments);
            for (String message : list) {
                channel.basicPublish(
                        "",
                        streamName,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    /**
     * Creates the read transform under test. The offset of each message is the number that
     * follows {@link #TEST_MESSAGE_PREFIX} in its text.
     */
    private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
        final ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
                new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
                        .withConstructorArgs(
                                options.getRabbitMqBootstrapServerAddress(),
                                options.getStreamName(),
                                sourceOptions.numRecords);

        return SparkReceiverIO.<String>read()
                .withGetOffsetFn(
                        message -> Long.valueOf(message.substring(TEST_MESSAGE_PREFIX.length())))
                .withSparkReceiverBuilder(receiverBuilder);
    }

    /**
     * Deletes the test queue from the broker. This is best-effort: connection and protocol
     * failures are logged and not propagated, so they never fail the test run on their own.
     */
    private static void clearRabbitMQ() {
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        try {
            connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
            try (Connection connection = connectionFactory.newConnection();
                    Channel channel = connection.createChannel()) {
                channel.queueDelete(options.getStreamName());
            }
        } catch (IOException
                | URISyntaxException
                | KeyManagementException
                | NoSuchAlgorithmException
                | TimeoutException e) {
            LOG.error("Error during RabbitMQ clean up", e);
        }
    }

    /**
     * Cancels the pipeline when {@code state} is {@code null}, which is how this test recognises
     * that waiting for the pipeline ended without a final state.
     *
     * @throws IOException if cancelling the pipeline fails
     */
    private void cancelIfTimeout(PipelineResult pipelineResult, PipelineResult.State state)
            throws IOException {
        if (state == null) {
            pipelineResult.cancel();
        }
    }

    /** Returns the value of the element counter recorded by {@link CountingFn}. */
    private long readElementMetric(PipelineResult pipelineResult) {
        final MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE);
        return metricsReader.getCounterMetric(READ_ELEMENT_METRIC_NAME);
    }

    /**
     * Collects the metrics to publish: the read time in seconds (end minus start of the
     * {@code read_time} metric, divided by 1000) and a {@code run_time} entry that carries the
     * same value.
     */
    private Set<NamedTestResult> readMetrics(PipelineResult pipelineResult) {
        final BiFunction<MetricsReader, String, NamedTestResult> elapsedSeconds =
                (reader, metricName) -> {
                    long start = reader.getStartTimeMetric(metricName);
                    long end = reader.getEndTimeMetric(metricName);
                    return NamedTestResult.create(
                            TEST_ID, TIMESTAMP, metricName, (end - start) / 1000.0d);
                };

        final NamedTestResult readTime =
                elapsedSeconds.apply(
                        new MetricsReader(pipelineResult, NAMESPACE), READ_TIME_METRIC_NAME);
        final NamedTestResult runTime =
                NamedTestResult.create(
                        TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, readTime.getValue());
        return ImmutableSet.of(readTime, runTime);
    }

    /**
     * Writes {@code numRecords} messages to RabbitMQ, reads them with a streaming pipeline, and
     * checks that the pipeline counted exactly that many elements. The pipeline is cancelled if it
     * has not finished within the configured read timeout.
     *
     * @throws IOException if cancelling the pipeline fails
     */
    @Test
    public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
        final List<String> messages =
                LongStream.range(0L, sourceOptions.numRecords)
                        .mapToObj(index -> TEST_MESSAGE_PREFIX + index)
                        .collect(Collectors.toList());
        try {
            writeToRabbitMq(messages);
        } catch (Exception e) {
            // The trailing throwable argument makes SLF4J log the full stack trace as well.
            LOG.error("Can not write to rabbit {}", e.getMessage(), e);
            Assert.fail("Can not write to rabbit: " + e);
        }
        LOG.info("{} records were successfully written to RabbitMQ", sourceOptions.numRecords);

        readPipeline.getOptions().as(Options.class).setStreaming(true);
        readPipeline
                .apply("Read from unbounded RabbitMq", readFromRabbitMqWithOffset())
                .setCoder(StringUtf8Coder.of())
                .apply(
                        "Measure read time",
                        ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
                .apply(
                        "Counting element",
                        ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));

        final PipelineResult readResult = readPipeline.run();
        final PipelineResult.State readState =
                readResult.waitUntilFinish(
                        Duration.standardSeconds(options.getReadTimeout().intValue()));
        cancelIfTimeout(readResult, readState);

        Assert.assertEquals(sourceOptions.numRecords, readElementMetric(readResult));

        // InfluxDB settings are only configured when running against an external broker.
        if (!options.isWithTestcontainers()) {
            IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, readMetrics(readResult), settings);
        }
    }
}
