package org.apache.beam.runners.spark.aggregators.metrics.sink;

import org.apache.beam.runners.spark.SparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.class */
public class SparkMetricsSinkTest {

    @Rule
    public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();

    @Rule
    public final TestPipeline pipeline = TestPipeline.fromOptions(contextRule.createPipelineOptions());

    @ClassRule
    public static SparkContextRule contextRule = new SparkContextRule(KV.of("spark.metrics.conf.*.sink.memory.class", InMemoryMetrics.class.getName()));
    private static final ImmutableList<String> WORDS = ImmutableList.of("hi there", "hi", "hi sue bob", "hi sue", "", "bob hi");
    private static final ImmutableSet<String> EXPECTED_COUNTS = ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");

    @Test
    public void testInBatchMode() throws Exception {
        MatcherAssert.assertThat(InMemoryMetrics.valueOf("emptyLines"), Matchers.is(Matchers.nullValue()));
        PAssert.that(this.pipeline.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())).apply(new WordCount.CountWords()).apply(MapElements.via(new WordCount.FormatAsTextFn()))).containsInAnyOrder(EXPECTED_COUNTS);
        this.pipeline.run().waitUntilFinish();
        MatcherAssert.assertThat((Double) InMemoryMetrics.valueOf("emptyLines"), Matchers.is(Double.valueOf(1.0d)));
    }

    @Test
    @Category({StreamingTest.class})
    public void testInStreamingMode() throws Exception {
        this.pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
        MatcherAssert.assertThat(InMemoryMetrics.valueOf("emptyLines"), Matchers.is(Matchers.nullValue()));
        Instant instant = new Instant(0L);
        PAssert.that(this.pipeline.apply(CreateStream.of(StringUtf8Coder.of(), Duration.millis(this.pipeline.getOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis().longValue())).emptyBatch().advanceWatermarkForNextBatch(instant).nextBatch(new TimestampedValue[]{TimestampedValue.of((String) WORDS.get(0), instant), TimestampedValue.of((String) WORDS.get(1), instant), TimestampedValue.of((String) WORDS.get(2), instant)}).advanceWatermarkForNextBatch(instant.plus(Duration.standardSeconds(2L))).nextBatch(new TimestampedValue[]{TimestampedValue.of((String) WORDS.get(3), instant.plus(Duration.standardSeconds(1L))), TimestampedValue.of((String) WORDS.get(4), instant.plus(Duration.standardSeconds(1L))), TimestampedValue.of((String) WORDS.get(5), instant.plus(Duration.standardSeconds(1L)))}).advanceNextBatchWatermarkToInfinity()).apply(Window.into(FixedWindows.of(Duration.standardSeconds(3L))).withAllowedLateness(Duration.ZERO)).apply(new WordCount.CountWords()).apply(MapElements.via(new WordCount.FormatAsTextFn()))).containsInAnyOrder(EXPECTED_COUNTS);
        this.pipeline.run();
        MatcherAssert.assertThat((Double) InMemoryMetrics.valueOf("emptyLines"), Matchers.is(Double.valueOf(1.0d)));
    }
}
