package org.apache.beam.runners.direct.portable;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.class */
public class FlattenEvaluatorFactoryTest {
    private BundleFactory bundleFactory = ImmutableListBundleFactory.create();

    @Test
    public void testFlattenInMemoryEvaluator() throws Exception {
        PipelineNode.PCollectionNode pCollection = PipelineNode.pCollection("left", RunnerApi.PCollection.newBuilder().setUniqueName("left").build());
        PipelineNode.PCollectionNode pCollection2 = PipelineNode.pCollection("right", RunnerApi.PCollection.newBuilder().setUniqueName("right").build());
        PipelineNode.PTransformNode pTransform = PipelineNode.pTransform("source", RunnerApi.PTransform.newBuilder().putOutputs("left", pCollection.getId()).putOutputs("right", pCollection2.getId()).build());
        PipelineNode.PCollectionNode pCollection3 = PipelineNode.pCollection("flat", RunnerApi.PCollection.newBuilder().setUniqueName("flat").build());
        PipelineNode.PTransformNode pTransform2 = PipelineNode.pTransform("flatten", RunnerApi.PTransform.newBuilder().setUniqueName("flatten").putInputs("left", pCollection.getId()).putInputs("right", pCollection2.getId()).putOutputs("out", pCollection3.getId()).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN)).build());
        PortableGraph forPipeline = PortableGraph.forPipeline(RunnerApi.Pipeline.newBuilder().addRootTransformIds(pTransform.getId()).addRootTransformIds(pTransform2.getId()).setComponents(RunnerApi.Components.newBuilder().putTransforms(pTransform.getId(), pTransform.getTransform()).putPcollections(pCollection.getId(), pCollection.getPCollection()).putPcollections(pCollection2.getId(), pCollection2.getPCollection()).putTransforms(pTransform2.getId(), pTransform2.getTransform()).putPcollections(pCollection3.getId(), pCollection3.getPCollection())).build());
        CommittedBundle commit = this.bundleFactory.createBundle(pCollection).commit(Instant.now());
        CommittedBundle commit2 = this.bundleFactory.createBundle(pCollection2).commit(Instant.now());
        FlattenEvaluatorFactory flattenEvaluatorFactory = new FlattenEvaluatorFactory(forPipeline, this.bundleFactory);
        TransformEvaluator forApplication = flattenEvaluatorFactory.forApplication(pTransform2, commit);
        TransformEvaluator forApplication2 = flattenEvaluatorFactory.forApplication(pTransform2, commit2);
        forApplication.processElement(WindowedValue.valueInGlobalWindow(1));
        forApplication2.processElement(WindowedValue.valueInGlobalWindow(-1));
        forApplication.processElement(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024L)));
        forApplication.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING));
        forApplication2.processElement(WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
        forApplication2.processElement(WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096L)));
        TransformResult finishBundle = forApplication2.finishBundle();
        Assert.assertThat(((UncommittedBundle) Iterables.getOnlyElement(forApplication.finishBundle().getOutputBundles())).commit(Instant.now()), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024L)), WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING), WindowedValue.valueInGlobalWindow(1)}));
        Assert.assertThat(((UncommittedBundle) Iterables.getOnlyElement(finishBundle.getOutputBundles())).commit(Instant.now()), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096L)), WindowedValue.valueInGlobalWindow(-1)}));
    }
}
