package org.apache.beam.runners.direct;

import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

/**
 * Tests for {@link FlattenEvaluatorFactory}.
 *
 * <p>Each test builds a small pipeline containing a {@code Flatten.pCollections()} step, asks the
 * factory for evaluators of that step, and checks the results the evaluators report when their
 * bundles are finished.
 */
@RunWith(JUnit4.class)
public class FlattenEvaluatorFactoryTest {
    /** Creates the input bundles and the output bundles handed out by the mocked context. */
    private BundleFactory bundleFactory = InProcessBundleFactory.create();

    /**
     * Feeds elements through two evaluators (one per flattened input) and verifies that each
     * evaluator reports the output bundle the context associated with its input bundle, and that
     * every processed element ends up, unchanged, in that output bundle.
     */
    @Test
    public void testFlattenInMemoryEvaluator() throws Exception {
        TestPipeline pipeline = TestPipeline.create();
        PCollection left = pipeline.apply("left", Create.of(new Integer[]{1, 2, 4}));
        PCollection right = pipeline.apply("right", Create.of(new Integer[]{-1, 2, -4}));
        PCollection flattened = PCollectionList.of(left).and(right).apply(Flatten.pCollections());

        InProcessPipelineRunner.CommittedBundle leftInput =
                bundleFactory.createRootBundle(left).commit(Instant.now());
        InProcessPipelineRunner.CommittedBundle rightInput =
                bundleFactory.createRootBundle(right).commit(Instant.now());

        InProcessEvaluationContext context = Mockito.mock(InProcessEvaluationContext.class);

        // Two distinct output bundles for the same flattened PCollection: the mocked context
        // returns a different one depending on which input bundle is being processed.
        InProcessPipelineRunner.UncommittedBundle leftOutput =
                bundleFactory.createRootBundle(flattened);
        InProcessPipelineRunner.UncommittedBundle rightOutput =
                bundleFactory.createRootBundle(flattened);
        Mockito.when(context.createBundle(leftInput, flattened)).thenReturn(leftOutput);
        Mockito.when(context.createBundle(rightInput, flattened)).thenReturn(rightOutput);

        FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
        TransformEvaluator leftEvaluator =
                factory.forApplication(flattened.getProducingTransformInternal(), leftInput, context);
        TransformEvaluator rightEvaluator =
                factory.forApplication(flattened.getProducingTransformInternal(), rightInput, context);

        // Interleave calls on the two evaluators.
        leftEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
        rightEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
        leftEvaluator.processElement(
                WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024L)));
        leftEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING));
        rightEvaluator.processElement(
                WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
        rightEvaluator.processElement(
                WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096L)));

        // The right-hand evaluator is finished before the left-hand one.
        InProcessTransformResult rightResult = rightEvaluator.finishBundle();
        InProcessTransformResult leftResult = leftEvaluator.finishBundle();

        InProcessPipelineRunner.UncommittedBundle[] expectedRightBundles = {rightOutput};
        Assert.assertThat(rightResult.getOutputBundles(), Matchers.contains(expectedRightBundles));
        Assert.assertThat(
                rightResult.getTransform(),
                Matchers.equalTo(flattened.getProducingTransformInternal()));

        InProcessPipelineRunner.UncommittedBundle[] expectedLeftBundles = {leftOutput};
        Assert.assertThat(leftResult.getOutputBundles(), Matchers.contains(expectedLeftBundles));
        Assert.assertThat(
                leftResult.getTransform(),
                Matchers.equalTo(flattened.getProducingTransformInternal()));

        // Expected values are built afresh (not reused from the processElement calls) so the
        // comparison relies on value equality rather than on object identity.
        WindowedValue[] expectedLeftElements = {
            WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024L)),
            WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING),
            WindowedValue.valueInGlobalWindow(1)
        };
        Assert.assertThat(
                leftOutput.commit(Instant.now()).getElements(),
                Matchers.containsInAnyOrder(expectedLeftElements));

        WindowedValue[] expectedRightElements = {
            WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
            WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096L)),
            WindowedValue.valueInGlobalWindow(-1)
        };
        Assert.assertThat(
                rightOutput.commit(Instant.now()).getElements(),
                Matchers.containsInAnyOrder(expectedRightElements));
    }

    /**
     * Flattens an empty {@code PCollectionList}: the evaluator is requested with a {@code null}
     * input bundle, and finishing it must report no output bundles while still naming the
     * flatten step as the transform.
     */
    @Test
    public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
        TestPipeline pipeline = TestPipeline.create();
        PCollection flattened = PCollectionList.empty(pipeline).apply(Flatten.pCollections());

        InProcessEvaluationContext context = Mockito.mock(InProcessEvaluationContext.class);
        FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();

        // The null keeps its explicit static type so the same forApplication signature is chosen.
        InProcessTransformResult result =
                factory.forApplication(
                                flattened.getProducingTransformInternal(),
                                (InProcessPipelineRunner.CommittedBundle) null,
                                context)
                        .finishBundle();

        Assert.assertThat(result.getOutputBundles(), Matchers.emptyIterable());
        Assert.assertThat(
                result.getTransform(),
                Matchers.equalTo(flattened.getProducingTransformInternal()));
    }
}
