package org.apache.beam.runners.direct;

import java.util.Collection;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.ImpulseEvaluatorFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link ImpulseEvaluatorFactory}.
 *
 * <p>Covers the evaluator obtained from {@code forApplication} (processing a single
 * {@code ImpulseShard}) and the {@code ImpulseRootProvider} that supplies the initial input
 * bundles for an {@link Impulse} application.
 */
@RunWith(JUnit4.class)
public class ImpulseEvaluatorFactoryTest {
    /** Real bundle factory used to back the bundles handed out by the mocked context. */
    private final BundleFactory bundleFactory = ImmutableListBundleFactory.create();

    /** Mocked context; each test stubs only the bundle-creation call it needs. */
    @Mock
    private EvaluationContext context;

    /** Initializes the {@link Mock}-annotated fields before every test. */
    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Feeds one {@code ImpulseShard} through the evaluator and verifies that exactly one output
     * bundle is produced, holding a single empty byte array in the global window for the impulse
     * output collection, with the minimum timestamp.
     *
     * @throws Exception if evaluating the shard fails
     */
    @Test
    public void testImpulse() throws Exception {
        PCollection<byte[]> impulseOut = Pipeline.create().apply(Impulse.create());
        AppliedPTransform<?, ?, ?> impulseApplication = DirectGraphs.getProducer(impulseOut);
        ImpulseEvaluatorFactory factory = new ImpulseEvaluatorFactory(this.context);

        WindowedValue<ImpulseEvaluatorFactory.ImpulseShard> inputShard =
                WindowedValue.valueInGlobalWindow(new ImpulseEvaluatorFactory.ImpulseShard());
        CommittedBundle<ImpulseEvaluatorFactory.ImpulseShard> inputShardBundle =
                this.bundleFactory
                        .<ImpulseEvaluatorFactory.ImpulseShard>createRootBundle()
                        .add(inputShard)
                        .commit(Instant.now());

        // The evaluator asks the context for its output bundle; hand it a real one.
        Mockito.when(this.context.createBundle(impulseOut))
                .thenReturn(this.bundleFactory.createBundle(impulseOut));

        TransformEvaluator<ImpulseEvaluatorFactory.ImpulseShard> evaluator =
                factory.forApplication(impulseApplication, inputShardBundle);
        evaluator.processElement(inputShard);
        TransformResult<ImpulseEvaluatorFactory.ImpulseShard> result = evaluator.finishBundle();

        Assert.assertThat(
                "Exactly one output from a single ImpulseShard",
                Iterables.size(result.getOutputBundles()),
                Matchers.equalTo(1));

        UncommittedBundle<?> outputBundle = result.getOutputBundles().iterator().next();
        CommittedBundle<?> committedOutput = outputBundle.commit(Instant.now());
        Assert.assertThat(
                committedOutput.getMinimumTimestamp(),
                Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
        Assert.assertThat(committedOutput.getPCollection(), Matchers.equalTo(impulseOut));
        Assert.assertThat(
                "Should only be one impulse element",
                Iterables.size(committedOutput.getElements()),
                Matchers.equalTo(1));

        // Look the single element up once and reuse it for the remaining assertions.
        WindowedValue<?> outputElement = committedOutput.getElements().iterator().next();
        Assert.assertThat(
                outputElement.getWindows(),
                Matchers.<BoundedWindow>contains(GlobalWindow.INSTANCE));
        Assert.assertArrayEquals(
                "Output should be an empty byte array",
                new byte[0],
                (byte[]) outputElement.getValue());
    }

    /**
     * Verifies that the root provider yields exactly one bundle containing exactly one shard for
     * an {@link Impulse} application, even when asked for a target parallelism of 100 and when
     * the pipeline contains a second, unrelated {@link Impulse}.
     */
    @Test
    public void testRootProvider() {
        Pipeline pipeline = Pipeline.create();
        PCollection<byte[]> impulseOut = pipeline.apply(Impulse.create());
        // A second Impulse on the same pipeline; the assertions below still expect a single
        // bundle for the first application.
        pipeline.apply(Impulse.create());
        AppliedPTransform<?, ?, ?> impulseApplication = DirectGraphs.getProducer(impulseOut);

        ImpulseEvaluatorFactory.ImpulseRootProvider rootProvider =
                new ImpulseEvaluatorFactory.ImpulseRootProvider(this.context);
        Mockito.when(this.context.createRootBundle())
                .thenReturn(this.bundleFactory.createRootBundle());

        // NOTE(review): the producer is only known as AppliedPTransform<?, ?, ?>; the raw cast
        // lets the call compile regardless of the exact parameterization the provider declares
        // for its argument. Confirm against the provider's signature and tighten if possible.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Collection<? extends CommittedBundle<?>> initialInputs =
                rootProvider.getInitialInputs((AppliedPTransform) impulseApplication, 100);

        Assert.assertThat(
                "Only one impulse bundle per application", initialInputs, Matchers.hasSize(1));
        Assert.assertThat(
                "Only one impulse shard per bundle",
                Iterables.size(initialInputs.iterator().next().getElements()),
                Matchers.equalTo(1));
    }
}
