package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link ParDo} on the {@link SparkStructuredStreamingRunner}: plain and chained
 * {@code ParDo}s, and {@code ParDo}s reading list, singleton and map side inputs.
 *
 * <p>Every test builds its own {@link Pipeline} from the shared options so that the transforms
 * and assertions of one test never leak into the graph executed by another.
 *
 * <p>NOTE(review): the class is presumably {@link Serializable} because the anonymous
 * {@link DoFn}s declared in the instance test methods reference the enclosing test instance and
 * are serialized by the runner. Keep the class free of non-serializable instance state.
 */
@RunWith(JUnit4.class)
public class ParDoTest implements Serializable {
    /** Runner options, configured once for the class and used to create a pipeline per test. */
    private static SparkStructuredStreamingPipelineOptions options;

    /** Outputs every input element incremented by one. */
    private static final DoFn<Integer, Integer> PLUS_ONE_DOFN =
        new DoFn<Integer, Integer>() {
            @ProcessElement
            public void processElement(ProcessContext context) {
                context.output(context.element() + 1);
            }
        };

    /** Configures the options: structured streaming runner, test mode enabled. */
    @BeforeClass
    public static void beforeClass() {
        options = PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);
        options.setRunner(SparkStructuredStreamingRunner.class);
        options.setTestMode(true);
    }

    /** A single {@code ParDo} adds one to each of the integers 1..10. */
    @Test
    public void testPardo() {
        Pipeline pipeline = Pipeline.create(options);
        PAssert.that(
                pipeline
                    .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .apply(ParDo.of(PLUS_ONE_DOFN)))
            .containsInAnyOrder(2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        pipeline.run();
    }

    /** Two consecutive {@code ParDo}s add two to each of the integers 1..10. */
    @Test
    public void testTwoPardoInRow() {
        Pipeline pipeline = Pipeline.create(options);
        PAssert.that(
                pipeline
                    .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .apply(ParDo.of(PLUS_ONE_DOFN))
                    .apply(ParDo.of(PLUS_ONE_DOFN)))
            .containsInAnyOrder(3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        pipeline.run();
    }

    /** Elements contained in the list side input {@code [1, 2, 3]} are dropped. */
    @Test
    public void testSideInputAsList() {
        Pipeline pipeline = Pipeline.create(options);
        final PCollectionView<List<Integer>> sideInputView =
            pipeline.apply("Create sideInput", Create.of(1, 2, 3)).apply(View.asList());
        PAssert.that(
                pipeline
                    .apply("Create input", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .apply(
                        ParDo.of(
                                new DoFn<Integer, Integer>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context) {
                                        List<Integer> excluded = context.sideInput(sideInputView);
                                        if (!excluded.contains(context.element())) {
                                            context.output(context.element());
                                        }
                                    }
                                })
                            .withSideInputs(sideInputView)))
            .containsInAnyOrder(4, 5, 6, 7, 8, 9, 10);
        pipeline.run();
    }

    /** The element equal to the singleton side input {@code 1} is dropped. */
    @Test
    public void testSideInputAsSingleton() {
        Pipeline pipeline = Pipeline.create(options);
        final PCollectionView<Integer> sideInputView =
            pipeline.apply("Create sideInput", Create.of(1)).apply(View.asSingleton());
        PAssert.that(
                pipeline
                    .apply("Create input", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .apply(
                        ParDo.of(
                                new DoFn<Integer, Integer>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context) {
                                        Integer excluded = context.sideInput(sideInputView);
                                        if (!excluded.equals(context.element())) {
                                            context.output(context.element());
                                        }
                                    }
                                })
                            .withSideInputs(sideInputView)))
            .containsInAnyOrder(2, 3, 4, 5, 6, 7, 8, 9, 10);
        pipeline.run();
    }

    /** Elements {@code n} for which the map side input has a key {@code "key" + n} are dropped. */
    @Test
    public void testSideInputAsMap() {
        Pipeline pipeline = Pipeline.create(options);
        final PCollectionView<Map<String, Integer>> sideInputView =
            pipeline
                .apply("Create sideInput", Create.of(KV.of("key1", 1), KV.of("key2", 2)))
                .apply(View.asMap());
        PAssert.that(
                pipeline
                    .apply("Create input", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                    .apply(
                        ParDo.of(
                                new DoFn<Integer, Integer>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context) {
                                        Map<String, Integer> excluded =
                                            context.sideInput(sideInputView);
                                        if (!excluded.containsKey("key" + context.element())) {
                                            context.output(context.element());
                                        }
                                    }
                                })
                            .withSideInputs(sideInputView)))
            .containsInAnyOrder(3, 4, 5, 6, 7, 8, 9, 10);
        pipeline.run();
    }
}
