package org.apache.beam.runners.spark.structuredstreaming.aggregators.metrics.sink;

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.runners.spark.structuredstreaming.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Runs the {@link WordCount} example transforms on the {@link SparkStructuredStreamingRunner} and
 * checks the value that {@link InMemoryMetrics} reports for the {@code emptyLines} metric before
 * and after the pipeline executes.
 *
 * <p>The whole class is currently disabled through {@link Ignore}; see the annotation's reason.
 */
@RunWith(JUnit4.class)
@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
public class SparkMetricsSinkTest {

    /** Name of the metric looked up through {@link InMemoryMetrics} in the assertions below. */
    private static final String EMPTY_LINES_METRIC = "emptyLines";

    /** Input lines fed into the pipeline; exactly one of them is the empty string. */
    private static final ImmutableList<String> WORDS =
            ImmutableList.of("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");

    /** Formatted per-word counts the pipeline output is compared against. */
    private static final ImmutableSet<String> EXPECTED_COUNTS =
            ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");

    /** Pipeline shared by the tests in this class; created once in {@link #beforeClass()}. */
    private static Pipeline pipeline;

    // NOTE(review): presumably resets the in-memory metrics sink around each test -- confirm
    // against InMemoryMetricsSinkRule, which is not visible from this file.
    @Rule
    public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();

    /** Builds the shared pipeline with the structured streaming runner in test mode. */
    @BeforeClass
    public static void beforeClass() {
        SparkStructuredStreamingPipelineOptions options =
                PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
        options.setRunner(SparkStructuredStreamingRunner.class);
        options.setTestMode(true);
        pipeline = Pipeline.create(options);
    }

    /**
     * Asserts the metric is absent before the run, checks the word-count output, runs the
     * pipeline, and then asserts the metric equals {@code 1.0}.
     *
     * @throws Exception if any step of the test throws
     */
    @Test
    public void testInBatchMode() throws Exception {
        // Nothing must have been reported for the metric before the pipeline is executed.
        MatcherAssert.assertThat(
                InMemoryMetrics.valueOf(EMPTY_LINES_METRIC),
                Matchers.is(Matchers.nullValue()));

        PAssert.that(
                        pipeline
                                .apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()))
                                .apply(new WordCount.CountWords())
                                .apply(MapElements.via(new WordCount.FormatAsTextFn())))
                .containsInAnyOrder(EXPECTED_COUNTS);

        pipeline.run();

        // After the run the metric is expected to be 1.0.
        MatcherAssert.assertThat(
                (Double) InMemoryMetrics.valueOf(EMPTY_LINES_METRIC),
                Matchers.is(1.0d));
    }
}
