package org.apache.beam.runners.core.construction.graph;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.ImmutableSet;
import org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.class */
public class QueryablePipelineTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/runners/core/construction/graph/QueryablePipelineTest$TestFn.class */
    private static class TestFn extends DoFn<Long, Long> {
        private TestFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<Long, Long>.ProcessContext processContext) {
        }
    }

    @Test
    public void fromEmptyComponents() {
        Assert.assertThat(QueryablePipeline.forPrimitivesIn(RunnerApi.Components.getDefaultInstance()).getRootTransforms(), Matchers.emptyIterable());
    }

    @Test
    public void fromComponentsWithMalformedComponents() {
        RunnerApi.Components build = RunnerApi.Components.newBuilder().putTransforms("root", RunnerApi.PTransform.newBuilder().putOutputs("output", "output.out").build()).build();
        this.thrown.expect(IllegalArgumentException.class);
        QueryablePipeline.forPrimitivesIn(build);
    }

    @Test
    public void forTransformsWithMalformedGraph() {
        RunnerApi.Components build = RunnerApi.Components.newBuilder().putTransforms("root", RunnerApi.PTransform.newBuilder().putOutputs("output", "output.out").build()).putPcollections("output.out", RunnerApi.PCollection.newBuilder().setUniqueName("output.out").build()).putTransforms("consumer", RunnerApi.PTransform.newBuilder().putInputs("input", "output.out").build()).build();
        this.thrown.expect(IllegalArgumentException.class);
        QueryablePipeline.forTransforms(ImmutableSet.of("consumer"), build);
    }

    @Test
    public void forTransformsWithSubgraph() {
        RunnerApi.Components build = RunnerApi.Components.newBuilder().putTransforms("root", RunnerApi.PTransform.newBuilder().putOutputs("output", "output.out").build()).putPcollections("output.out", RunnerApi.PCollection.newBuilder().setUniqueName("output.out").build()).putTransforms("consumer", RunnerApi.PTransform.newBuilder().putInputs("input", "output.out").build()).putTransforms("ignored", RunnerApi.PTransform.newBuilder().putInputs("input", "output.out").build()).build();
        QueryablePipeline forTransforms = QueryablePipeline.forTransforms(ImmutableSet.of("root", "consumer"), build);
        Assert.assertThat(forTransforms.getRootTransforms(), Matchers.contains(new PipelineNode.PTransformNode[]{PipelineNode.pTransform("root", build.getTransformsOrThrow("root"))}));
        Assert.assertThat(forTransforms.getPerElementConsumers(PipelineNode.pCollection("output.out", build.getPcollectionsOrThrow("output.out"))), Matchers.contains(new PipelineNode.PTransformNode[]{PipelineNode.pTransform("consumer", build.getTransformsOrThrow("consumer"))}));
    }

    @Test
    public void rootTransforms() {
        Pipeline create = Pipeline.create();
        create.apply("UnboundedRead", Read.from(CountingSource.unbounded())).apply(Window.into(FixedWindows.of(Duration.millis(5L)))).apply(Count.perElement());
        create.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
        QueryablePipeline forPrimitivesIn = QueryablePipeline.forPrimitivesIn(PipelineTranslation.toProto(create).getComponents());
        Assert.assertThat(forPrimitivesIn.getRootTransforms(), Matchers.hasSize(2));
        for (PipelineNode.PTransformNode pTransformNode : forPrimitivesIn.getRootTransforms()) {
            Assert.assertThat("Root transforms should have no inputs", Integer.valueOf(pTransformNode.getTransform().getInputsCount()), Matchers.equalTo(0));
            Assert.assertThat("Only added source reads to the pipeline", pTransformNode.getTransform().getSpec().getUrn(), Matchers.equalTo(PTransformTranslation.READ_TRANSFORM_URN));
        }
    }

    @Test
    public void transformWithSideAndMainInputs() {
        Pipeline create = Pipeline.create();
        create.apply("BoundedRead", Read.from(CountingSource.upTo(100L))).apply("par_do", ParDo.of(new TestFn()).withSideInputs(new PCollectionView[]{create.apply("Create", Create.of("foo", new String[0])).apply("View", View.asSingleton())}).withOutputTags(new TupleTag(), TupleTagList.empty()));
        RunnerApi.Components components = PipelineTranslation.toProto(create).getComponents();
        QueryablePipeline forPrimitivesIn = QueryablePipeline.forPrimitivesIn(components);
        String str = (String) Iterables.getOnlyElement(PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead")).getTransform().getOutputsMap().values());
        PipelineNode.PCollectionNode pCollection = PipelineNode.pCollection(str, components.getPcollectionsOrThrow(str));
        RunnerApi.PTransform transformsOrThrow = components.getTransformsOrThrow("par_do");
        String str2 = (String) Iterables.getOnlyElement((Iterable) transformsOrThrow.getInputsMap().entrySet().stream().filter(entry -> {
            return !((String) entry.getValue()).equals(str);
        }).map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toSet()));
        String inputsOrThrow = transformsOrThrow.getInputsOrThrow(str2);
        PipelineNode.PCollectionNode pCollection2 = PipelineNode.pCollection(inputsOrThrow, components.getPcollectionsOrThrow(inputsOrThrow));
        PipelineNode.PTransformNode pTransform = PipelineNode.pTransform("par_do", components.getTransformsOrThrow("par_do"));
        Assert.assertThat(forPrimitivesIn.getSideInputs(pTransform), Matchers.contains(new SideInputReference[]{SideInputReference.of(pTransform, str2, pCollection2)}));
        Assert.assertThat(forPrimitivesIn.getPerElementConsumers(pCollection), Matchers.contains(new PipelineNode.PTransformNode[]{pTransform}));
        Assert.assertThat(forPrimitivesIn.getPerElementConsumers(pCollection2), Matchers.not(Matchers.contains(new PipelineNode.PTransformNode[]{pTransform})));
    }

    @Test
    public void transformWithSameSideAndMainInput() {
        RunnerApi.Components build = RunnerApi.Components.newBuilder().putPcollections("read_pc", RunnerApi.PCollection.getDefaultInstance()).putPcollections("pardo_out", RunnerApi.PCollection.getDefaultInstance()).putTransforms("root", RunnerApi.PTransform.newBuilder().putOutputs("out", "read_pc").build()).putTransforms("multiConsumer", RunnerApi.PTransform.newBuilder().putInputs("main_in", "read_pc").putInputs("side_in", "read_pc").putOutputs("out", "pardo_out").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN).setPayload(RunnerApi.ParDoPayload.newBuilder().putSideInputs("side_in", RunnerApi.SideInput.getDefaultInstance()).build().toByteString()).build()).build()).build();
        QueryablePipeline forPrimitivesIn = QueryablePipeline.forPrimitivesIn(build);
        PipelineNode.PCollectionNode pCollection = PipelineNode.pCollection("read_pc", build.getPcollectionsOrThrow("read_pc"));
        PipelineNode.PTransformNode pTransform = PipelineNode.pTransform("multiConsumer", build.getTransformsOrThrow("multiConsumer"));
        SideInputReference of = SideInputReference.of(pTransform, "side_in", pCollection);
        Assert.assertThat(forPrimitivesIn.getPerElementConsumers(pCollection), Matchers.contains(new PipelineNode.PTransformNode[]{pTransform}));
        Assert.assertThat(forPrimitivesIn.getSideInputs(pTransform), Matchers.contains(new SideInputReference[]{of}));
    }

    @Test
    public void perElementConsumersWithConsumingMultipleTimes() {
        Pipeline create = Pipeline.create();
        PCollection apply = create.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
        PCollectionList.of(apply).and(apply).and(apply).apply("flatten", Flatten.pCollections());
        RunnerApi.Components components = PipelineTranslation.toProto(create).getComponents();
        String str = (String) Iterables.getOnlyElement(components.getTransformsOrThrow("BoundedRead").getOutputsMap().values());
        Set perElementConsumers = QueryablePipeline.forPrimitivesIn(components).getPerElementConsumers(PipelineNode.pCollection(str, components.getPcollectionsOrThrow(str)));
        Assert.assertThat(Integer.valueOf(perElementConsumers.size()), Matchers.equalTo(1));
        Assert.assertThat(((PipelineNode.PTransformNode) Iterables.getOnlyElement(perElementConsumers)).getTransform().getSpec().getUrn(), Matchers.equalTo(PTransformTranslation.FLATTEN_TRANSFORM_URN));
    }

    @Test
    public void getProducer() {
        Pipeline create = Pipeline.create();
        PCollection apply = create.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
        PCollectionList.of(apply).and(apply).and(apply).apply("flatten", Flatten.pCollections());
        RunnerApi.Components components = PipelineTranslation.toProto(create).getComponents();
        QueryablePipeline forPrimitivesIn = QueryablePipeline.forPrimitivesIn(components);
        String str = (String) Iterables.getOnlyElement(PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead")).getTransform().getOutputsMap().values());
        PipelineNode.PTransformNode pTransform = PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead"));
        PipelineNode.PCollectionNode pCollection = PipelineNode.pCollection(str, components.getPcollectionsOrThrow(str));
        String str2 = (String) Iterables.getOnlyElement(PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten")).getTransform().getOutputsMap().values());
        PipelineNode.PTransformNode pTransform2 = PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten"));
        PipelineNode.PCollectionNode pCollection2 = PipelineNode.pCollection(str2, components.getPcollectionsOrThrow(str2));
        Assert.assertThat(forPrimitivesIn.getProducer(pCollection), Matchers.equalTo(pTransform));
        Assert.assertThat(forPrimitivesIn.getProducer(pCollection2), Matchers.equalTo(pTransform2));
    }

    @Test
    public void getEnvironmentWithEnvironment() {
        Pipeline create = Pipeline.create();
        PCollection apply = create.apply("BoundedRead", Read.from(CountingSource.upTo(100L)));
        PCollectionList.of(apply).and(apply).and(apply).apply("flatten", Flatten.pCollections());
        RunnerApi.Components components = PipelineTranslation.toProto(create).getComponents();
        QueryablePipeline forPrimitivesIn = QueryablePipeline.forPrimitivesIn(components);
        PipelineNode.PTransformNode pTransform = PipelineNode.pTransform("BoundedRead", components.getTransformsOrThrow("BoundedRead"));
        PipelineNode.PTransformNode pTransform2 = PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten"));
        Assert.assertThat(Boolean.valueOf(forPrimitivesIn.getEnvironment(pTransform).isPresent()), Matchers.is(true));
        Assert.assertThat((RunnerApi.Environment) forPrimitivesIn.getEnvironment(pTransform).get(), Matchers.equalTo(Environments.JAVA_SDK_HARNESS_ENVIRONMENT));
        Assert.assertThat(Boolean.valueOf(forPrimitivesIn.getEnvironment(pTransform2).isPresent()), Matchers.is(false));
    }

    @Test
    public void retainOnlyPrimitivesWithOnlyPrimitivesUnchanged() {
        Pipeline create = Pipeline.create();
        create.apply("Read", Read.from(CountingSource.unbounded())).apply("multi-do", ParDo.of(new TestFn()).withOutputTags(new TupleTag(), TupleTagList.empty()));
        RunnerApi.Components components = PipelineTranslation.toProto(create).getComponents();
        Assert.assertThat(QueryablePipeline.getPrimitiveTransformIds(components), Matchers.equalTo(components.getTransformsMap().keySet()));
    }

    @Test
    public void retainOnlyPrimitivesComposites() {
        Pipeline create = Pipeline.create();
        create.apply(new PTransform<PBegin, PCollection<Long>>() { // from class: org.apache.beam.runners.core.construction.graph.QueryablePipelineTest.1
            public PCollection<Long> expand(PBegin pBegin) {
                return pBegin.apply(GenerateSequence.from(2L)).apply(Window.into(FixedWindows.of(Duration.standardMinutes(5L)))).apply(MapElements.into(TypeDescriptors.longs()).via(l -> {
                    return Long.valueOf(l.longValue() + 1);
                }));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -1980096491:
                        if (implMethodName.equals("lambda$expand$9d500dbc$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/core/construction/graph/QueryablePipelineTest$1") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                            return l -> {
                                return Long.valueOf(l.longValue() + 1);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        });
        RunnerApi.Components components = PipelineTranslation.toProto(create).getComponents();
        Collection primitiveTransformIds = QueryablePipeline.getPrimitiveTransformIds(components);
        Assert.assertThat(primitiveTransformIds, Matchers.hasSize(3));
        Iterator it = primitiveTransformIds.iterator();
        while (it.hasNext()) {
            Assert.assertThat(components.getTransformsMap(), Matchers.hasKey((String) it.next()));
        }
    }

    @Test
    public void retainOnlyPrimitivesIgnoresUnreachableNodes() {
        Pipeline create = Pipeline.create();
        create.apply(new PTransform<PBegin, PCollection<Long>>() { // from class: org.apache.beam.runners.core.construction.graph.QueryablePipelineTest.2
            public PCollection<Long> expand(PBegin pBegin) {
                return pBegin.apply(GenerateSequence.from(2L)).apply(Window.into(FixedWindows.of(Duration.standardMinutes(5L)))).apply(MapElements.into(TypeDescriptors.longs()).via(l -> {
                    return Long.valueOf(l.longValue() + 1);
                }));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case -1980096491:
                        if (implMethodName.equals("lambda$expand$9d500dbc$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/core/construction/graph/QueryablePipelineTest$2") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                            return l -> {
                                return Long.valueOf(l.longValue() + 1);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        });
        QueryablePipeline.getPrimitiveTransformIds(PipelineTranslation.toProto(create).getComponents().toBuilder().putCoders("extra-coder", RunnerApi.Coder.getDefaultInstance()).putWindowingStrategies("extra-windowing-strategy", RunnerApi.WindowingStrategy.getDefaultInstance()).putEnvironments("extra-env", RunnerApi.Environment.getDefaultInstance()).putPcollections("extra-pc", RunnerApi.PCollection.getDefaultInstance()).build());
    }
}
