package org.apache.beam.runners.direct;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.class */
public class FlattenEvaluatorFactoryTest {
    private BundleFactory bundleFactory = ImmutableListBundleFactory.create();

    @Test
    public void testFlattenInMemoryEvaluator() throws Exception {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply("left", Create.of(new Integer[]{1, 2, 4}));
        PCollection apply2 = create.apply("right", Create.of(new Integer[]{-1, 2, -4}));
        PCollection apply3 = PCollectionList.of(apply).and(apply2).apply(Flatten.pCollections());
        DirectRunner.CommittedBundle commit = this.bundleFactory.createBundle(apply).commit(Instant.now());
        DirectRunner.CommittedBundle commit2 = this.bundleFactory.createBundle(apply2).commit(Instant.now());
        EvaluationContext evaluationContext = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        DirectRunner.UncommittedBundle createBundle = this.bundleFactory.createBundle(apply3);
        DirectRunner.UncommittedBundle createBundle2 = this.bundleFactory.createBundle(apply3);
        Mockito.when(evaluationContext.createBundle(apply3)).thenReturn(createBundle, new DirectRunner.UncommittedBundle[]{createBundle2});
        FlattenEvaluatorFactory flattenEvaluatorFactory = new FlattenEvaluatorFactory(evaluationContext);
        TransformEvaluator forApplication = flattenEvaluatorFactory.forApplication(apply3.getProducingTransformInternal(), commit);
        TransformEvaluator forApplication2 = flattenEvaluatorFactory.forApplication(apply3.getProducingTransformInternal(), commit2);
        forApplication.processElement(WindowedValue.valueInGlobalWindow(1));
        forApplication2.processElement(WindowedValue.valueInGlobalWindow(-1));
        forApplication.processElement(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024L)));
        forApplication.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING));
        forApplication2.processElement(WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
        forApplication2.processElement(WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096L)));
        TransformResult finishBundle = forApplication2.finishBundle();
        TransformResult finishBundle2 = forApplication.finishBundle();
        Assert.assertThat(finishBundle.getOutputBundles(), Matchers.contains(new DirectRunner.UncommittedBundle[]{createBundle2}));
        Assert.assertThat(finishBundle.getTransform(), Matchers.equalTo(apply3.getProducingTransformInternal()));
        Assert.assertThat(finishBundle2.getOutputBundles(), Matchers.contains(new DirectRunner.UncommittedBundle[]{createBundle}));
        Assert.assertThat(finishBundle2.getTransform(), Matchers.equalTo(apply3.getProducingTransformInternal()));
        Assert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024L)), WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING), WindowedValue.valueInGlobalWindow(1)}));
        Assert.assertThat(createBundle2.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096L)), WindowedValue.valueInGlobalWindow(-1)}));
    }

    @Test
    public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
        PCollection apply = PCollectionList.empty(TestPipeline.create()).apply(Flatten.pCollections());
        EvaluationContext evaluationContext = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        Mockito.when(evaluationContext.createBundle(apply)).thenReturn(this.bundleFactory.createBundle(apply));
        TransformResult finishBundle = new FlattenEvaluatorFactory(evaluationContext).forApplication(apply.getProducingTransformInternal(), this.bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)).finishBundle();
        Assert.assertThat(((DirectRunner.UncommittedBundle) Iterables.getOnlyElement(finishBundle.getOutputBundles())).commit(Instant.now()).getElements(), Matchers.emptyIterable());
        Assert.assertThat(finishBundle.getTransform(), Matchers.equalTo(apply.getProducingTransformInternal()));
    }
}
