package org.apache.beam.runners.spark.translation.streaming;

import java.util.List;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkContextRule;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.class */
public class TrackStreamingSourcesTest {

    @ClassRule
    public static SparkContextRule sparkContext = new SparkContextRule(new KV[0]);

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest$PassthroughFn.class */
    private static class PassthroughFn<T> extends DoFn<T, T> {
        private PassthroughFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) {
            processContext.output(processContext.element());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest$StreamingSourceTracker.class */
    private static class StreamingSourceTracker extends Pipeline.PipelineVisitor.Defaults {
        private final EvaluationContext ctxt;
        private final SparkRunner.Evaluator evaluator;
        private final Class<? extends PTransform> transformClassToAssert;
        private final Integer[] expected;
        private static int numAssertions = 0;

        private StreamingSourceTracker(JavaStreamingContext javaStreamingContext, Pipeline pipeline, Class<? extends PTransform> cls, Integer... numArr) {
            this.ctxt = new EvaluationContext(javaStreamingContext.sparkContext(), pipeline, pipeline.getOptions(), javaStreamingContext);
            this.evaluator = new SparkRunner.Evaluator(new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()), this.ctxt);
            this.transformClassToAssert = cls;
            this.expected = numArr;
        }

        private void assertSourceIds(List<Integer> list) {
            numAssertions++;
            MatcherAssert.assertThat(list, Matchers.containsInAnyOrder(this.expected));
        }

        public void enterPipeline(Pipeline pipeline) {
            super.enterPipeline(pipeline);
            this.evaluator.enterPipeline(pipeline);
        }

        public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
            return this.evaluator.enterCompositeTransform(node);
        }

        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            PTransform transform = node.getTransform();
            if (transform.getClass() != this.transformClassToAssert) {
                this.evaluator.visitPrimitiveTransform(node);
                return;
            }
            this.ctxt.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
            assertSourceIds(this.ctxt.borrowDataset(transform).getStreamSources());
            this.ctxt.setCurrentTransform((AppliedPTransform) null);
        }

        public void leavePipeline(Pipeline pipeline) {
            super.leavePipeline(pipeline);
            this.evaluator.leavePipeline(pipeline);
        }
    }

    @Before
    public void before() {
        int unused = StreamingSourceTracker.numAssertions = 0;
    }

    @Test
    public void testTrackSingle() {
        SparkContextOptions createPipelineOptions = sparkContext.createPipelineOptions();
        createPipelineOptions.setRunner(SparkRunner.class);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext.getSparkContext(), new Duration(createPipelineOptions.getBatchIntervalMillis().longValue()));
        Pipeline create = Pipeline.create(createPipelineOptions);
        create.apply(CreateStream.of(VarIntCoder.of(), org.joda.time.Duration.millis(createPipelineOptions.getBatchIntervalMillis().longValue())).emptyBatch()).apply(ParDo.of(new PassthroughFn()));
        create.traverseTopologically(new StreamingSourceTracker(javaStreamingContext, create, ParDo.MultiOutput.class, new Integer[]{0}));
        MatcherAssert.assertThat(Integer.valueOf(StreamingSourceTracker.numAssertions), IsEqual.equalTo(1));
    }

    @Test
    public void testTrackFlattened() {
        SparkContextOptions createPipelineOptions = sparkContext.createPipelineOptions();
        createPipelineOptions.setRunner(SparkRunner.class);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext.getSparkContext(), new Duration(createPipelineOptions.getBatchIntervalMillis().longValue()));
        Pipeline create = Pipeline.create(createPipelineOptions);
        CreateStream emptyBatch = CreateStream.of(VarIntCoder.of(), org.joda.time.Duration.millis(createPipelineOptions.getBatchIntervalMillis().longValue())).emptyBatch();
        CreateStream emptyBatch2 = CreateStream.of(VarIntCoder.of(), org.joda.time.Duration.millis(createPipelineOptions.getBatchIntervalMillis().longValue())).emptyBatch();
        PCollectionList.of(create.apply(emptyBatch)).and(create.apply(emptyBatch2)).apply(Flatten.pCollections()).apply(ParDo.of(new PassthroughFn()));
        create.traverseTopologically(new StreamingSourceTracker(javaStreamingContext, create, ParDo.MultiOutput.class, new Integer[]{0, 1}));
        MatcherAssert.assertThat(Integer.valueOf(StreamingSourceTracker.numAssertions), IsEqual.equalTo(1));
    }
}
