package org.apache.beam.runners.direct;

import java.io.Serializable;
import org.apache.beam.runners.direct.AggregatorContainer;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.class */
public class ParDoMultiEvaluatorFactoryTest implements Serializable {
    private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();

    @Test
    public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
        PCollection apply = TestPipeline.create().apply(Create.of(new String[]{"foo", "bara", "bazam"}));
        TupleTag<KV<String, Integer>> tupleTag = new TupleTag<KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.1
        };
        final TupleTag tupleTag2 = new TupleTag();
        final TupleTag tupleTag3 = new TupleTag();
        PCollectionTuple apply2 = apply.apply(ParDo.of(new OldDoFn<String, KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.2
            public void processElement(OldDoFn<String, KV<String, Integer>>.ProcessContext processContext) {
                processContext.output(KV.of(processContext.element(), Integer.valueOf(((String) processContext.element()).length())));
                processContext.sideOutput(tupleTag2, processContext.element());
                processContext.sideOutput(tupleTag3, Integer.valueOf(((String) processContext.element()).length()));
            }
        }).withOutputTags(tupleTag, TupleTagList.of(tupleTag2).and(tupleTag3)));
        DirectRunner.CommittedBundle commit = this.bundleFactory.createBundle(apply).commit(Instant.now());
        PCollection pCollection = apply2.get(tupleTag);
        PCollection pCollection2 = apply2.get(tupleTag2);
        PCollection pCollection3 = apply2.get(tupleTag3);
        EvaluationContext evaluationContext = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        DirectRunner.UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        DirectRunner.UncommittedBundle createBundle2 = this.bundleFactory.createBundle(pCollection2);
        DirectRunner.UncommittedBundle createBundle3 = this.bundleFactory.createBundle(pCollection3);
        Mockito.when(evaluationContext.createBundle(pCollection)).thenReturn(createBundle);
        Mockito.when(evaluationContext.createBundle(pCollection2)).thenReturn(createBundle2);
        Mockito.when(evaluationContext.createBundle(pCollection3)).thenReturn(createBundle3);
        Mockito.when(evaluationContext.getExecutionContext(pCollection.getProducingTransformInternal(), commit.getKey())).thenReturn(new DirectExecutionContext((Clock) null, (StructuralKey) null, (CopyOnAccessInMemoryStateInternals) null, (WatermarkManager.TransformWatermarks) null));
        AggregatorContainer create = AggregatorContainer.create();
        AggregatorContainer.Mutator createMutator = create.createMutator();
        Mockito.when(evaluationContext.getAggregatorContainer()).thenReturn(create);
        Mockito.when(evaluationContext.getAggregatorMutator()).thenReturn(createMutator);
        TransformEvaluator forApplication = new ParDoMultiEvaluatorFactory(evaluationContext).forApplication(pCollection.getProducingTransformInternal(), commit);
        forApplication.processElement(WindowedValue.valueInGlobalWindow("foo"));
        forApplication.processElement(WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000L)));
        forApplication.processElement(WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
        TransformResult finishBundle = forApplication.finishBundle();
        Assert.assertThat(finishBundle.getOutputBundles(), Matchers.containsInAnyOrder(new DirectRunner.UncommittedBundle[]{createBundle3, createBundle, createBundle2}));
        Assert.assertThat(finishBundle.getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        Assert.assertThat(finishBundle.getAggregatorChanges(), Matchers.equalTo(createMutator));
        Assert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000L)), WindowedValue.valueInGlobalWindow(KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)}));
        Assert.assertThat(createBundle2.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow("foo"), WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000L)), WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)}));
        Assert.assertThat(createBundle3.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(3), WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000L)), WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)}));
    }

    @Test
    public void testParDoMultiUndeclaredSideOutput() throws Exception {
        PCollection apply = TestPipeline.create().apply(Create.of(new String[]{"foo", "bara", "bazam"}));
        TupleTag<KV<String, Integer>> tupleTag = new TupleTag<KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.3
        };
        final TupleTag tupleTag2 = new TupleTag();
        final TupleTag tupleTag3 = new TupleTag();
        PCollectionTuple apply2 = apply.apply(ParDo.of(new OldDoFn<String, KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.4
            public void processElement(OldDoFn<String, KV<String, Integer>>.ProcessContext processContext) {
                processContext.output(KV.of(processContext.element(), Integer.valueOf(((String) processContext.element()).length())));
                processContext.sideOutput(tupleTag2, processContext.element());
                processContext.sideOutput(tupleTag3, Integer.valueOf(((String) processContext.element()).length()));
            }
        }).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
        DirectRunner.CommittedBundle commit = this.bundleFactory.createBundle(apply).commit(Instant.now());
        PCollection pCollection = apply2.get(tupleTag);
        PCollection pCollection2 = apply2.get(tupleTag2);
        EvaluationContext evaluationContext = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        DirectRunner.UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        DirectRunner.UncommittedBundle createBundle2 = this.bundleFactory.createBundle(pCollection2);
        Mockito.when(evaluationContext.createBundle(pCollection)).thenReturn(createBundle);
        Mockito.when(evaluationContext.createBundle(pCollection2)).thenReturn(createBundle2);
        Mockito.when(evaluationContext.getExecutionContext(pCollection.getProducingTransformInternal(), commit.getKey())).thenReturn(new DirectExecutionContext((Clock) null, (StructuralKey) null, (CopyOnAccessInMemoryStateInternals) null, (WatermarkManager.TransformWatermarks) null));
        AggregatorContainer create = AggregatorContainer.create();
        AggregatorContainer.Mutator createMutator = create.createMutator();
        Mockito.when(evaluationContext.getAggregatorContainer()).thenReturn(create);
        Mockito.when(evaluationContext.getAggregatorMutator()).thenReturn(createMutator);
        TransformEvaluator forApplication = new ParDoMultiEvaluatorFactory(evaluationContext).forApplication(pCollection.getProducingTransformInternal(), commit);
        forApplication.processElement(WindowedValue.valueInGlobalWindow("foo"));
        forApplication.processElement(WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000L)));
        forApplication.processElement(WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
        TransformResult finishBundle = forApplication.finishBundle();
        Assert.assertThat(finishBundle.getOutputBundles(), Matchers.containsInAnyOrder(new DirectRunner.UncommittedBundle[]{createBundle, createBundle2}));
        Assert.assertThat(finishBundle.getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        Assert.assertThat(finishBundle.getAggregatorChanges(), Matchers.equalTo(createMutator));
        Assert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000L)), WindowedValue.valueInGlobalWindow(KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)}));
        Assert.assertThat(createBundle2.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow("foo"), WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000L)), WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)}));
    }

    @Test
    public void finishBundleWithStatePutsStateInResult() throws Exception {
        PCollection apply = TestPipeline.create().apply(Create.of(new String[]{"foo", "bara", "bazam"}));
        TupleTag<KV<String, Integer>> tupleTag = new TupleTag<KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.5
        };
        TupleTag tupleTag2 = new TupleTag();
        final StateTag watermarkStateInternal = StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
        final StateTag bag = StateTags.bag("myBag", StringUtf8Coder.of());
        StateNamespace window = StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
        PCollectionTuple apply2 = apply.apply(ParDo.of(new OldDoFn<String, KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.6
            public void processElement(OldDoFn<String, KV<String, Integer>>.ProcessContext processContext) {
                processContext.windowingInternals().stateInternals().state(StateNamespaces.global(), watermarkStateInternal).add(new Instant(20202 + ((String) processContext.element()).length()));
                processContext.windowingInternals().stateInternals().state(StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), bag).add(processContext.element());
            }
        }).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
        DirectRunner.CommittedBundle commit = this.bundleFactory.createBundle(apply).commit(Instant.now());
        PCollection pCollection = apply2.get(tupleTag);
        PCollection pCollection2 = apply2.get(tupleTag2);
        EvaluationContext evaluationContext = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        DirectRunner.UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        DirectRunner.UncommittedBundle createBundle2 = this.bundleFactory.createBundle(pCollection2);
        Mockito.when(evaluationContext.createBundle(pCollection)).thenReturn(createBundle);
        Mockito.when(evaluationContext.createBundle(pCollection2)).thenReturn(createBundle2);
        Mockito.when(evaluationContext.getExecutionContext(pCollection.getProducingTransformInternal(), commit.getKey())).thenReturn(new DirectExecutionContext((Clock) null, StructuralKey.of("myKey", StringUtf8Coder.of()), (CopyOnAccessInMemoryStateInternals) null, (WatermarkManager.TransformWatermarks) null));
        AggregatorContainer create = AggregatorContainer.create();
        AggregatorContainer.Mutator createMutator = create.createMutator();
        Mockito.when(evaluationContext.getAggregatorContainer()).thenReturn(create);
        Mockito.when(evaluationContext.getAggregatorMutator()).thenReturn(createMutator);
        TransformEvaluator forApplication = new ParDoMultiEvaluatorFactory(evaluationContext).forApplication(pCollection.getProducingTransformInternal(), commit);
        forApplication.processElement(WindowedValue.valueInGlobalWindow("foo"));
        forApplication.processElement(WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000L)));
        forApplication.processElement(WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
        TransformResult finishBundle = forApplication.finishBundle();
        Assert.assertThat(finishBundle.getOutputBundles(), Matchers.containsInAnyOrder(new DirectRunner.UncommittedBundle[]{createBundle, createBundle2}));
        Assert.assertThat(finishBundle.getWatermarkHold(), Matchers.equalTo(new Instant(20205L)));
        Assert.assertThat(finishBundle.getState(), Matchers.not(Matchers.nullValue()));
        Assert.assertThat(finishBundle.getState().state(StateNamespaces.global(), watermarkStateInternal).read(), Matchers.equalTo(new Instant(20205L)));
        Assert.assertThat(finishBundle.getState().state(window, bag).read(), Matchers.containsInAnyOrder(new String[]{"foo", "bara", "bazam"}));
    }

    @Test
    public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
        PCollection apply = TestPipeline.create().apply(Create.of(new String[]{"foo", "bara", "bazam"}));
        TupleTag<KV<String, Integer>> tupleTag = new TupleTag<KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.7
        };
        TupleTag tupleTag2 = new TupleTag();
        TimerInternals.TimerData of = TimerInternals.TimerData.of(StateNamespaces.window(IntervalWindow.getCoder(), new IntervalWindow(new Instant(0L).plus(Duration.standardMinutes(5L)), new Instant(1L).plus(Duration.standardMinutes(5L)).plus(Duration.standardHours(1L)))), new Instant(54541L), TimeDomain.EVENT_TIME);
        TimerInternals.TimerData of2 = TimerInternals.TimerData.of(StateNamespaces.window(IntervalWindow.getCoder(), new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardHours(1L)))), new Instant(3400000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
        PCollectionTuple apply2 = apply.apply(ParDo.of(new OldDoFn<String, KV<String, Integer>>() { // from class: org.apache.beam.runners.direct.ParDoMultiEvaluatorFactoryTest.8
            public void processElement(OldDoFn<String, KV<String, Integer>>.ProcessContext processContext) {
                processContext.windowingInternals().stateInternals();
                processContext.windowingInternals().timerInternals().setTimer(TimerInternals.TimerData.of(StateNamespaces.window(IntervalWindow.getCoder(), new IntervalWindow(new Instant(0L).plus(Duration.standardMinutes(5L)), new Instant(1L).plus(Duration.standardMinutes(5L)).plus(Duration.standardHours(1L)))), new Instant(54541L), TimeDomain.EVENT_TIME));
                processContext.windowingInternals().timerInternals().deleteTimer(TimerInternals.TimerData.of(StateNamespaces.window(IntervalWindow.getCoder(), new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardHours(1L)))), new Instant(3400000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
            }
        }).withOutputTags(tupleTag, TupleTagList.of(tupleTag2)));
        DirectRunner.CommittedBundle commit = this.bundleFactory.createBundle(apply).commit(Instant.now());
        PCollection pCollection = apply2.get(tupleTag);
        PCollection pCollection2 = apply2.get(tupleTag2);
        EvaluationContext evaluationContext = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        DirectRunner.UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        DirectRunner.UncommittedBundle createBundle2 = this.bundleFactory.createBundle(pCollection2);
        Mockito.when(evaluationContext.createBundle(pCollection)).thenReturn(createBundle);
        Mockito.when(evaluationContext.createBundle(pCollection2)).thenReturn(createBundle2);
        Mockito.when(evaluationContext.getExecutionContext(pCollection.getProducingTransformInternal(), commit.getKey())).thenReturn(new DirectExecutionContext((Clock) null, StructuralKey.of("myKey", StringUtf8Coder.of()), (CopyOnAccessInMemoryStateInternals) null, (WatermarkManager.TransformWatermarks) null));
        AggregatorContainer create = AggregatorContainer.create();
        AggregatorContainer.Mutator createMutator = create.createMutator();
        Mockito.when(evaluationContext.getAggregatorContainer()).thenReturn(create);
        Mockito.when(evaluationContext.getAggregatorMutator()).thenReturn(createMutator);
        TransformEvaluator forApplication = new ParDoMultiEvaluatorFactory(evaluationContext).forApplication(pCollection.getProducingTransformInternal(), commit);
        forApplication.processElement(WindowedValue.valueInGlobalWindow("foo"));
        forApplication.processElement(WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000L)));
        forApplication.processElement(WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
        Assert.assertThat(forApplication.finishBundle().getTimerUpdate(), Matchers.equalTo(WatermarkManager.TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())).setTimer(of).setTimer(of).setTimer(of).deletedTimer(of2).deletedTimer(of2).deletedTimer(of2).build()));
    }
}
