package org.apache.beam.runners.core;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/ReduceFnRunnerTest.class */
public class ReduceFnRunnerTest {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceFnRunnerTest.class);

    @Mock
    private SideInputReader mockSideInputReader;
    private TriggerStateMachine mockTriggerStateMachine;
    private PCollectionView<Integer> mockView;
    private IntervalWindow firstWindow;

    /* loaded from: input_file:org/apache/beam/runners/core/ReduceFnRunnerTest$SumAndVerifyContextFn.class */
    private static class SumAndVerifyContextFn extends CombineWithContext.CombineFnWithContext<Integer, Integer, Integer> {
        private final PCollectionView<Integer> view;
        private final int expectedValue;

        private SumAndVerifyContextFn(PCollectionView<Integer> pCollectionView, int i) {
            this.view = pCollectionView;
            this.expectedValue = i;
        }

        private void verifyContext(CombineWithContext.Context context) {
            MatcherAssert.assertThat(Integer.valueOf(this.expectedValue), Matchers.equalTo(Integer.valueOf(((TestOptions) context.getPipelineOptions().as(TestOptions.class)).getValue())));
            MatcherAssert.assertThat((Integer) context.sideInput(this.view), Matchers.greaterThanOrEqualTo(100));
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext
        public Integer createAccumulator(CombineWithContext.Context context) {
            verifyContext(context);
            return 0;
        }

        @Override // org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext
        public Integer addInput(Integer num, Integer num2, CombineWithContext.Context context) {
            verifyContext(context);
            return Integer.valueOf(num.intValue() + num2.intValue());
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext
        public Integer mergeAccumulators(Iterable<Integer> iterable, CombineWithContext.Context context) {
            verifyContext(context);
            int i = 0;
            Iterator<Integer> it = iterable.iterator();
            while (it.hasNext()) {
                i += it.next().intValue();
            }
            return Integer.valueOf(i);
        }

        @Override // org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext
        public Integer extractOutput(Integer num, CombineWithContext.Context context) {
            verifyContext(context);
            return Integer.valueOf(num.intValue() + ((Integer) context.sideInput(this.view)).intValue());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/core/ReduceFnRunnerTest$TestOptions.class */
    public interface TestOptions extends PipelineOptions {
        int getValue();

        void setValue(int i);
    }

    private static TriggerStateMachine.TriggerContext anyTriggerContext() {
        return (TriggerStateMachine.TriggerContext) Mockito.any();
    }

    private static TriggerStateMachine.OnElementContext anyElementContext() {
        return (TriggerStateMachine.OnElementContext) Mockito.any();
    }

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        this.mockTriggerStateMachine = (TriggerStateMachine) Mockito.mock(TriggerStateMachine.class, Mockito.withSettings().serializable());
        this.mockView = (PCollectionView) Mockito.mock(PCollectionView.class, Mockito.withSettings().serializable());
        this.firstWindow = new IntervalWindow(new Instant(0L), new Instant(10L));
    }

    private void injectElement(ReduceFnTester<Integer, ?, IntervalWindow> reduceFnTester, int i) throws Exception {
        ((TriggerStateMachine) Mockito.doNothing().when(this.mockTriggerStateMachine)).onElement(anyElementContext());
        reduceFnTester.injectElements(TimestampedValue.of(Integer.valueOf(i), new Instant(i)));
    }

    private void injectElements(ReduceFnTester<Integer, ?, IntervalWindow> reduceFnTester, Iterable<Integer> iterable) throws Exception {
        ((TriggerStateMachine) Mockito.doNothing().when(this.mockTriggerStateMachine)).onElement(anyElementContext());
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = iterable.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            arrayList.add(TimestampedValue.of(Integer.valueOf(intValue), new Instant(intValue)));
        }
        reduceFnTester.injectElements(arrayList);
    }

    private void injectElements(ReduceFnTester<Integer, ?, IntervalWindow> reduceFnTester, Integer... numArr) throws Exception {
        injectElements(reduceFnTester, Arrays.asList(numArr));
    }

    private void triggerShouldFinish(TriggerStateMachine triggerStateMachine) throws Exception {
        ((TriggerStateMachine) Mockito.doAnswer(invocationOnMock -> {
            ((TriggerStateMachine.TriggerContext) invocationOnMock.getArguments()[0]).trigger().setFinished(true);
            return null;
        }).when(triggerStateMachine)).onFire(anyTriggerContext());
    }

    @Test
    public void testProcessingTimeTimerDoesNotGc() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(100L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.ZERO).withTrigger(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10L)))), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceProcessingTime(new Instant(5000L));
        injectElement(combining, 2);
        injectElement(combining, 5);
        combining.advanceProcessingTime(new Instant(10000L));
        combining.assertHasOnlyGlobalAndStateFor(new IntervalWindow(new Instant(0L), new Instant(100L)));
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.equalTo(7), 2L, 0L, 100L, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, 0L))));
    }

    @Test
    public void testOnElementBufferingDiscarding() throws Exception {
        MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(metricsContainerImpl);
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        injectElement(nonCombining, 1);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 2);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L)));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 3);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{3}), 3L, 0L, 10L)));
        Assert.assertTrue(nonCombining.isMarkedFinished(this.firstWindow));
        nonCombining.assertHasOnlyGlobalAndFinishedSetsFor(this.firstWindow);
        injectElement(nonCombining, 4);
        Assert.assertEquals(1L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
    }

    @Test
    public void testOnElementBufferingAccumulating() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        injectElement(nonCombining, 1);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 2);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 3);
        injectElement(nonCombining, 4);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(new Matcher[]{WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2, 3}), 3L, 0L, 10L)}));
        Assert.assertTrue(nonCombining.isMarkedFinished(this.firstWindow));
        nonCombining.assertHasOnlyGlobalAndFinishedSetsFor(this.firstWindow);
    }

    @Test
    public void testSessionEowAndGcTogether() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), DefaultTriggerStateMachine.of(), WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_ALWAYS);
        nonCombining.setAutoAdvanceOutputWatermark(true);
        nonCombining.advanceInputWatermark(new Instant(0L));
        injectElement(nonCombining, 1);
        nonCombining.advanceInputWatermark(new Instant(100L));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.contains(new Integer[]{1}), 1L, 1L, 11L, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))));
    }

    @Test
    public void testFixedWindowsEowAndGcTogether() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), DefaultTriggerStateMachine.of(), WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_ALWAYS);
        nonCombining.setAutoAdvanceOutputWatermark(true);
        nonCombining.advanceInputWatermark(new Instant(0L));
        injectElement(nonCombining, 1);
        nonCombining.advanceInputWatermark(new Instant(100L));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.contains(new Integer[]{1}), 1L, 0L, 10L, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))));
    }

    @Test
    public void testFixedWindowsEowAndGcTogetherFireIfNonEmpty() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), DefaultTriggerStateMachine.of(), WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        nonCombining.setAutoAdvanceOutputWatermark(true);
        nonCombining.advanceInputWatermark(new Instant(0L));
        injectElement(nonCombining, 1);
        nonCombining.advanceInputWatermark(new Instant(100L));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.contains(new Integer[]{1}), 1L, 0L, 10L, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))));
    }

    @Test
    public void testOnlyOneOnTimePane() throws Exception {
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(DefaultTrigger.of()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceInputWatermark(new Instant(0L));
        combining.injectElements(TimestampedValue.of(1, new Instant(1L)));
        combining.advanceInputWatermark(new Instant(10L));
        combining.injectElements(TimestampedValue.of(3, new Instant(3L)));
        List extractOutput = combining.extractOutput();
        Assert.assertEquals(2L, extractOutput.size());
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isWindowedValue(Matchers.equalTo(1)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.isWindowedValue(Matchers.equalTo(Integer.valueOf(1 + 3))));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0L, 0L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1L, 1L)));
    }

    @Test
    public void testOnElementCombiningDiscarding() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)), this.mockTriggerStateMachine, Sum.ofIntegers(), VarIntCoder.of());
        injectElement(combining, 2);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(combining, 3);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(combining, 4);
        injectElement(combining, 6);
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(new Matcher[]{WindowMatchers.isSingleWindowedValue(Matchers.equalTo(5), 2L, 0L, 10L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(4), 4L, 0L, 10L)}));
        Assert.assertTrue(combining.isMarkedFinished(this.firstWindow));
        combining.assertHasOnlyGlobalAndFinishedSetsFor(this.firstWindow);
    }

    @Test
    public void testLateProcessingTimeTimer() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(100L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.ZERO).withTrigger(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10L)))), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceProcessingTime(new Instant(5000L));
        injectElement(combining, 2);
        injectElement(combining, 5);
        combining.advanceInputWatermarkNoTimers(new Instant(100L));
        combining.advanceProcessingTime(new Instant(6000L));
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.emptyIterable());
    }

    @Test
    public void testCombiningAccumulatingProcessingTime() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(100L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.ZERO).withTrigger(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10L)))), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceProcessingTime(new Instant(5000L));
        injectElement(combining, 2);
        injectElement(combining, 5);
        combining.advanceInputWatermarkNoTimers(new Instant(100L));
        combining.advanceProcessingTimeNoTimers(new Instant(5010L));
        combining.fireTimers(new IntervalWindow(new Instant(0L), new Instant(100L)), TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100L)), TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010L)));
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.equalTo(7), 2L, 0L, 100L, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L))));
    }

    @Test
    public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
        Duration standardDays = Duration.standardDays(365L);
        FixedWindows of = FixedWindows.of(Duration.millis(10L));
        Instant plus = GlobalWindow.INSTANCE.maxTimestamp().minus(standardDays).plus(Duration.millis(1L));
        Objects.requireNonNull(of);
        IntervalWindow intervalWindow = (IntervalWindow) Iterables.getOnlyElement(of.assignWindows(new WindowFn<Object, IntervalWindow>.AssignContext(of, plus) { // from class: org.apache.beam.runners.core.ReduceFnRunnerTest.1
            final /* synthetic */ Instant val$elementTimestamp;

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
                this.val$elementTimestamp = plus;
                Objects.requireNonNull(of);
            }

            @Override // org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext
            public Object element() {
                throw new UnsupportedOperationException();
            }

            @Override // org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext
            public Instant timestamp() {
                return this.val$elementTimestamp;
            }

            @Override // org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext
            public BoundedWindow window() {
                throw new UnsupportedOperationException();
            }
        }));
        Assert.assertTrue(intervalWindow.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
        Assert.assertTrue(intervalWindow.maxTimestamp().plus(standardDays).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(of).withTimestampCombiner(TimestampCombiner.EARLIEST).withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever())).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withAllowedLateness(standardDays), Sum.ofIntegers(), VarIntCoder.of());
        combining.injectElements(TimestampedValue.of(13, plus));
        combining.advanceInputWatermark(intervalWindow.maxTimestamp());
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.emptyIterable());
        combining.injectElements(TimestampedValue.of(42, plus));
        combining.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(WindowMatchers.isWindowedValue(Matchers.equalTo(55))));
    }

    @Test
    public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(100L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.ZERO).withTrigger(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10L)))), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceProcessingTime(new Instant(5000L));
        injectElement(combining, 2);
        injectElement(combining, 5);
        combining.advanceInputWatermark(new Instant(100L));
        combining.advanceProcessingTime(new Instant(5011L));
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.equalTo(7), 2L, 0L, 100L, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L))));
    }

    @Test
    public void testCombiningAccumulatingEventTime() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(100L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(1L)).withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), Sum.ofIntegers(), VarIntCoder.of());
        injectElement(combining, 2);
        injectElement(combining, 5);
        combining.advanceInputWatermark(new Instant(1000L));
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.equalTo(7), 2L, 0L, 100L, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L))));
    }

    @Test
    public void testOnElementCombiningAccumulating() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)), this.mockTriggerStateMachine, Sum.ofIntegers(), VarIntCoder.of());
        injectElement(combining, 1);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(combining, 2);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(combining, 3);
        injectElement(combining, 4);
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(new Matcher[]{WindowMatchers.isSingleWindowedValue(Matchers.equalTo(3), 1L, 0L, 10L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(6), 3L, 0L, 10L)}));
        Assert.assertTrue(combining.isMarkedFinished(this.firstWindow));
        combining.assertHasOnlyGlobalAndFinishedSetsFor(this.firstWindow);
    }

    @Test
    public void testOnElementCombiningWithContext() throws Exception {
        Integer num = 100;
        WindowingStrategy withMode = WindowingStrategy.of(FixedWindows.of(Duration.millis(2L))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
        Mockito.when(this.mockView.getWindowMappingFn()).thenReturn(FixedWindows.of(Duration.millis(4L)).getDefaultWindowMappingFn());
        TestOptions testOptions = (TestOptions) PipelineOptionsFactory.as(TestOptions.class);
        testOptions.setValue(num.intValue());
        Mockito.when(Boolean.valueOf(this.mockSideInputReader.contains((PCollectionView) org.mockito.Matchers.any(PCollectionView.class)))).thenReturn(true);
        Mockito.when(this.mockSideInputReader.get((PCollectionView) org.mockito.Matchers.any(PCollectionView.class), (BoundedWindow) org.mockito.Matchers.any(BoundedWindow.class))).then(invocationOnMock -> {
            IntervalWindow intervalWindow = (IntervalWindow) invocationOnMock.getArguments()[1];
            long millis = intervalWindow.start().getMillis();
            long millis2 = intervalWindow.end().getMillis();
            MatcherAssert.assertThat(Long.valueOf(millis), Matchers.anyOf(Matchers.equalTo(0L), Matchers.equalTo(4L)));
            MatcherAssert.assertThat(Long.valueOf(millis2 - millis), Matchers.equalTo(4L));
            return Integer.valueOf(100 + ((int) millis));
        });
        ReduceFnTester<Integer, ?, IntervalWindow> combining = ReduceFnTester.combining(withMode, this.mockTriggerStateMachine, new SumAndVerifyContextFn(this.mockView, num.intValue()), VarIntCoder.of(), testOptions, this.mockSideInputReader);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        for (int i = 0; i < 8; i++) {
            injectElement(combining, i);
        }
        MatcherAssert.assertThat(combining.extractOutput(), Matchers.contains(new Matcher[]{WindowMatchers.isSingleWindowedValue(Matchers.equalTo(100), 1L, 0L, 2L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(101), 1L, 0L, 2L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(102), 3L, 2L, 4L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(105), 3L, 2L, 4L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(108), 5L, 4L, 6L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(113), 5L, 4L, 6L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(110), 7L, 6L, 8L), WindowMatchers.isSingleWindowedValue(Matchers.equalTo(117), 7L, 6L, 8L)}));
    }

    @Test
    public void testWatermarkHoldAndLateData() throws Exception {
        MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(metricsContainerImpl);
        Duration millis = Duration.millis(10L);
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, millis, Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        Assert.assertEquals((Object) null, nonCombining.getWatermarkHold());
        Assert.assertEquals((Object) null, nonCombining.getOutputWatermark());
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(10L));
        injectElement(nonCombining, 1);
        injectElement(nonCombining, 3);
        Assert.assertEquals(new Instant(1L), nonCombining.getWatermarkHold());
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 2);
        List<WindowedValue<?>> extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2, 3}), (Matcher<? super Instant>) Matchers.equalTo(new Instant(1L)), (Matcher<? super BoundedWindow>) Matchers.equalTo(intervalWindow))));
        MatcherAssert.assertThat(extractOutput.get(0).getPane(), Matchers.equalTo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L)));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        Assert.assertEquals(0L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
        nonCombining.advanceInputWatermark(new Instant(4L));
        Assert.assertEquals(new Instant(4L), nonCombining.getOutputWatermark());
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        nonCombining.advanceInputWatermark(new Instant(4L));
        injectElement(nonCombining, 2);
        injectElement(nonCombining, 3);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.equalTo(intervalWindow.maxTimestamp().plus(millis)));
        injectElement(nonCombining, 5);
        Assert.assertEquals(new Instant(5L), nonCombining.getWatermarkHold());
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 4);
        List<WindowedValue<?>> extractOutput2 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput2, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2, 3, 2, 3, 4, 5}), 4L, 0L, 10L)));
        MatcherAssert.assertThat(extractOutput2.get(0).getPane(), Matchers.equalTo(PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1L, -1L)));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        nonCombining.advanceInputWatermark(new Instant(8L));
        injectElement(nonCombining, 6);
        injectElement(nonCombining, 5);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.equalTo(intervalWindow.maxTimestamp().plus(millis)));
        injectElement(nonCombining, 4);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.setAutoAdvanceOutputWatermark(false);
        nonCombining.advanceInputWatermark(intervalWindow.maxTimestamp().plus(Duration.millis(1L)));
        nonCombining.fireTimer(intervalWindow, intervalWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
        List<WindowedValue<?>> extractOutput3 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput3, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2, 3, 2, 3, 4, 5, 4, 5, 6}), 9L, 0L, 10L)));
        MatcherAssert.assertThat(extractOutput3.get(0).getPane(), Matchers.equalTo(PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2L, 0L)));
        nonCombining.setAutoAdvanceOutputWatermark(true);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        injectElement(nonCombining, 8);
        Assert.assertEquals(0L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
        nonCombining.advanceInputWatermark(new Instant(50L));
        List<WindowedValue<?>> extractOutput4 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput4, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2, 3, 2, 3, 4, 5, 4, 5, 6, 8}), 9L, 0L, 10L)));
        MatcherAssert.assertThat(extractOutput4.get(0).getPane(), Matchers.equalTo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 3L, 1L)));
        Assert.assertEquals(new Instant(50L), nonCombining.getOutputWatermark());
        Assert.assertEquals((Object) null, nonCombining.getWatermarkHold());
        nonCombining.fireTimer(new IntervalWindow(new Instant(0L), new Instant(10L)), new Instant(12L), TimeDomain.EVENT_TIME);
        Assert.assertFalse(nonCombining.isMarkedFinished(this.firstWindow));
        nonCombining.assertHasOnlyGlobalAndFinishedSetsFor(new IntervalWindow[0]);
    }

    @Test
    public void testWatermarkHoldForLateNewWindow() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10L))).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1)))).withAllowedLateness(Duration.standardMinutes(1L)));
        nonCombining.setAutoAdvanceOutputWatermark(false);
        Assert.assertEquals((Object) null, nonCombining.getWatermarkHold());
        Assert.assertEquals((Object) null, nonCombining.getOutputWatermark());
        nonCombining.advanceInputWatermark(new Instant(40L));
        injectElements(nonCombining, 1);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        injectElements(nonCombining, 10);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
    }

    @Test
    public void testMergingWatermarkHoldLateNewWindowMerged() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10L))).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1)))).withAllowedLateness(Duration.standardMinutes(1L)));
        nonCombining.setAutoAdvanceOutputWatermark(false);
        Assert.assertEquals((Object) null, nonCombining.getWatermarkHold());
        Assert.assertEquals((Object) null, nonCombining.getOutputWatermark());
        nonCombining.advanceInputWatermark(new Instant(24L));
        injectElements(nonCombining, 1);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        injectElements(nonCombining, 14);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        injectElements(nonCombining, 6, 16);
        Assert.assertEquals(nonCombining.getWatermarkHold(), new Instant(25L));
        injectElements(nonCombining, 6, 21);
        Assert.assertEquals(nonCombining.getWatermarkHold(), new Instant(30L));
        nonCombining.advanceInputWatermark(new Instant(31L));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        injectElements(nonCombining, 0);
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        injectElements(nonCombining, 32, 40);
        Assert.assertEquals(nonCombining.getWatermarkHold(), new Instant(49L));
        injectElements(nonCombining, 24);
        Assert.assertEquals(nonCombining.getWatermarkHold(), new Instant(49L));
        nonCombining.advanceInputWatermark(new Instant(50L));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
    }

    @Test
    public void testMergingLateWatermarkHolds() throws Exception {
        MetricsEnvironment.setCurrentContainer(new MetricsContainerImpl("any"));
        Duration millis = Duration.millis(10L);
        Duration standardMinutes = Duration.standardMinutes(100L);
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(Sessions.withGapDuration(millis)).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(10)))).withAllowedLateness(standardMinutes));
        nonCombining.setAutoAdvanceOutputWatermark(false);
        Assert.assertEquals((Object) null, nonCombining.getWatermarkHold());
        Assert.assertEquals((Object) null, nonCombining.getOutputWatermark());
        nonCombining.advanceInputWatermark(new Instant(20L));
        injectElements(nonCombining, Arrays.asList(3));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        injectElements(nonCombining, Arrays.asList(4));
        Instant minus = new Instant(4L).plus(millis).plus(standardMinutes).minus(Duration.millis(1L));
        Assert.assertEquals(minus, nonCombining.getWatermarkHold());
        nonCombining.advanceInputWatermark(new Instant(1000L));
        Assert.assertEquals(minus, nonCombining.getWatermarkHold());
    }

    @Test
    public void testMergingWatermarkHoldAndLateDataFuzz() throws Exception {
        MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(metricsContainerImpl);
        Duration standardMinutes = Duration.standardMinutes(100L);
        long nextLong = ThreadLocalRandom.current().nextLong();
        LOG.info("Random seed: {}", Long.valueOf(nextLong));
        Random random = new Random(nextLong);
        Duration millis = Duration.millis(10 + random.nextInt(40));
        LOG.info("Gap duration {}", millis);
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(Sessions.withGapDuration(millis)).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1)))).withAllowedLateness(standardMinutes));
        nonCombining.setAutoAdvanceOutputWatermark(true);
        Assert.assertEquals((Object) null, nonCombining.getWatermarkHold());
        Assert.assertEquals((Object) null, nonCombining.getOutputWatermark());
        ArrayList arrayList = new ArrayList();
        int nextInt = 3 + random.nextInt(100);
        int nextInt2 = 1 + random.nextInt(400);
        LOG.info("Num ts {}", Integer.valueOf(nextInt));
        LOG.info("Max ts {}", Integer.valueOf(nextInt2));
        for (int i = nextInt; i >= 0; i--) {
            arrayList.add(Integer.valueOf(random.nextInt(nextInt2)));
        }
        LOG.info("Times: {}", arrayList);
        int i2 = 0;
        long j = 0;
        while (i2 < arrayList.size()) {
            int nextInt3 = i2 + random.nextInt(arrayList.size());
            if (nextInt3 > arrayList.size()) {
                nextInt3 = arrayList.size();
            }
            LOG.info("nextSplit {}", Integer.valueOf(nextInt3));
            injectElements(nonCombining, arrayList.subList(i2, nextInt3));
            if (random.nextInt(3) == 0) {
                int nextInt4 = random.nextInt((int) (nextInt2 + millis.getMillis()));
                if (nextInt4 > j) {
                    Boolean valueOf = Boolean.valueOf(random.nextBoolean());
                    LOG.info("nextWatermark {} {}", Integer.valueOf(nextInt4), valueOf);
                    j = nextInt4;
                    nonCombining.setAutoAdvanceOutputWatermark(valueOf.booleanValue());
                    nonCombining.advanceInputWatermark(new Instant(j));
                }
            }
            i2 = nextInt3;
            Instant watermarkHold = nonCombining.getWatermarkHold();
            if (watermarkHold != null) {
                MatcherAssert.assertThat(watermarkHold, Matchers.greaterThanOrEqualTo(new Instant(j)));
                MatcherAssert.assertThat(Long.valueOf(j), Matchers.lessThan(Long.valueOf(nextInt2 + millis.getMillis())));
            }
        }
        nonCombining.setAutoAdvanceOutputWatermark(true);
        long millis2 = millis.getMillis() + nextInt2;
        nonCombining.advanceInputWatermark(new Instant(millis2));
        LOG.info("Output {}", nonCombining.extractOutput());
        if (nonCombining.getWatermarkHold() != null) {
            MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.equalTo(new Instant(millis2).plus(standardMinutes)));
        }
        Assert.assertEquals(0L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
    }

    @Test
    public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10L), Window.ClosingBehavior.FIRE_ALWAYS);
        nonCombining.setAutoAdvanceOutputWatermark(false);
        nonCombining.advanceInputWatermark(new Instant(15L));
        nonCombining.advanceOutputWatermark(new Instant(11L));
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(10L), new Instant(20L));
        injectElement(nonCombining, 14);
        Assert.assertEquals(new Instant(14L), nonCombining.getWatermarkHold());
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.advanceInputWatermark(new Instant(20L));
        nonCombining.fireTimer(intervalWindow, intervalWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        Assert.assertEquals(new Instant(29L), nonCombining.getWatermarkHold());
        Assert.assertEquals(new Instant(29L), nonCombining.getNextTimer(TimeDomain.EVENT_TIME));
        injectElement(nonCombining, 13);
        Assert.assertEquals(new Instant(29L), nonCombining.getWatermarkHold());
        Assert.assertEquals(new Instant(29L), nonCombining.getNextTimer(TimeDomain.EVENT_TIME));
        nonCombining.advanceInputWatermark(new Instant(30L));
        Assert.assertFalse(nonCombining.isMarkedFinished(new IntervalWindow(new Instant(10L), new Instant(20L))));
        nonCombining.assertHasOnlyGlobalAndFinishedSetsFor(new IntervalWindow[0]);
    }

    @Test
    public void testPaneInfoAllStates() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        nonCombining.advanceInputWatermark(new Instant(0L));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 1);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY))));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 2);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1L, -1L))));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        nonCombining.setAutoAdvanceOutputWatermark(false);
        nonCombining.advanceInputWatermark(new Instant(15L));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        injectElement(nonCombining, 3);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2L, 0L))));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.setAutoAdvanceOutputWatermark(true);
        injectElement(nonCombining, 4);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 3L, 1L))));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 5);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 4L, 2L))));
    }

    @Test
    public void testPaneInfoAllStatesAfterWatermark() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withTimestampCombiner(TimestampCombiner.EARLIEST).withClosingBehavior(Window.ClosingBehavior.FIRE_ALWAYS));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput, Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L))));
        MatcherAssert.assertThat(extractOutput, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L)));
        nonCombining.advanceInputWatermark(new Instant(50L));
        List extractOutput2 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput2, Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 1L, 0L))));
        MatcherAssert.assertThat(extractOutput2, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.emptyIterable(), 9L, 0L, 10L)));
        nonCombining.advanceInputWatermark(new Instant(150L));
        List extractOutput3 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput3, Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 2L, 1L))));
        MatcherAssert.assertThat(extractOutput3, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.emptyIterable(), 9L, 0L, 10L)));
    }

    @Test
    public void noEmptyPanesFinalIfNonEmpty() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withTimestampCombiner(TimestampCombiner.EARLIEST).withClosingBehavior(Window.ClosingBehavior.FIRE_IF_NON_EMPTY));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        nonCombining.advanceInputWatermark(new Instant(20L));
        nonCombining.advanceInputWatermark(new Instant(250L));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(new Matcher[]{WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 9L, 0L, 10L)}));
    }

    @Test
    public void noEmptyPanesFinalAlways() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withTimestampCombiner(TimestampCombiner.EARLIEST).withClosingBehavior(Window.ClosingBehavior.FIRE_ALWAYS));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        nonCombining.advanceInputWatermark(new Instant(20L));
        nonCombining.advanceInputWatermark(new Instant(250L));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(new Matcher[]{WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 9L, 0L, 10L), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 9L, 0L, 10L)}));
    }

    @Test
    public void testNoWatermarkTriggerNoHold() throws Exception {
        Duration standardDays = Duration.standardDays(1L);
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5L)))).withAllowedLateness(standardDays));
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(0L), new Instant(10L));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.advanceProcessingTime(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.equalTo(intervalWindow.maxTimestamp()));
        nonCombining.advanceProcessingTime(new Instant(6000L));
        MatcherAssert.assertThat(Integer.valueOf(nonCombining.getOutputSize()), Matchers.equalTo(1));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.nullValue());
        nonCombining.advanceInputWatermark(intervalWindow.maxTimestamp().plus(Duration.standardHours(1L)));
        nonCombining.injectElements(TimestampedValue.of(3, new Instant(3L)));
        MatcherAssert.assertThat(nonCombining.getWatermarkHold(), Matchers.equalTo(intervalWindow.maxTimestamp().plus(standardDays)));
    }

    @Test
    public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withTimestampCombiner(TimestampCombiner.EARLIEST).withClosingBehavior(Window.ClosingBehavior.FIRE_ALWAYS));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput, Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L))));
        MatcherAssert.assertThat(extractOutput, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L)));
        nonCombining.advanceInputWatermark(new Instant(50L));
        List extractOutput2 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput2, Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 1L, 0L))));
        MatcherAssert.assertThat(extractOutput2, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 9L, 0L, 10L)));
        nonCombining.advanceInputWatermark(new Instant(150L));
        List extractOutput3 = nonCombining.extractOutput();
        MatcherAssert.assertThat(extractOutput3, Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 2L, 1L))));
        MatcherAssert.assertThat(extractOutput3, Matchers.contains(WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 9L, 0L, 10L)));
    }

    @Test
    public void testPaneInfoFinalAndOnTime() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(Repeatedly.forever(AfterPane.elementCountAtLeast(2)).orFinally(AfterWatermark.pastEndOfWindow())).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withClosingBehavior(Window.ClosingBehavior.FIRE_ALWAYS));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L))));
        nonCombining.advanceInputWatermark(new Instant(150L));
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1L, 0L))));
    }

    @Test
    public void testPaneInfoSkipToFinish() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        nonCombining.advanceInputWatermark(new Instant(0L));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 1);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY))));
    }

    @Test
    public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception {
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        nonCombining.advanceInputWatermark(new Instant(15L));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 1);
        MatcherAssert.assertThat(nonCombining.extractOutput(), Matchers.contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, PaneInfo.Timing.LATE))));
    }

    @Test
    public void testMergeBeforeFinalizing() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.ZERO, Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(10, new Instant(10L)));
        nonCombining.advanceInputWatermark(new Instant(100L));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 10}), 1L, 1L, 20L));
        MatcherAssert.assertThat(((WindowedValue) extractOutput.get(0)).getPane(), Matchers.equalTo(PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L)));
    }

    @Test
    public void testMergingWithCloseBeforeGC() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(10, new Instant(10L)));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        nonCombining.advanceInputWatermark(new Instant(30L));
        nonCombining.advanceInputWatermark(new Instant(100L));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 10}), 1L, 1L, 20L));
        MatcherAssert.assertThat(((WindowedValue) extractOutput.get(0)).getPane(), Matchers.equalTo(PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L)));
    }

    @Test
    public void testMergingWithCloseTrigger() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(1L), new Instant(12L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        nonCombining.advanceInputWatermark(new Instant(13L));
        nonCombining.fireTimer(intervalWindow, intervalWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
        Assert.assertTrue(nonCombining.isMarkedFinished(intervalWindow));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)));
        Assert.assertTrue(nonCombining.isMarkedFinished(intervalWindow));
    }

    @Test
    public void testMergingWithReusedWindow() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        IntervalWindow intervalWindow = new IntervalWindow(new Instant(1L), new Instant(11L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        nonCombining.advanceInputWatermark(new Instant(15L));
        nonCombining.fireTimer(intervalWindow, intervalWindow.maxTimestamp(), TimeDomain.EVENT_TIME);
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)));
        Assert.assertTrue(nonCombining.hasNoActiveWindows());
        nonCombining.advanceInputWatermark(new Instant(100L));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1}), (Matcher<? super Instant>) Matchers.equalTo(new Instant(1L)), (Matcher<? super BoundedWindow>) Matchers.equalTo(intervalWindow)));
        MatcherAssert.assertThat(((WindowedValue) extractOutput.get(0)).getPane(), Matchers.equalTo(PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L)));
    }

    @Test
    public void testMergingWithClosedRepresentative() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(8, new Instant(8L)));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)), TimestampedValue.of(8, new Instant(8L)));
        nonCombining.advanceInputWatermark(new Instant(100L));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 8}), 1L, 1L, 18L));
        MatcherAssert.assertThat(((WindowedValue) extractOutput.get(0)).getPane(), Matchers.equalTo(PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY, 0L, 0L)));
    }

    @Test
    public void testMergingWithClosedDoesNotPoison() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        nonCombining.injectElements(TimestampedValue.of(2, new Instant(2L)));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(false);
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(2, new Instant(2L)), TimestampedValue.of(3, new Instant(3L)));
        nonCombining.advanceInputWatermark(new Instant(100L));
        List extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(2));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{2}), 2L, 2L, 12L));
        MatcherAssert.assertThat(((WindowedValue) extractOutput.get(0)).getPane(), Matchers.equalTo(PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY, 0L, 0L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2, 3}), 1L, 1L, 13L));
        MatcherAssert.assertThat(((WindowedValue) extractOutput.get(1)).getPane(), Matchers.equalTo(PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0L, 0L)));
    }

    @Test
    public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
        MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(metricsContainerImpl);
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(SlidingWindows.of(Duration.millis(100L)).every(Duration.millis(30L))).withTrigger(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.millis(1000L)), Sum.ofIntegers(), VarIntCoder.of());
        combining.injectElements(TimestampedValue.of(10, new Instant(23L)), TimestampedValue.of(12, new Instant(40L)));
        Assert.assertEquals(0L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
        combining.advanceInputWatermark(new Instant(70L));
        combining.injectElements(TimestampedValue.of(14, new Instant(60L)));
        Assert.assertEquals(1L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
        combining.advanceInputWatermark(new Instant(130L));
        combining.injectElements(TimestampedValue.of(16, new Instant(40L)));
        Assert.assertEquals(4L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
    }

    @Test
    public void testIdempotentEmptyPanesDiscarding() throws Exception {
        MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(metricsContainerImpl);
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        injectElement(nonCombining, 1);
        injectElement(nonCombining, 2);
        nonCombining.advanceInputWatermark(new Instant(12L));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.fireTimer(this.firstWindow, new Instant(9L), TimeDomain.EVENT_TIME);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.fireTimer(this.firstWindow, new Instant(9L), TimeDomain.EVENT_TIME);
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 3);
        List<WindowedValue<?>> extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(2));
        MatcherAssert.assertThat(extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L));
        MatcherAssert.assertThat((Iterable) extractOutput.get(1).getValue(), Matchers.contains(new Integer[]{3}));
        MatcherAssert.assertThat(extractOutput.get(1).getPane(), Matchers.equalTo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 1L, 1L)));
        Assert.assertTrue(nonCombining.isMarkedFinished(this.firstWindow));
        nonCombining.assertHasOnlyGlobalAndFinishedSetsFor(this.firstWindow);
        Assert.assertEquals(0L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
    }

    @Test
    public void testIdempotentEmptyPanesAccumulating() throws Exception {
        MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("any");
        MetricsEnvironment.setCurrentContainer(metricsContainerImpl);
        ReduceFnTester<Integer, ?, IntervalWindow> nonCombining = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10L)), this.mockTriggerStateMachine, WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100L), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
        injectElement(nonCombining, 1);
        injectElement(nonCombining, 2);
        nonCombining.advanceInputWatermark(new Instant(12L));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.fireTimer(this.firstWindow, new Instant(9L), TimeDomain.EVENT_TIME);
        List<WindowedValue<?>> extractOutput = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat(extractOutput.get(0), WindowMatchers.isSingleWindowedValue(Matchers.containsInAnyOrder(new Integer[]{1, 2}), 1L, 0L, 10L));
        MatcherAssert.assertThat(extractOutput.get(0).getPane(), Matchers.equalTo(PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0L, 0L)));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        nonCombining.fireTimer(this.firstWindow, new Instant(9L), TimeDomain.EVENT_TIME);
        MatcherAssert.assertThat(Integer.valueOf(nonCombining.extractOutput().size()), Matchers.equalTo(0));
        Mockito.when(Boolean.valueOf(this.mockTriggerStateMachine.shouldFire(anyTriggerContext()))).thenReturn(true);
        triggerShouldFinish(this.mockTriggerStateMachine);
        injectElement(nonCombining, 3);
        List<WindowedValue<?>> extractOutput2 = nonCombining.extractOutput();
        MatcherAssert.assertThat(Integer.valueOf(extractOutput2.size()), Matchers.equalTo(1));
        MatcherAssert.assertThat((Iterable) extractOutput2.get(0).getValue(), Matchers.containsInAnyOrder(new Integer[]{1, 2, 3}));
        MatcherAssert.assertThat(extractOutput2.get(0).getPane(), Matchers.equalTo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 1L, 1L)));
        Assert.assertTrue(nonCombining.isMarkedFinished(this.firstWindow));
        nonCombining.assertHasOnlyGlobalAndFinishedSetsFor(this.firstWindow);
        Assert.assertEquals(0L, metricsContainerImpl.getCounter(MetricName.named((Class<?>) ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)).getCumulative().longValue());
    }

    @Test
    public void testEmptyOnTimeFromOrFinally() throws Exception {
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withTrigger(AfterEach.inOrder(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(5L))).orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(25L))))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceInputWatermark(new Instant(0L));
        combining.advanceProcessingTime(new Instant(0L));
        combining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(1, new Instant(3L)), TimestampedValue.of(1, new Instant(7L)), TimestampedValue.of(1, new Instant(5L)));
        combining.advanceProcessingTime(new Instant(6L));
        combining.advanceInputWatermark(new Instant(11L));
        List extractOutput = combining.extractOutput();
        Assert.assertEquals(2L, extractOutput.size());
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(4, 1L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.isSingleWindowedValue(4, 9L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 1L, 0L)));
    }

    @Test
    public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception {
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withTrigger(AfterEach.inOrder(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(5L))).orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(25L))))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withClosingBehavior(Window.ClosingBehavior.FIRE_ALWAYS).withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceInputWatermark(new Instant(0L));
        combining.advanceProcessingTime(new Instant(0L));
        combining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(1, new Instant(3L)), TimestampedValue.of(1, new Instant(7L)), TimestampedValue.of(1, new Instant(5L)));
        combining.advanceProcessingTime(new Instant(6L));
        combining.advanceInputWatermark(new Instant(11L));
        combining.advanceInputWatermark(new Instant(110L));
        List extractOutput = combining.extractOutput();
        Assert.assertEquals(2L, extractOutput.size());
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(4, 1L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.isSingleWindowedValue(4, 9L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 1L, 0L)));
    }

    @Test
    public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception {
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withTrigger(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.ZERO).withClosingBehavior(Window.ClosingBehavior.FIRE_IF_NON_EMPTY), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceInputWatermark(new Instant(0L));
        combining.advanceProcessingTime(new Instant(0L));
        combining.injectElements(TimestampedValue.of(1, new Instant(1L)));
        combining.advanceInputWatermark(new Instant(11L));
        List extractOutput = combining.extractOutput();
        Assert.assertEquals(2L, extractOutput.size());
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1L, 0L)));
    }

    @Test
    public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception {
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withTrigger(AfterEach.inOrder(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(5L))).orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(25L))))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceInputWatermark(new Instant(0L));
        combining.advanceProcessingTime(new Instant(0L));
        combining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(1, new Instant(3L)), TimestampedValue.of(1, new Instant(7L)), TimestampedValue.of(1, new Instant(5L)));
        combining.advanceProcessingTime(new Instant(6L));
        combining.advanceInputWatermark(new Instant(11L));
        combining.injectElements(TimestampedValue.of(1, new Instant(9L)));
        combining.advanceProcessingTime(new Instant(32L));
        List extractOutput = combining.extractOutput();
        Assert.assertEquals(2L, extractOutput.size());
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(4, 1L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.isSingleWindowedValue(5, 9L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1L, 0L)));
    }

    @Test
    public void testProcessingTime() throws Exception {
        ReduceFnTester combining = ReduceFnTester.combining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTimestampCombiner(TimestampCombiner.EARLIEST).withTrigger(AfterEach.inOrder(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(5L))).orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(25L))))).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)), Sum.ofIntegers(), VarIntCoder.of());
        combining.advanceInputWatermark(new Instant(0L));
        combining.advanceProcessingTime(new Instant(0L));
        combining.injectElements(TimestampedValue.of(1, new Instant(1L)), TimestampedValue.of(1, new Instant(3L)), TimestampedValue.of(1, new Instant(7L)), TimestampedValue.of(1, new Instant(5L)));
        combining.advanceProcessingTime(new Instant(6L));
        combining.injectElements(TimestampedValue.of(1, new Instant(8L)), TimestampedValue.of(1, new Instant(4L)));
        combining.advanceInputWatermark(new Instant(11L));
        combining.injectElements(TimestampedValue.of(1, new Instant(8L)), TimestampedValue.of(1, new Instant(4L)), TimestampedValue.of(1, new Instant(5L)));
        combining.advanceInputWatermark(new Instant(12L));
        combining.injectElements(TimestampedValue.of(1, new Instant(3L)));
        combining.advanceProcessingTime(new Instant(15L));
        combining.injectElements(TimestampedValue.of(1, new Instant(5L)));
        combining.advanceProcessingTime(new Instant(32L));
        combining.injectElements(TimestampedValue.of(1, new Instant(3L)));
        combining.advanceInputWatermark(new Instant(125L));
        List extractOutput = combining.extractOutput();
        Assert.assertEquals(4L, extractOutput.size());
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.isSingleWindowedValue(4, 1L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.isSingleWindowedValue(6, 4L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(2), WindowMatchers.isSingleWindowedValue(11, 9L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(3), WindowMatchers.isSingleWindowedValue(12, 9L, 0L, 10L));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(0), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY, 0L, -1L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(1), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 1L, 0L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(2), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2L, 1L)));
        MatcherAssert.assertThat((WindowedValue) extractOutput.get(3), WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 3L, 2L)));
    }

    @Test
    public void fireNonEmptyOnDrainInGlobalWindow() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(new GlobalWindows()).withTrigger(Repeatedly.forever(AfterPane.elementCountAtLeast(3))).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES));
        nonCombining.advanceInputWatermark(new Instant(0L));
        for (int i = 0; i < 20; i++) {
            nonCombining.injectElements(TimestampedValue.of(Integer.valueOf(i), new Instant(i)));
        }
        List extractOutput = nonCombining.extractOutput();
        Assert.assertEquals(6L, extractOutput.size());
        for (int i2 = 0; i2 < extractOutput.size(); i2++) {
            Assert.assertEquals(PaneInfo.Timing.EARLY, ((WindowedValue) extractOutput.get(i2)).getPane().getTiming());
            Assert.assertEquals(i2, ((WindowedValue) extractOutput.get(i2)).getPane().getIndex());
            Assert.assertEquals(3L, Iterables.size((Iterable) ((WindowedValue) extractOutput.get(i2)).getValue()));
        }
        nonCombining.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        List extractOutput2 = nonCombining.extractOutput();
        Assert.assertEquals(1L, extractOutput2.size());
        Assert.assertEquals(PaneInfo.Timing.ON_TIME, ((WindowedValue) extractOutput2.get(0)).getPane().getTiming());
        Assert.assertEquals(6L, ((WindowedValue) extractOutput2.get(0)).getPane().getIndex());
        Assert.assertEquals(2L, Iterables.size((Iterable) ((WindowedValue) extractOutput2.get(0)).getValue()));
    }

    @Test
    public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(new GlobalWindows()).withTrigger(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(3L)))).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES));
        for (int i = 0; i < 20; i++) {
            nonCombining.advanceProcessingTime(new Instant(i));
            nonCombining.injectElements(TimestampedValue.of(Integer.valueOf(i), new Instant(i)));
        }
        nonCombining.advanceProcessingTime(new Instant(24L));
        List extractOutput = nonCombining.extractOutput();
        Assert.assertEquals(5L, extractOutput.size());
        for (int i2 = 0; i2 < extractOutput.size(); i2++) {
            Assert.assertEquals(PaneInfo.Timing.EARLY, ((WindowedValue) extractOutput.get(i2)).getPane().getTiming());
            Assert.assertEquals(i2, ((WindowedValue) extractOutput.get(i2)).getPane().getIndex());
            Assert.assertEquals(4L, Iterables.size((Iterable) ((WindowedValue) extractOutput.get(i2)).getValue()));
        }
        nonCombining.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        List extractOutput2 = nonCombining.extractOutput();
        Assert.assertEquals(1L, extractOutput2.size());
        Assert.assertEquals(PaneInfo.Timing.ON_TIME, ((WindowedValue) extractOutput2.get(0)).getPane().getTiming());
        Assert.assertEquals(5L, ((WindowedValue) extractOutput2.get(0)).getPane().getIndex());
        Assert.assertEquals(0L, Iterables.size((Iterable) ((WindowedValue) extractOutput2.get(0)).getValue()));
    }

    @Test
    public void setGarbageCollectionHoldOnLateElements() throws Exception {
        ReduceFnTester nonCombining = ReduceFnTester.nonCombining(WindowingStrategy.of(FixedWindows.of(Duration.millis(10L))).withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2))).withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES).withAllowedLateness(Duration.millis(100L)).withClosingBehavior(Window.ClosingBehavior.FIRE_IF_NON_EMPTY));
        nonCombining.advanceInputWatermark(new Instant(0L));
        nonCombining.advanceOutputWatermark(new Instant(0L));
        nonCombining.injectElements(TimestampedValue.of(1, new Instant(1L)));
        nonCombining.advanceInputWatermark(new Instant(109L));
        nonCombining.advanceOutputWatermark(new Instant(109L));
        nonCombining.injectElements(TimestampedValue.of(2, new Instant(2L)));
        Assert.assertEquals(new Instant(109L), nonCombining.getWatermarkHold());
        nonCombining.advanceInputWatermark(new Instant(110L));
        nonCombining.advanceOutputWatermark(new Instant(110L));
        Assert.assertEquals(2L, nonCombining.extractOutput().size());
    }
}
