package org.apache.beam.runners.direct;

import java.util.Collections;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.class */
public class TestStreamEvaluatorFactoryTest {
    private TestStreamEvaluatorFactory factory;
    private BundleFactory bundleFactory;
    private EvaluationContext context;

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
    private DirectRunner runner;

    @Before
    public void setup() {
        this.context = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        this.runner = DirectRunner.fromOptions(TestPipeline.testingPipelineOptions());
        this.factory = new TestStreamEvaluatorFactory(this.context);
        this.bundleFactory = ImmutableListBundleFactory.create();
    }

    @Test
    public void producesElementsInSequence() throws Exception {
        PCollection pCollection = (PCollection) this.p.apply(new TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream(this.runner, TestStream.create(VarIntCoder.of()).addElements((TestStream.Builder) 1, (TestStream.Builder[]) new Integer[]{2, 3}).advanceWatermarkTo(new Instant(0L)).addElements(TimestampedValue.atMinimumTimestamp(4), TimestampedValue.atMinimumTimestamp(5), TimestampedValue.atMinimumTimestamp(6)).advanceProcessingTime(Duration.standardMinutes(10L)).advanceWatermarkToInfinity()));
        TestStreamEvaluatorFactory.TestClock testClock = new TestStreamEvaluatorFactory.TestClock();
        Mockito.when(this.context.getClock()).thenReturn(testClock);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Mockito.when(this.context.createBundle(pCollection)).thenReturn(this.bundleFactory.createBundle(pCollection), new UncommittedBundle[]{this.bundleFactory.createBundle(pCollection)});
        AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(pCollection);
        CommittedBundle committedBundle = (CommittedBundle) Iterables.getOnlyElement(new TestStreamEvaluatorFactory.InputProvider(this.context).getInitialInputs(producer, 1));
        TransformEvaluator forApplication = this.factory.forApplication(producer, committedBundle);
        forApplication.processElement((WindowedValue) Iterables.getOnlyElement(committedBundle.getElements()));
        TransformResult finishBundle = forApplication.finishBundle();
        WindowedValue windowedValue = (WindowedValue) Iterables.getOnlyElement(finishBundle.getUnprocessedElements());
        MatcherAssert.assertThat(Integer.valueOf(((TestStreamEvaluatorFactory.TestStreamIndex) windowedValue.getValue()).getIndex()), Matchers.equalTo(1));
        MatcherAssert.assertThat(windowedValue.getTimestamp(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        CommittedBundle withElements = committedBundle.withElements(Collections.singleton(windowedValue));
        TransformEvaluator forApplication2 = this.factory.forApplication(producer, withElements);
        forApplication2.processElement(windowedValue);
        WindowedValue windowedValue2 = (WindowedValue) Iterables.getOnlyElement(forApplication2.finishBundle().getUnprocessedElements());
        MatcherAssert.assertThat(Integer.valueOf(((TestStreamEvaluatorFactory.TestStreamIndex) windowedValue2.getValue()).getIndex()), Matchers.equalTo(2));
        MatcherAssert.assertThat(windowedValue2.getTimestamp(), Matchers.equalTo(new Instant(0L)));
        CommittedBundle withElements2 = withElements.withElements(Collections.singleton(windowedValue2));
        TransformEvaluator forApplication3 = this.factory.forApplication(producer, withElements2);
        forApplication3.processElement(windowedValue2);
        TransformResult finishBundle2 = forApplication3.finishBundle();
        WindowedValue windowedValue3 = (WindowedValue) Iterables.getOnlyElement(finishBundle2.getUnprocessedElements());
        MatcherAssert.assertThat(Integer.valueOf(((TestStreamEvaluatorFactory.TestStreamIndex) windowedValue3.getValue()).getIndex()), Matchers.equalTo(3));
        MatcherAssert.assertThat(windowedValue3.getTimestamp(), Matchers.equalTo(new Instant(0L)));
        Instant now = testClock.now();
        TransformEvaluator forApplication4 = this.factory.forApplication(producer, withElements2.withElements(Collections.singleton(windowedValue3)));
        forApplication4.processElement(windowedValue3);
        TransformResult finishBundle3 = forApplication4.finishBundle();
        MatcherAssert.assertThat(testClock.now(), Matchers.equalTo(now.plus(Duration.standardMinutes(10L))));
        WindowedValue windowedValue4 = (WindowedValue) Iterables.getOnlyElement(finishBundle3.getUnprocessedElements());
        MatcherAssert.assertThat(Integer.valueOf(((TestStreamEvaluatorFactory.TestStreamIndex) windowedValue4.getValue()).getIndex()), Matchers.equalTo(4));
        MatcherAssert.assertThat(windowedValue4.getTimestamp(), Matchers.equalTo(new Instant(0L)));
        TransformEvaluator forApplication5 = this.factory.forApplication(producer, withElements2.withElements(Collections.singleton(windowedValue4)));
        forApplication5.processElement(windowedValue4);
        TransformResult finishBundle4 = forApplication5.finishBundle();
        MatcherAssert.assertThat(((UncommittedBundle) Iterables.getOnlyElement(finishBundle.getOutputBundles())).commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(1), WindowedValue.valueInGlobalWindow(2), WindowedValue.valueInGlobalWindow(3)}));
        MatcherAssert.assertThat(((UncommittedBundle) Iterables.getOnlyElement(finishBundle2.getOutputBundles())).commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(4), WindowedValue.valueInGlobalWindow(5), WindowedValue.valueInGlobalWindow(6)}));
        MatcherAssert.assertThat(finishBundle4.getOutputBundles(), Matchers.emptyIterable());
        MatcherAssert.assertThat(finishBundle4.getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        MatcherAssert.assertThat(finishBundle4.getUnprocessedElements(), Matchers.emptyIterable());
    }
}
