package org.apache.beam.runners.spark.translation;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/WindowedWordCountTest.class */
public class WindowedWordCountTest {
    private static final String[] WORDS_ARRAY = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
    private static final Long[] TIMESTAMPS_ARRAY = {60000L, 60000L, 60000L, 179000L, 179000L, 179000L};
    private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
    private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
    private static final List<String> EXPECTED_FIXED_SEPARATE_COUNT_SET = ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 2", "sue: 1", "bob: 1");
    private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET = ImmutableList.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
    private static final List<String> EXPECTED_SLIDING_COUNT_SET = ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 5", "there: 1", "sue: 2", "bob: 2", "hi: 2", "sue: 1", "bob: 1");

    @Test
    public void testFixed() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(SparkRunner.class);
        Pipeline create2 = Pipeline.create(create);
        PAssert.that(create2.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of()).apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L)))).apply(new WordCount.CountWords()).apply(MapElements.via(new WordCount.FormatAsTextFn()))).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
        create2.run().close();
    }

    @Test
    public void testFixed2() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(SparkRunner.class);
        Pipeline create2 = Pipeline.create(create);
        PAssert.that(create2.apply(Create.timestamped(WORDS, TIMESTAMPS).withCoder(StringUtf8Coder.of())).apply(Window.into(FixedWindows.of(Duration.standardMinutes(5L)))).apply(new WordCount.CountWords()).apply(MapElements.via(new WordCount.FormatAsTextFn()))).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
        create2.run().close();
    }

    @Test
    public void testSliding() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(SparkRunner.class);
        Pipeline create2 = Pipeline.create(create);
        PAssert.that(create2.apply(Create.timestamped(WORDS, TIMESTAMPS).withCoder(StringUtf8Coder.of())).apply(Window.into(SlidingWindows.of(Duration.standardMinutes(2L)).every(Duration.standardMinutes(1L)))).apply(new WordCount.CountWords()).apply(MapElements.via(new WordCount.FormatAsTextFn()))).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
        create2.run().close();
    }
}
