package org.apache.beam.runners.spark.translation.streaming;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.junit.Test;

/**
 * Streaming test for {@link Flatten#pCollections()} on the Spark runner.
 *
 * <p>Two queued word streams are each assigned to fixed windows, flattened into a single
 * collection, and the result is compared against {@link #EXPECTED_UNION}.
 */
public class FlattenStreamingTest {
    private static final String[] WORDS_ARRAY_1 = {"one", "two", "three", "four"};
    private static final List<Iterable<String>> WORDS_QUEUE_1 =
            Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_1));
    private static final String[] WORDS_ARRAY_2 = {"five", "six", "seven", "eight"};
    private static final List<Iterable<String>> WORDS_QUEUE_2 =
            Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY_2));

    /** Every word of both input arrays; the contents the flattened collection is asserted to have. */
    private static final String[] EXPECTED_UNION = {"one", "two", "three", "four", "five", "six", "seven", "eight"};

    /** Value handed to {@code setTimeout} on the pipeline options (milliseconds, going by the name). */
    private static final long TEST_TIMEOUT_MSEC = 1000;

    /** Size of the fixed windows applied to both inputs before they are flattened. */
    private static final Duration WINDOW_SIZE = Duration.standardSeconds(1L);

    /**
     * Builds a streaming pipeline that flattens the two windowed word streams, registers the
     * content assertion, then runs the pipeline on the Spark runner and closes the result.
     *
     * @throws Exception if building or running the pipeline fails
     */
    @Test
    public void testRun() throws Exception {
        SparkStreamingPipelineOptions options =
                PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        options.setStreaming(true);
        options.setTimeout(TEST_TIMEOUT_MSEC);

        Pipeline pipeline = Pipeline.create(options);

        // Keep the element type on every intermediate value: a raw PCollection here would
        // force the list/flatten calls below through unchecked conversions.
        PCollection<String> first = windowedWords(pipeline, WORDS_QUEUE_1);
        PCollection<String> second = windowedWords(pipeline, WORDS_QUEUE_2);
        PCollection<String> union =
                PCollectionList.of(first).and(second).apply(Flatten.<String>pCollections());

        // The assertion is attached to the pipeline before it is run.
        PAssertStreaming.assertContents(union, EXPECTED_UNION);

        SparkRunner.create(options).run(pipeline).close();
    }

    /**
     * Adds a queued string source to {@code pipeline} and assigns its elements to fixed windows
     * of {@link #WINDOW_SIZE}.
     *
     * @param pipeline the pipeline the source and windowing transforms are applied to
     * @param queue the queued values handed to {@code CreateStream.fromQueue}
     * @return the windowed collection of strings
     */
    private static PCollection<String> windowedWords(Pipeline pipeline, List<Iterable<String>> queue) {
        PCollection<String> words =
                pipeline.apply(CreateStream.fromQueue(queue)).setCoder(StringUtf8Coder.of());
        return words.apply(Window.<String>into(FixedWindows.of(WINDOW_SIZE)));
    }
}
