package org.apache.beam.runners.core.construction.graph;

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.class */
public class GreedyPipelineFuserTest {
    private RunnerApi.Components partialComponents;

    /**
     * Rebuilds the shared component fixture before each test.
     *
     * <p>The fixture contains one "impulse" transform that outputs "impulse.out", the "go" and
     * "py" environments, two empty coders registered as "coder" and "windowCoder", and the "ws"
     * windowing strategy whose window coder id is "windowCoder".
     */
    @Before
    public void setup() {
        RunnerApi.PTransform impulse =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Impulse")
                        .putOutputs("output", "impulse.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN))
                        .build();
        RunnerApi.WindowingStrategy windowingStrategy =
                RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build();

        RunnerApi.Components.Builder fixture = RunnerApi.Components.newBuilder();
        fixture.putTransforms("impulse", impulse);
        fixture.putPcollections("impulse.out", pc("impulse.out"));
        fixture.putEnvironments("go", RunnerApi.Environment.newBuilder().setUrl("go").build());
        fixture.putEnvironments("py", RunnerApi.Environment.newBuilder().setUrl("py").build());
        fixture.putCoders("coder", RunnerApi.Coder.newBuilder().build());
        fixture.putCoders("windowCoder", RunnerApi.Coder.newBuilder().build());
        fixture.putWindowingStrategies("ws", windowingStrategy);
        this.partialComponents = fixture.build();
    }

    /**
     * Creates a PCollection proto for use in test pipelines.
     *
     * @param str the unique name to give the PCollection
     * @return a PCollection with that unique name, coder id "coder" and windowing strategy id "ws"
     */
    private static RunnerApi.PCollection pc(String str) {
        RunnerApi.PCollection.Builder collection = RunnerApi.PCollection.newBuilder();
        collection.setUniqueName(str);
        collection.setCoderId("coder");
        collection.setWindowingStrategyId("ws");
        return collection.build();
    }

    /**
     * Fuses impulse -> read -> parDo -> window, where read, parDo and window all name the "py"
     * environment.
     *
     * <p>Expects "impulse" to be the only runner-executed transform, and exactly one fused stage
     * that consumes "impulse.out", has no outputs, and contains "read", "parDo" and "window".
     */
    @Test
    public void singleEnvironmentBecomesASingleStage() {
        RunnerApi.FunctionSpec pyParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .build()
                                        .toByteString())
                        .build();
        RunnerApi.FunctionSpec pyWindowIntoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.WindowIntoPayload.newBuilder()
                                        .setWindowFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .build()
                                        .toByteString())
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms(
                                "read",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("Read")
                                        .putInputs("input", "impulse.out")
                                        .putOutputs("output", "read.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("read.out", pc("read.out"))
                        .putTransforms(
                                "parDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("ParDo")
                                        .putInputs("input", "read.out")
                                        .putOutputs("output", "parDo.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("parDo.out", pc("parDo.out"))
                        .putTransforms(
                                "window",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("Window")
                                        .putInputs("input", "parDo.out")
                                        .putOutputs("output", "window.out")
                                        .setSpec(pyWindowIntoSpec)
                                        .build())
                        .putPcollections("window.out", pc("window.out"))
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.contains(PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.contains(
                        ExecutableStageMatcher.withInput("impulse.out")
                                .withNoOutputs()
                                .withTransforms("read", "parDo", "window")));
    }

    /**
     * Fuses a pipeline in which two transforms ("mystery" and "enigma") both read "impulse.out"
     * and carry only a custom URN in their spec.
     *
     * <p>Expects "impulse", "mystery" and "enigma" to all be runner-executed, and no fused stages.
     */
    @Test
    public void unknownTransformsNoEnvironmentBecomeRunnerExecuted() {
        RunnerApi.PTransform mystery =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Mystery")
                        .putInputs("input", "impulse.out")
                        .putOutputs("output", "mystery.out")
                        .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:mystery:v1.4"))
                        .build();
        RunnerApi.PTransform enigma =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Enigma")
                        .putInputs("input", "impulse.out")
                        .putOutputs("output", "enigma.out")
                        .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:enigma:v1"))
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms("mystery", mystery)
                        .putPcollections("mystery.out", pc("mystery.out"))
                        .putTransforms("enigma", enigma)
                        .putPcollections("enigma.out", pc("enigma.out"))
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")),
                        PipelineNode.pTransform("mystery", components.getTransformsOrThrow("mystery")),
                        PipelineNode.pTransform("enigma", components.getTransformsOrThrow("enigma"))));
        Assert.assertThat(fused.getFusedStages(), Matchers.emptyIterable());
    }

    /**
     * Fuses impulse -> read -> groupByKey -> parDo, where read and parDo both name the "py"
     * environment.
     *
     * <p>Expects "impulse" and "groupByKey" to be runner-executed, and two fused stages: one that
     * consumes "impulse.out", outputs "read.out" and contains "read"; and one that consumes
     * "groupByKey.out", has no outputs and contains "parDo".
     */
    @Test
    public void singleEnvironmentAcrossGroupByKeyMultipleStages() {
        RunnerApi.FunctionSpec pyParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .build()
                                        .toByteString())
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms(
                                "read",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("Read")
                                        .putInputs("input", "impulse.out")
                                        .putOutputs("output", "read.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("read.out", pc("read.out"))
                        .putTransforms(
                                "groupByKey",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GroupByKey")
                                        .putInputs("input", "read.out")
                                        .putOutputs("output", "groupByKey.out")
                                        .setSpec(
                                                RunnerApi.FunctionSpec.newBuilder()
                                                        .setUrn(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN))
                                        .build())
                        .putPcollections("groupByKey.out", pc("groupByKey.out"))
                        .putTransforms(
                                "parDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("ParDo")
                                        .putInputs("input", "groupByKey.out")
                                        .putOutputs("output", "parDo.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("parDo.out", pc("parDo.out"))
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")),
                        PipelineNode.pTransform("groupByKey", components.getTransformsOrThrow("groupByKey"))));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("impulse.out")
                                .withOutputs("read.out")
                                .withTransforms("read"),
                        ExecutableStageMatcher.withInput("groupByKey.out")
                                .withNoOutputs()
                                .withTransforms("parDo")));
    }

    /**
     * Fuses a pipeline in which a "py" read feeds both a "go" ParDo ("goTransform") and a "py"
     * window assignment ("pyTransform").
     *
     * <p>Expects one runner-executed transform and three fused stages: "read" consuming
     * "impulse.out" and outputting "read.out", plus one stage each for "pyTransform" and
     * "goTransform", both consuming "read.out" with no outputs.
     */
    @Test
    public void multipleEnvironmentsBecomesMultipleStages() {
        RunnerApi.PTransform read =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Read")
                        .putInputs("input", "impulse.out")
                        .putOutputs("output", "read.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform goTransform =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("GoTransform")
                        .putInputs("input", "read.out")
                        .putOutputs("output", "go.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform pyTransform =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("PyTransform")
                        .putInputs("input", "read.out")
                        .putOutputs("output", "py.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.WindowIntoPayload.newBuilder()
                                                        .setWindowFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                                        .build()
                                                        .toByteString()))
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms("read", read)
                        .putPcollections("read.out", pc("read.out"))
                        .putTransforms("goTransform", goTransform)
                        .putPcollections("go.out", pc("go.out"))
                        .putTransforms("pyTransform", pyTransform)
                        .putPcollections("py.out", pc("py.out"))
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(fused.getRunnerExecutedTransforms(), Matchers.hasSize(1));
        Assert.assertThat(fused.getFusedStages(), Matchers.hasSize(3));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("impulse.out")
                                .withOutputs("read.out")
                                .withTransforms("read"),
                        ExecutableStageMatcher.withInput("read.out")
                                .withNoOutputs()
                                .withTransforms("pyTransform"),
                        ExecutableStageMatcher.withInput("read.out")
                                .withNoOutputs()
                                .withTransforms("goTransform")));
    }

    /**
     * Fuses a pipeline in which a flatten consumes the outputs of a "go" read and a "py" read and
     * feeds one ParDo in each of those environments.
     *
     * <p>Expects three runner-executed transforms: both impulses plus one transform whose only
     * output is "flatten.out". That transform must carry the flatten URN and have two distinct
     * inputs. Expects four fused stages: each read fused with "flatten" and outputting one of the
     * runner-executed flatten's inputs, plus one stage per downstream ParDo consuming
     * "flatten.out" with no outputs. Finally, the ids of all fused-stage outputs must be exactly
     * the inputs of the runner-executed flatten.
     */
    @Test
    public void flattenWithHeterogenousInputsAndOutputsEntirelyMaterialized() {
        RunnerApi.FunctionSpec impulseSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)
                        .build();
        RunnerApi.FunctionSpec pyParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .build()
                                        .toByteString())
                        .build();
        RunnerApi.FunctionSpec goParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
                                        .build()
                                        .toByteString())
                        .build();

        RunnerApi.Components components =
                RunnerApi.Components.newBuilder()
                        .putCoders("coder", RunnerApi.Coder.newBuilder().build())
                        .putCoders("windowCoder", RunnerApi.Coder.newBuilder().build())
                        .putWindowingStrategies(
                                "ws",
                                RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build())
                        .putTransforms(
                                "pyImpulse",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("PyImpulse")
                                        .putOutputs("output", "pyImpulse.out")
                                        .setSpec(impulseSpec)
                                        .build())
                        .putPcollections("pyImpulse.out", pc("pyImpulse.out"))
                        .putTransforms(
                                "pyRead",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("PyRead")
                                        .putInputs("input", "pyImpulse.out")
                                        .putOutputs("output", "pyRead.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("pyRead.out", pc("pyRead.out"))
                        .putTransforms(
                                "goImpulse",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GoImpulse")
                                        .putOutputs("output", "goImpulse.out")
                                        .setSpec(impulseSpec)
                                        .build())
                        .putPcollections("goImpulse.out", pc("goImpulse.out"))
                        .putTransforms(
                                "goRead",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GoRead")
                                        .putInputs("input", "goImpulse.out")
                                        .putOutputs("output", "goRead.out")
                                        .setSpec(goParDoSpec)
                                        .build())
                        .putPcollections("goRead.out", pc("goRead.out"))
                        .putTransforms(
                                "flatten",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("Flatten")
                                        .putInputs("goReadInput", "goRead.out")
                                        .putInputs("pyReadInput", "pyRead.out")
                                        .putOutputs("output", "flatten.out")
                                        .setSpec(
                                                RunnerApi.FunctionSpec.newBuilder()
                                                        .setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
                                        .build())
                        .putPcollections("flatten.out", pc("flatten.out"))
                        .putTransforms(
                                "pyParDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("PyParDo")
                                        .putInputs("input", "flatten.out")
                                        .putOutputs("output", "pyParDo.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("pyParDo.out", pc("pyParDo.out"))
                        .putTransforms(
                                "goParDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GoParDo")
                                        .putInputs("input", "flatten.out")
                                        .putOutputs("output", "goParDo.out")
                                        .setSpec(goParDoSpec)
                                        .build())
                        .putPcollections("goParDo.out", pc("goParDo.out"))
                        .putEnvironments("go", RunnerApi.Environment.newBuilder().setUrl("go").build())
                        .putEnvironments("py", RunnerApi.Environment.newBuilder().setUrl("py").build())
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(fused.getRunnerExecutedTransforms(), Matchers.hasSize(3));
        Assert.assertThat(
                "The runner should include the impulses for both languages, plus an introduced flatten",
                fused.getRunnerExecutedTransforms(),
                Matchers.hasItems(
                        PipelineNode.pTransform("pyImpulse", components.getTransformsOrThrow("pyImpulse")),
                        PipelineNode.pTransform("goImpulse", components.getTransformsOrThrow("goImpulse"))));

        // Locate the runner-executed transform whose single output is "flatten.out".
        PipelineNode.PTransformNode flattenNode = null;
        for (PipelineNode.PTransformNode candidate : fused.getRunnerExecutedTransforms()) {
            String onlyOutput = Iterables.getOnlyElement(candidate.getTransform().getOutputsMap().values());
            if (onlyOutput.equals("flatten.out")) {
                flattenNode = candidate;
            }
        }
        Assert.assertThat(flattenNode, Matchers.not(Matchers.nullValue()));

        RunnerApi.PTransform flattenTransform = flattenNode.getTransform();
        Assert.assertThat(
                flattenTransform.getSpec().getUrn(),
                Matchers.equalTo(PTransformTranslation.FLATTEN_TRANSFORM_URN));
        Set<String> distinctFlattenInputs = new HashSet<>(flattenTransform.getInputsMap().values());
        Assert.assertThat(distinctFlattenInputs, Matchers.hasSize(2));

        // A stage output is acceptable if it equals any input of the runner-executed flatten.
        Set<Matcher<? super String>> flattenInputMatchers = new HashSet<>();
        for (String inputId : flattenTransform.getInputsMap().values()) {
            flattenInputMatchers.add(Matchers.equalTo(inputId));
        }
        Matcher<String> anyFlattenInput = Matchers.anyOf(flattenInputMatchers);

        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("goImpulse.out")
                                .withOutputs(anyFlattenInput)
                                .withTransforms("goRead", "flatten"),
                        ExecutableStageMatcher.withInput("pyImpulse.out")
                                .withOutputs(anyFlattenInput)
                                .withTransforms("pyRead", "flatten"),
                        ExecutableStageMatcher.withInput("flatten.out")
                                .withNoOutputs()
                                .withTransforms("goParDo"),
                        ExecutableStageMatcher.withInput("flatten.out")
                                .withNoOutputs()
                                .withTransforms("pyParDo")));

        Set<String> materializedOutputIds =
                fused.getFusedStages().stream()
                        .flatMap(stage -> stage.getOutputPCollections().stream())
                        .map(output -> output.getId())
                        .collect(Collectors.toSet());
        Assert.assertThat(
                "All materialized stage outputs should be flattened, and no more",
                materializedOutputIds,
                Matchers.containsInAnyOrder(
                        flattenTransform.getInputsMap().values().toArray(new String[0])));
    }

    /**
     * Fuses a pipeline in which a flatten consumes the outputs of a "go" read and a "py" read, but
     * only a "go" ParDo consumes the flattened output.
     *
     * <p>Expects only the two impulses to be runner-executed, and three fused stages: a "go" stage
     * from "goImpulse.out" with no outputs containing "goRead", "flatten" and "goParDo"; a "py"
     * stage from "pyImpulse.out" that outputs "flatten.out" and contains "pyRead" and "flatten";
     * and a stage consuming "flatten.out" with no outputs containing "goParDo".
     */
    @Test
    public void flattenWithHeterogeneousInputsSingleEnvOutputPartiallyMaterialized() {
        RunnerApi.FunctionSpec impulseSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)
                        .build();
        RunnerApi.FunctionSpec pyParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .build()
                                        .toByteString())
                        .build();
        RunnerApi.FunctionSpec goParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
                                        .build()
                                        .toByteString())
                        .build();

        RunnerApi.Components components =
                RunnerApi.Components.newBuilder()
                        .putCoders("coder", RunnerApi.Coder.newBuilder().build())
                        .putCoders("windowCoder", RunnerApi.Coder.newBuilder().build())
                        .putWindowingStrategies(
                                "ws",
                                RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build())
                        .putTransforms(
                                "pyImpulse",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("PyImpulse")
                                        .putOutputs("output", "pyImpulse.out")
                                        .setSpec(impulseSpec)
                                        .build())
                        .putPcollections("pyImpulse.out", pc("pyImpulse.out"))
                        .putTransforms(
                                "pyRead",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("PyRead")
                                        .putInputs("input", "pyImpulse.out")
                                        .putOutputs("output", "pyRead.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("pyRead.out", pc("pyRead.out"))
                        .putTransforms(
                                "goImpulse",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GoImpulse")
                                        .putOutputs("output", "goImpulse.out")
                                        .setSpec(impulseSpec)
                                        .build())
                        .putPcollections("goImpulse.out", pc("goImpulse.out"))
                        .putTransforms(
                                "goRead",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GoRead")
                                        .putInputs("input", "goImpulse.out")
                                        .putOutputs("output", "goRead.out")
                                        .setSpec(goParDoSpec)
                                        .build())
                        .putPcollections("goRead.out", pc("goRead.out"))
                        .putTransforms(
                                "flatten",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("Flatten")
                                        .putInputs("goReadInput", "goRead.out")
                                        .putInputs("pyReadInput", "pyRead.out")
                                        .putOutputs("output", "flatten.out")
                                        .setSpec(
                                                RunnerApi.FunctionSpec.newBuilder()
                                                        .setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
                                        .build())
                        .putPcollections("flatten.out", pc("flatten.out"))
                        .putTransforms(
                                "goParDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("GoParDo")
                                        .putInputs("input", "flatten.out")
                                        .putOutputs("output", "goParDo.out")
                                        .setSpec(goParDoSpec)
                                        .build())
                        .putPcollections("goParDo.out", pc("goParDo.out"))
                        .putEnvironments("go", RunnerApi.Environment.newBuilder().setUrl("go").build())
                        .putEnvironments("py", RunnerApi.Environment.newBuilder().setUrl("py").build())
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("pyImpulse", components.getTransformsOrThrow("pyImpulse")),
                        PipelineNode.pTransform("goImpulse", components.getTransformsOrThrow("goImpulse"))));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("goImpulse.out")
                                .withNoOutputs()
                                .withTransforms("goRead", "flatten", "goParDo"),
                        ExecutableStageMatcher.withInput("pyImpulse.out")
                                .withOutputs("flatten.out")
                                .withTransforms("pyRead", "flatten"),
                        ExecutableStageMatcher.withInput("flatten.out")
                                .withNoOutputs()
                                .withTransforms("goParDo")));
    }

    /**
     * Fuses impulse -> flatten -> read -> parDo, where the flatten's only input is "impulse.out"
     * and read and parDo both name the "py" environment.
     *
     * <p>Expects "impulse" and "flatten" to be runner-executed, and a single fused stage that
     * consumes "flatten.out", has no outputs, and contains "read" and "parDo".
     */
    @Test
    public void flattenAfterNoEnvDoesNotFuse() {
        RunnerApi.PTransform flatten =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Flatten")
                        .putInputs("impulseInput", "impulse.out")
                        .putOutputs("output", "flatten.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN)
                                        .build())
                        .build();
        RunnerApi.PTransform read =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Read")
                        .putInputs("input", "flatten.out")
                        .putOutputs("output", "read.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform parDo =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("ParDo")
                        .putInputs("input", "read.out")
                        .putOutputs("output", "parDo.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(
                                                                RunnerApi.SdkFunctionSpec.newBuilder()
                                                                        .setEnvironmentId("py")
                                                                        .build())
                                                        .build()
                                                        .toByteString()))
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms("flatten", flatten)
                        .putPcollections("flatten.out", pc("flatten.out"))
                        .putTransforms("read", read)
                        .putPcollections("read.out", pc("read.out"))
                        .putTransforms("parDo", parDo)
                        .putPcollections("parDo.out", pc("parDo.out"))
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")),
                        PipelineNode.pTransform("flatten", components.getTransformsOrThrow("flatten"))));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.contains(
                        ExecutableStageMatcher.withInput("flatten.out")
                                .withNoOutputs()
                                .withTransforms("read", "parDo")));
    }

    /**
     * Fuses a "py"-only pipeline in which "read.out" feeds three ParDos, one of which
     * ("sideParDo") also declares a side input named "side" bound to "sideRead.out".
     *
     * <p>Expects the two impulses to be runner-executed, and four fused stages: "read" from
     * "mainImpulse.out" outputting "read.out"; "leftParDo" and "rightParDo" together consuming
     * "read.out" with no outputs; "sideParDo" alone consuming "read.out" with its "side" side
     * input and no outputs; and "sideRead" from "sideImpulse.out" outputting "sideRead.out".
     */
    @Test
    public void sideInputRootsNewStage() {
        RunnerApi.FunctionSpec impulseSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)
                        .build();
        RunnerApi.FunctionSpec pyParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .build()
                                        .toByteString())
                        .build();
        RunnerApi.FunctionSpec pySideInputParDoSpec =
                RunnerApi.FunctionSpec.newBuilder()
                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                        .setPayload(
                                RunnerApi.ParDoPayload.newBuilder()
                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                        .putSideInputs("side", RunnerApi.SideInput.getDefaultInstance())
                                        .build()
                                        .toByteString())
                        .build();

        RunnerApi.Components components =
                RunnerApi.Components.newBuilder()
                        .putCoders("coder", RunnerApi.Coder.newBuilder().build())
                        .putCoders("windowCoder", RunnerApi.Coder.newBuilder().build())
                        .putWindowingStrategies(
                                "ws",
                                RunnerApi.WindowingStrategy.newBuilder().setWindowCoderId("windowCoder").build())
                        .putTransforms(
                                "mainImpulse",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("MainImpulse")
                                        .putOutputs("output", "mainImpulse.out")
                                        .setSpec(impulseSpec)
                                        .build())
                        .putPcollections("mainImpulse.out", pc("mainImpulse.out"))
                        .putTransforms(
                                "read",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("Read")
                                        .putInputs("input", "mainImpulse.out")
                                        .putOutputs("output", "read.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("read.out", pc("read.out"))
                        .putTransforms(
                                "sideImpulse",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("SideImpulse")
                                        .putOutputs("output", "sideImpulse.out")
                                        .setSpec(impulseSpec)
                                        .build())
                        .putPcollections("sideImpulse.out", pc("sideImpulse.out"))
                        .putTransforms(
                                "sideRead",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("SideRead")
                                        .putInputs("input", "sideImpulse.out")
                                        .putOutputs("output", "sideRead.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("sideRead.out", pc("sideRead.out"))
                        .putTransforms(
                                "leftParDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("LeftParDo")
                                        .putInputs("main", "read.out")
                                        .putOutputs("output", "leftParDo.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("leftParDo.out", pc("leftParDo.out"))
                        .putTransforms(
                                "rightParDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("RightParDo")
                                        .putInputs("main", "read.out")
                                        .putOutputs("output", "rightParDo.out")
                                        .setSpec(pyParDoSpec)
                                        .build())
                        .putPcollections("rightParDo.out", pc("rightParDo.out"))
                        .putTransforms(
                                "sideParDo",
                                RunnerApi.PTransform.newBuilder()
                                        .setUniqueName("SideParDo")
                                        .putInputs("main", "read.out")
                                        .putInputs("side", "sideRead.out")
                                        .putOutputs("output", "sideParDo.out")
                                        .setSpec(pySideInputParDoSpec)
                                        .build())
                        .putPcollections("sideParDo.out", pc("sideParDo.out"))
                        .putEnvironments("py", RunnerApi.Environment.newBuilder().setUrl("py").build())
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("mainImpulse", components.getTransformsOrThrow("mainImpulse")),
                        PipelineNode.pTransform("sideImpulse", components.getTransformsOrThrow("sideImpulse"))));

        RunnerApi.ExecutableStagePayload.SideInputId sideInputId =
                RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
                        .setTransformId("sideParDo")
                        .setLocalName("side")
                        .build();
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("mainImpulse.out")
                                .withOutputs("read.out")
                                .withTransforms("read"),
                        ExecutableStageMatcher.withInput("read.out")
                                .withNoOutputs()
                                .withTransforms("leftParDo", "rightParDo"),
                        ExecutableStageMatcher.withInput("read.out")
                                .withSideInputs(sideInputId)
                                .withNoOutputs()
                                .withTransforms("sideParDo"),
                        ExecutableStageMatcher.withInput("sideImpulse.out")
                                .withOutputs("sideRead.out")
                                .withTransforms("sideRead")));
    }

    /**
     * Fuses impulse -> parDo -> stateful, where both ParDos name the "common" environment and
     * "stateful" additionally declares a state spec named "state".
     *
     * <p>Expects "impulse" to be the only runner-executed transform, and two fused stages: "parDo"
     * consuming "impulse.out" and outputting "parDo.out", and "stateful" consuming "parDo.out"
     * with no outputs.
     */
    @Test
    public void statefulParDoRootsStage() {
        RunnerApi.PTransform parDo =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("ParDo")
                        .putInputs("input", "impulse.out")
                        .putOutputs("output", "parDo.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform stateful =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("StatefulParDo")
                        .putInputs("input", "parDo.out")
                        .putOutputs("output", "stateful.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
                                                        .putStateSpecs("state", RunnerApi.StateSpec.getDefaultInstance())
                                                        .build()
                                                        .toByteString()))
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms("parDo", parDo)
                        .putPcollections("parDo.out", pc("parDo.out"))
                        .putTransforms("stateful", stateful)
                        .putPcollections("stateful.out", pc("stateful.out"))
                        .putEnvironments("common", RunnerApi.Environment.newBuilder().setUrl("common").build())
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("impulse.out")
                                .withOutputs("parDo.out")
                                .withTransforms("parDo"),
                        ExecutableStageMatcher.withInput("parDo.out")
                                .withNoOutputs()
                                .withTransforms("stateful")));
    }

    /**
     * Fuses impulse -> parDo -> timer, where both ParDos name the "common" environment and
     * "timer" additionally declares a timer spec named "timer".
     *
     * <p>Expects "impulse" to be the only runner-executed transform, and two fused stages: "parDo"
     * consuming "impulse.out" and outputting "parDo.out", and "timer" consuming "parDo.out" with
     * no outputs.
     */
    @Test
    public void parDoWithTimerRootsStage() {
        RunnerApi.PTransform parDo =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("ParDo")
                        .putInputs("input", "impulse.out")
                        .putOutputs("output", "parDo.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform timer =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("TimerParDo")
                        .putInputs("input", "parDo.out")
                        .putOutputs("output", "timer.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
                                                        .putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance())
                                                        .build()
                                                        .toByteString()))
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms("parDo", parDo)
                        .putPcollections("parDo.out", pc("parDo.out"))
                        .putTransforms("timer", timer)
                        .putPcollections("timer.out", pc("timer.out"))
                        .putEnvironments("common", RunnerApi.Environment.newBuilder().setUrl("common").build())
                        .build();

        RunnerApi.Pipeline pipeline = RunnerApi.Pipeline.newBuilder().setComponents(components).build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(
                fused.getRunnerExecutedTransforms(),
                Matchers.containsInAnyOrder(
                        PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("impulse.out")
                                .withOutputs("parDo.out")
                                .withTransforms("parDo"),
                        ExecutableStageMatcher.withInput("parDo.out")
                                .withNoOutputs()
                                .withTransforms("timer")));
    }

    /**
     * Fuses a pipeline whose root transform ids are "impulse" and "compositeMultiLang", where the
     * composite lists "read", "goTransform" and "pyTransform" as subtransforms and carries no spec.
     *
     * <p>Expects one runner-executed transform and three fused stages made only of the leaf
     * transforms: "read" consuming "impulse.out" and outputting "read.out", plus one stage each
     * for "pyTransform" and "goTransform", both consuming "read.out" with no outputs.
     */
    @Test
    public void compositesIgnored() {
        RunnerApi.PTransform read =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("Read")
                        .putInputs("input", "impulse.out")
                        .putOutputs("output", "read.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform goTransform =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("GoTransform")
                        .putInputs("input", "read.out")
                        .putOutputs("output", "go.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.ParDoPayload.newBuilder()
                                                        .setDoFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("go"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform pyTransform =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("PyTransform")
                        .putInputs("input", "read.out")
                        .putOutputs("output", "py.out")
                        .setSpec(
                                RunnerApi.FunctionSpec.newBuilder()
                                        .setUrn(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)
                                        .setPayload(
                                                RunnerApi.WindowIntoPayload.newBuilder()
                                                        .setWindowFn(RunnerApi.SdkFunctionSpec.newBuilder().setEnvironmentId("py"))
                                                        .build()
                                                        .toByteString()))
                        .build();
        RunnerApi.PTransform compositeMultiLang =
                RunnerApi.PTransform.newBuilder()
                        .setUniqueName("CompositeMultiLang")
                        .putInputs("input", "impulse.out")
                        .putOutputs("pyOut", "py.out")
                        .putOutputs("goOut", "go.out")
                        .addSubtransforms("read")
                        .addSubtransforms("goTransform")
                        .addSubtransforms("pyTransform")
                        .build();

        RunnerApi.Components components =
                this.partialComponents.toBuilder()
                        .putTransforms("read", read)
                        .putPcollections("read.out", pc("read.out"))
                        .putTransforms("goTransform", goTransform)
                        .putPcollections("go.out", pc("go.out"))
                        .putTransforms("pyTransform", pyTransform)
                        .putPcollections("py.out", pc("py.out"))
                        .putTransforms("compositeMultiLang", compositeMultiLang)
                        .build();

        RunnerApi.Pipeline pipeline =
                RunnerApi.Pipeline.newBuilder()
                        .addRootTransformIds("impulse")
                        .addRootTransformIds("compositeMultiLang")
                        .setComponents(components)
                        .build();
        FusedPipeline fused = GreedyPipelineFuser.fuse(pipeline);

        Assert.assertThat(fused.getRunnerExecutedTransforms(), Matchers.hasSize(1));
        Assert.assertThat(fused.getFusedStages(), Matchers.hasSize(3));
        Assert.assertThat(
                fused.getFusedStages(),
                Matchers.containsInAnyOrder(
                        ExecutableStageMatcher.withInput("impulse.out")
                                .withOutputs("read.out")
                                .withTransforms("read"),
                        ExecutableStageMatcher.withInput("read.out")
                                .withNoOutputs()
                                .withTransforms("pyTransform"),
                        ExecutableStageMatcher.withInput("read.out")
                                .withNoOutputs()
                                .withTransforms("goTransform")));
    }
}
