/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.testing;

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class TestStreamTest
implements Serializable {
    @Rule
    public transient TestPipeline p = TestPipeline.create();
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /**
     * Checks how late data is surfaced when fired panes accumulate.
     *
     * <p>All elements carry timestamp 0 and therefore fall into the five-minute window
     * {@code [0, 5min)}. The stream emits three batches separated by watermark advances; the
     * assertions expect the first batch in the on-time pane, the first two batches in the final
     * pane, and none of the negative values from the third batch.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testLateDataAccumulating() {
        Instant instant = new Instant(0L);
        TestStream<Integer> source =
            TestStream.create(VarIntCoder.of())
                .addElements(
                    TimestampedValue.of(1, instant),
                    TimestampedValue.of(2, instant),
                    TimestampedValue.of(3, instant))
                .advanceWatermarkTo(instant.plus(Duration.standardMinutes(6L)))
                // Watermark is past the window end (5min) but still inside the 5min allowed lateness.
                .addElements(TimestampedValue.of(4, instant), TimestampedValue.of(5, instant))
                .advanceWatermarkTo(instant.plus(Duration.standardMinutes(20L)))
                // Watermark is now past window end + allowed lateness; these must not show up below.
                .addElements(
                    TimestampedValue.of(-1, instant),
                    TimestampedValue.of(-2, instant),
                    TimestampedValue.of(-3, instant))
                .advanceWatermarkToInfinity();

        PCollection<Integer> windowed =
            p.apply(source)
                .apply(
                    Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5L)))
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(
                                    AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardMinutes(2L)))
                                .withLateFirings(AfterPane.elementCountAtLeast(1)))
                        .accumulatingFiredPanes()
                        .withAllowedLateness(
                            Duration.standardMinutes(5L), Window.ClosingBehavior.FIRE_ALWAYS));

        // Route everything through a single key so each pane's contents can be inspected as a group.
        PCollection<Integer> triggered =
            windowed
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());
        PCollection<Long> count =
            windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
        PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());

        IntervalWindow window =
            new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
        PAssert.that(triggered).inFinalPane(window).containsInAnyOrder(1, 2, 3, 4, 5);
        PAssert.that(triggered).inOnTimePane(window).containsInAnyOrder(1, 2, 3);

        // Any pane holds between 3 elements (1, 2, 3) and 5 elements (1..5).
        PAssert.that(count)
            .inWindow(window)
            .satisfies(
                input -> {
                    for (Long count1 : input) {
                        Assert.assertThat(
                            count1,
                            Matchers.allOf(
                                Matchers.greaterThanOrEqualTo(3L),
                                Matchers.lessThanOrEqualTo(5L)));
                    }
                    return null;
                });
        // Correspondingly the per-pane sum ranges from 1+2+3 = 6 to 1+2+3+4+5 = 15.
        PAssert.that(sum)
            .inWindow(window)
            .satisfies(
                input -> {
                    for (Integer sum1 : input) {
                        Assert.assertThat(
                            sum1,
                            Matchers.allOf(
                                Matchers.greaterThanOrEqualTo(6),
                                Matchers.lessThanOrEqualTo(15)));
                    }
                    return null;
                });

        p.run();
    }

    /**
     * Checks that advancing processing time in a {@link TestStream} drives early firings.
     *
     * <p>The trigger fires early five minutes of processing time after the first element in a
     * pane. With accumulating panes the early global-window panes are expected to be 3 (1 + 2)
     * and then 6 (1 + 2 + 3).
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testProcessingTimeTrigger() {
        TestStream<Long> source =
            TestStream.create(VarLongCoder.of())
                .addElements(
                    TimestampedValue.of(1L, new Instant(1000L)),
                    TimestampedValue.of(2L, new Instant(2000L)))
                // 12 minutes exceeds the 5 minute early-firing delay.
                .advanceProcessingTime(Duration.standardMinutes(12L))
                .addElements(TimestampedValue.of(3L, new Instant(3000L)))
                // 6 minutes again exceeds the delay for the pane started by element 3.
                .advanceProcessingTime(Duration.standardMinutes(6L))
                .advanceWatermarkToInfinity();

        PCollection<Long> sum =
            p.apply(source)
                .apply(
                    Window.<Long>configure()
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(
                                    AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardMinutes(5L))))
                        .accumulatingFiredPanes()
                        .withAllowedLateness(Duration.ZERO))
                .apply(Sum.longsGlobally());

        PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L);

        p.run();
    }

    /**
     * Checks pane contents when fired panes are discarded.
     *
     * <p>All five elements fall into the fixed window {@code [0, 1000)}. The assertions expect the
     * on-time pane to hold only {@code "onTimePane"} and the final pane to hold only the two
     * elements added after the watermark passed the end of the window.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testDiscardingMode() {
        TestStream<String> stream =
            TestStream.create(StringUtf8Coder.of())
                .advanceWatermarkTo(new Instant(0L))
                // Two elements satisfy the elementCountAtLeast(2) early trigger.
                .addElements(
                    TimestampedValue.of("firstPane", new Instant(100L)),
                    TimestampedValue.of("alsoFirstPane", new Instant(200L)))
                .addElements(TimestampedValue.of("onTimePane", new Instant(500L)))
                // 1001 is past the end of the [0, 1000) window.
                .advanceWatermarkTo(new Instant(1001L))
                .addElements(
                    TimestampedValue.of("finalLatePane", new Instant(750L)),
                    TimestampedValue.of("alsoFinalLatePane", new Instant(250L)))
                .advanceWatermarkToInfinity();

        FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
        Duration allowedLateness = Duration.millis(5000L);
        PCollection<String> values =
            p.apply(stream)
                .apply(
                    Window.<String>into(windowFn)
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterPane.elementCountAtLeast(2))
                                .withLateFirings(Never.ever()))
                        .discardingFiredPanes()
                        .withAllowedLateness(allowedLateness))
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        IntervalWindow window = windowFn.assignWindow(new Instant(100L));
        PAssert.that(values)
            .inWindow(window)
            .containsInAnyOrder(
                "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane");
        PAssert.that(values)
            .inCombinedNonLatePanes(window)
            .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane");
        PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane");
        PAssert.that(values)
            .inFinalPane(window)
            .containsInAnyOrder("finalLatePane", "alsoFinalLatePane");

        p.run();
    }

    /**
     * Checks the case where the very first element of the stream is already late.
     *
     * <p>The watermark is set to 0 before an element timestamped -1,000,000 ms is added, which is
     * far more than the 5,000 ms allowed lateness behind the watermark. The assertions expect that
     * element's window to be empty and the window containing {@code "onTime"} to hold exactly
     * that element.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testFirstElementLate() {
        Instant lateElementTimestamp = new Instant(-1000000L);
        TestStream<String> stream =
            TestStream.create(StringUtf8Coder.of())
                .advanceWatermarkTo(new Instant(0L))
                .addElements(TimestampedValue.of("late", lateElementTimestamp))
                .addElements(TimestampedValue.of("onTime", new Instant(100L)))
                .advanceWatermarkToInfinity();

        FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
        Duration allowedLateness = Duration.millis(5000L);
        PCollection<String> values =
            p.apply(stream)
                .apply(
                    Window.<String>into(windowFn)
                        .triggering(DefaultTrigger.of())
                        .discardingFiredPanes()
                        .withAllowedLateness(allowedLateness))
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty();
        PAssert.that(values)
            .inWindow(windowFn.assignWindow(new Instant(100L)))
            .containsInAnyOrder("onTime");

        p.run();
    }

    /**
     * Checks that elements timestamped at the global window's maximum timestamp are still
     * delivered: both are expected in the six-hour fixed window that contains that timestamp.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testElementsAtAlmostPositiveInfinity() {
        Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
        TestStream<String> stream =
            TestStream.create(StringUtf8Coder.of())
                .addElements(
                    TimestampedValue.of("foo", endOfGlobalWindow),
                    TimestampedValue.of("bar", endOfGlobalWindow))
                .advanceWatermarkToInfinity();

        FixedWindows windows = FixedWindows.of(Duration.standardHours(6L));
        PCollection<String> windowedValues =
            p.apply(stream)
                .apply(Window.<String>into(windows))
                .apply(WithKeys.of(1))
                .apply(GroupByKey.create())
                .apply(Values.create())
                .apply(Flatten.iterables());

        PAssert.that(windowedValues)
            .inWindow(windows.assignWindow(endOfGlobalWindow))
            .containsInAnyOrder("foo", "bar");

        p.run();
    }

    /**
     * Checks that two independent {@link TestStream}s (one of strings, one of integers) can be
     * applied to the same pipeline and that each produces its own elements.
     */
    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testMultipleStreams() {
        TestStream<String> stream =
            TestStream.create(StringUtf8Coder.of())
                .addElements("foo", "bar")
                .advanceWatermarkToInfinity();

        TestStream<Integer> other =
            TestStream.create(VarIntCoder.of())
                .addElements(1, 2, 3, 4)
                .advanceWatermarkToInfinity();

        // Each trigger threshold equals the number of elements in the corresponding stream.
        PCollection<String> createStrings =
            p.apply("CreateStrings", stream)
                .apply(
                    "WindowStrings",
                    Window.<String>configure()
                        .triggering(AfterPane.elementCountAtLeast(2))
                        .withAllowedLateness(Duration.ZERO)
                        .accumulatingFiredPanes());
        PAssert.that(createStrings).containsInAnyOrder("foo", "bar");

        PCollection<Integer> createInts =
            p.apply("CreateInts", other)
                .apply(
                    "WindowInts",
                    Window.<Integer>configure()
                        .triggering(AfterPane.elementCountAtLeast(4))
                        .withAllowedLateness(Duration.ZERO)
                        .accumulatingFiredPanes());
        PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4);

        p.run();
    }

    /**
     * Expects an {@link IllegalArgumentException} when an element timestamped exactly at
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is added to the builder.
     */
    @Test
    public void testElementAtPositiveInfinityThrows() {
        // One millisecond below the maximum is added before the expectation is armed,
        // so it must be accepted for the test to pass.
        TestStream.Builder<Integer> stream =
            TestStream.create(VarIntCoder.of())
                .addElements(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)));

        thrown.expect(IllegalArgumentException.class);
        stream.addElements(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE));
    }

    /**
     * Expects an {@link IllegalArgumentException} when the watermark is moved backwards
     * (from 0 to -1).
     */
    @Test
    public void testAdvanceWatermarkNonMonotonicThrows() {
        TestStream.Builder<Integer> stream =
            TestStream.create(VarIntCoder.of()).advanceWatermarkTo(new Instant(0L));

        thrown.expect(IllegalArgumentException.class);
        stream.advanceWatermarkTo(new Instant(-1L));
    }

    /**
     * Expects an {@link IllegalArgumentException} when the watermark is advanced explicitly to
     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} via {@code advanceWatermarkTo}.
     */
    @Test
    public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
        // Advancing to one millisecond below the maximum happens before the expectation is armed,
        // so it must be accepted for the test to pass.
        TestStream.Builder<Integer> stream =
            TestStream.create(VarIntCoder.of())
                .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));

        thrown.expect(IllegalArgumentException.class);
        stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }
}

