package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests the batch translation of {@link ParDo} in the Spark structured streaming runner.
 *
 * <p>Each test checks the pipeline output with {@link PAssert}. It then asserts whether running the
 * pipeline left cached data behind, as reported by {@link SparkSessionRule#hasCachedData()}.
 *
 * <p>The anonymous {@link DoFn}s created inside instance methods may capture the enclosing test
 * instance. That is presumably why this class is {@link Serializable} and its per-test rules are
 * {@code transient}.
 */
@RunWith(JUnit4.class)
public class ParDoTest implements Serializable {

    @ClassRule
    public static final SparkSessionRule SESSION = new SparkSessionRule();

    @Rule
    public transient TestPipeline pipeline = TestPipeline.fromOptions(SESSION.createPipelineOptions());

    @Rule
    public transient TestRule clearCache = SESSION.clearCache();

    /** Outputs every input element incremented by one. */
    private static final DoFn<Integer, Integer> PLUS_ONE_DOFN =
            new DoFn<Integer, Integer>() {
                @ProcessElement
                public void processElement(ProcessContext context) {
                    context.output(context.element() + 1);
                }
            };

    /** A single ParDo over 1..10 shifts every element by one and is expected not to cache. */
    @Test
    public void testPardo() {
        PAssert.that(
                        pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                                .apply(ParDo.of(PLUS_ONE_DOFN)))
                .containsInAnyOrder(2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        pipeline.run();
        Assert.assertFalse("No usage of cache expected", SESSION.hasCachedData());
    }

    /** Both outputs of a multi-output ParDo are consumed with storage level MEMORY_ONLY. */
    @Test
    public void testPardoWithOutputTagsCachedRDD() {
        pardoWithOutputTags("MEMORY_ONLY", true);
        Assert.assertTrue("Expected cached data", SESSION.hasCachedData());
    }

    /** Both outputs of a multi-output ParDo are consumed with storage level MEMORY_AND_DISK. */
    @Test
    public void testPardoWithOutputTagsCachedDataset() {
        pardoWithOutputTags("MEMORY_AND_DISK", true);
        Assert.assertTrue("Expected cached data", SESSION.hasCachedData());
    }

    /** Only the main output is consumed, so no cached data is expected. */
    @Test
    public void testPardoWithUnusedOutputTags() {
        pardoWithOutputTags("MEMORY_AND_DISK", false);
        Assert.assertFalse("No usage of cache expected", SESSION.hasCachedData());
    }

    /**
     * Splits 1..10 into even integers (main output) and odd integers rendered as strings
     * (additional output). It asserts on the outputs and then runs the pipeline.
     *
     * @param storageLevel storage level set on {@link SparkCommonPipelineOptions} before running
     * @param assertOddOutput whether to also assert on (and thereby consume) the odd-number output
     */
    private void pardoWithOutputTags(String storageLevel, boolean assertOddOutput) {
        pipeline.getOptions().as(SparkCommonPipelineOptions.class).setStorageLevel(storageLevel);

        // Anonymous subclasses retain the type argument reflectively (the usual Beam idiom for typed tags).
        final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
        final TupleTag<String> oddTag = new TupleTag<String>() {};

        DoFn<Integer, Integer> splitByParity =
                new DoFn<Integer, Integer>() {
                    @ProcessElement
                    public void processElement(@Element Integer element, MultiOutputReceiver receiver) {
                        if (element % 2 == 0) {
                            receiver.get(evenTag).output(element);
                        } else {
                            receiver.get(oddTag).output(element.toString());
                        }
                    }
                };

        PCollectionTuple outputs =
                pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                        .apply(ParDo.of(splitByParity).withOutputTags(evenTag, TupleTagList.of(oddTag)));

        PAssert.that(outputs.get(evenTag)).containsInAnyOrder(2, 4, 6, 8, 10);
        if (assertOddOutput) {
            PAssert.that(outputs.get(oddTag)).containsInAnyOrder("1", "3", "5", "7", "9");
        }
        pipeline.run();
    }

    /** Two chained ParDos add two to every element; no cached data is expected. */
    @Test
    public void testTwoPardoInRow() {
        PAssert.that(
                        pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                                .apply("Plus 1 (1st)", ParDo.of(PLUS_ONE_DOFN))
                                .apply("Plus 1 (2nd)", ParDo.of(PLUS_ONE_DOFN)))
                .containsInAnyOrder(3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        pipeline.run();
        Assert.assertFalse("No usage of cache expected", SESSION.hasCachedData());
    }

    /** Drops every input element contained in a list side input of {1, 2, 3}. */
    @Test
    public void testSideInputAsList() {
        final PCollectionView<List<Integer>> sideInput =
                pipeline.apply("Create sideInput", Create.of(1, 2, 3)).apply(View.asList());

        DoFn<Integer, Integer> dropListed =
                new DoFn<Integer, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                        if (!context.sideInput(sideInput).contains(context.element())) {
                            context.output(context.element());
                        }
                    }
                };

        PAssert.that(
                        pipeline.apply("Create input", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                                .apply(ParDo.of(dropListed).withSideInputs(sideInput)))
                .containsInAnyOrder(4, 5, 6, 7, 8, 9, 10);
        pipeline.run();
        Assert.assertFalse("No usage of cache expected", SESSION.hasCachedData());
    }

    /** Drops every input element equal to a singleton side input of 1. */
    @Test
    public void testSideInputAsSingleton() {
        final PCollectionView<Integer> sideInput =
                pipeline.apply("Create sideInput", Create.of(1)).apply(View.asSingleton());

        DoFn<Integer, Integer> dropSingleton =
                new DoFn<Integer, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                        // equals (not ==) so boxed Integers are compared by value.
                        if (!context.sideInput(sideInput).equals(context.element())) {
                            context.output(context.element());
                        }
                    }
                };

        PAssert.that(
                        pipeline.apply("Create input", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                                .apply(ParDo.of(dropSingleton).withSideInputs(sideInput)))
                .containsInAnyOrder(2, 3, 4, 5, 6, 7, 8, 9, 10);
        pipeline.run();
        Assert.assertFalse("No usage of cache expected", SESSION.hasCachedData());
    }

    /** Drops every input element n for which the map side input contains the key "key" + n. */
    @Test
    public void testSideInputAsMap() {
        final PCollectionView<Map<String, Integer>> sideInput =
                pipeline.apply("Create sideInput", Create.of(KV.of("key1", 1), KV.of("key2", 2)))
                        .apply(View.asMap());

        DoFn<Integer, Integer> dropMapped =
                new DoFn<Integer, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                        if (!context.sideInput(sideInput).containsKey("key" + context.element())) {
                            context.output(context.element());
                        }
                    }
                };

        PAssert.that(
                        pipeline.apply("Create input", Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                                .apply(ParDo.of(dropMapped).withSideInputs(sideInput)))
                .containsInAnyOrder(3, 4, 5, 6, 7, 8, 9, 10);
        pipeline.run();
        Assert.assertFalse("No usage of cache expected", SESSION.hasCachedData());
    }
}
