package org.apache.beam.sdk.testing;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/testing/TestStreamTest.class */
public class TestStreamTest implements Serializable {

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    @Test
    @Category({NeedsRunner.class})
    public void testLateDataAccumulating() {
        Instant instant = new Instant(0L);
        TestStream advanceWatermarkToInfinity = TestStream.create(VarIntCoder.of()).addElements(TimestampedValue.of(1, instant), new TimestampedValue[]{TimestampedValue.of(2, instant), TimestampedValue.of(3, instant)}).advanceWatermarkTo(instant.plus(Duration.standardMinutes(6L))).addElements(TimestampedValue.of(4, instant), new TimestampedValue[]{TimestampedValue.of(5, instant)}).advanceWatermarkTo(instant.plus(Duration.standardMinutes(20L))).addElements(TimestampedValue.of(-1, instant), new TimestampedValue[]{TimestampedValue.of(-2, instant), TimestampedValue.of(-3, instant)}).advanceWatermarkToInfinity();
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply(advanceWatermarkToInfinity).apply(Window.into(FixedWindows.of(Duration.standardMinutes(5L))).triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(2L))).withLateFirings(AfterPane.elementCountAtLeast(1))).accumulatingFiredPanes().withAllowedLateness(Duration.standardMinutes(5L), Window.ClosingBehavior.FIRE_ALWAYS));
        PCollection apply2 = apply.apply(WithKeys.of(1)).apply(GroupByKey.create()).apply(Values.create()).apply(Flatten.iterables());
        PCollection apply3 = apply.apply(Count.globally().withoutDefaults());
        PCollection apply4 = apply.apply(Sum.integersGlobally().withoutDefaults());
        IntervalWindow intervalWindow = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));
        PAssert.that(apply2).inFinalPane(intervalWindow).containsInAnyOrder(new Integer[]{1, 2, 3, 4, 5});
        PAssert.that(apply2).inOnTimePane(intervalWindow).containsInAnyOrder(new Integer[]{1, 2, 3});
        PAssert.that(apply3).inWindow(intervalWindow).satisfies(new SerializableFunction<Iterable<Long>, Void>() { // from class: org.apache.beam.sdk.testing.TestStreamTest.1
            public Void apply(Iterable<Long> iterable) {
                Iterator<Long> it = iterable.iterator();
                while (it.hasNext()) {
                    Assert.assertThat(it.next(), Matchers.allOf(Matchers.greaterThanOrEqualTo(3L), Matchers.lessThanOrEqualTo(5L)));
                }
                return null;
            }
        });
        PAssert.that(apply4).inWindow(intervalWindow).satisfies(new SerializableFunction<Iterable<Integer>, Void>() { // from class: org.apache.beam.sdk.testing.TestStreamTest.2
            public Void apply(Iterable<Integer> iterable) {
                Iterator<Integer> it = iterable.iterator();
                while (it.hasNext()) {
                    Assert.assertThat(it.next(), Matchers.allOf(Matchers.greaterThanOrEqualTo(6), Matchers.lessThanOrEqualTo(15)));
                }
                return null;
            }
        });
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testProcessingTimeTrigger() {
        TestStream advanceWatermarkToInfinity = TestStream.create(VarLongCoder.of()).addElements(TimestampedValue.of(1L, new Instant(1000L)), new TimestampedValue[]{TimestampedValue.of(2L, new Instant(2000L))}).advanceProcessingTime(Duration.standardMinutes(12L)).addElements(TimestampedValue.of(3L, new Instant(3000L)), new TimestampedValue[0]).advanceProcessingTime(Duration.standardMinutes(6L)).advanceWatermarkToInfinity();
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(advanceWatermarkToInfinity).apply(Window.triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5L)))).accumulatingFiredPanes().withAllowedLateness(Duration.ZERO)).apply(Sum.longsGlobally())).inEarlyGlobalWindowPanes().containsInAnyOrder(new Long[]{3L, 6L});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testDiscardingMode() {
        TestStream advanceWatermarkToInfinity = TestStream.create(StringUtf8Coder.of()).advanceWatermarkTo(new Instant(0L)).addElements(TimestampedValue.of("firstPane", new Instant(100L)), new TimestampedValue[]{TimestampedValue.of("alsoFirstPane", new Instant(200L))}).addElements(TimestampedValue.of("onTimePane", new Instant(500L)), new TimestampedValue[0]).advanceWatermarkTo(new Instant(1001L)).addElements(TimestampedValue.of("finalLatePane", new Instant(750L)), new TimestampedValue[]{TimestampedValue.of("alsoFinalLatePane", new Instant(250L))}).advanceWatermarkToInfinity();
        TestPipeline create = TestPipeline.create();
        FixedWindows of = FixedWindows.of(Duration.millis(1000L));
        PCollection apply = create.apply(advanceWatermarkToInfinity).apply(Window.into(of).triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(2)).withLateFirings(Never.ever())).discardingFiredPanes().withAllowedLateness(Duration.millis(5000L))).apply(WithKeys.of(1)).apply(GroupByKey.create()).apply(Values.create()).apply(Flatten.iterables());
        IntervalWindow assignWindow = of.assignWindow(new Instant(100L));
        PAssert.that(apply).inWindow(assignWindow).containsInAnyOrder(new String[]{"firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane"});
        PAssert.that(apply).inCombinedNonLatePanes(assignWindow).containsInAnyOrder(new String[]{"firstPane", "alsoFirstPane", "onTimePane"});
        PAssert.that(apply).inOnTimePane(assignWindow).containsInAnyOrder(new String[]{"onTimePane"});
        PAssert.that(apply).inFinalPane(assignWindow).containsInAnyOrder(new String[]{"finalLatePane", "alsoFinalLatePane"});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testFirstElementLate() {
        Instant instant = new Instant(-1000000L);
        TestStream advanceWatermarkToInfinity = TestStream.create(StringUtf8Coder.of()).advanceWatermarkTo(new Instant(0L)).addElements(TimestampedValue.of("late", instant), new TimestampedValue[0]).addElements(TimestampedValue.of("onTime", new Instant(100L)), new TimestampedValue[0]).advanceWatermarkToInfinity();
        TestPipeline create = TestPipeline.create();
        FixedWindows of = FixedWindows.of(Duration.millis(1000L));
        PCollection apply = create.apply(advanceWatermarkToInfinity).apply(Window.into(of).triggering(DefaultTrigger.of()).discardingFiredPanes().withAllowedLateness(Duration.millis(5000L))).apply(WithKeys.of(1)).apply(GroupByKey.create()).apply(Values.create()).apply(Flatten.iterables());
        PAssert.that(apply).inWindow(of.assignWindow(instant)).empty();
        PAssert.that(apply).inWindow(of.assignWindow(new Instant(100L))).containsInAnyOrder(new String[]{"onTime"});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testElementsAtAlmostPositiveInfinity() {
        Instant maxTimestamp = GlobalWindow.INSTANCE.maxTimestamp();
        TestStream advanceWatermarkToInfinity = TestStream.create(StringUtf8Coder.of()).addElements(TimestampedValue.of("foo", maxTimestamp), new TimestampedValue[]{TimestampedValue.of("bar", maxTimestamp)}).advanceWatermarkToInfinity();
        TestPipeline create = TestPipeline.create();
        FixedWindows of = FixedWindows.of(Duration.standardHours(6L));
        PAssert.that(create.apply(advanceWatermarkToInfinity).apply(Window.into(of)).apply(WithKeys.of(1)).apply(GroupByKey.create()).apply(Values.create()).apply(Flatten.iterables())).inWindow(of.assignWindow(GlobalWindow.INSTANCE.maxTimestamp())).containsInAnyOrder(new String[]{"foo", "bar"});
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testMultipleStreams() {
        TestStream advanceWatermarkToInfinity = TestStream.create(StringUtf8Coder.of()).addElements("foo", new String[]{"bar"}).advanceWatermarkToInfinity();
        TestStream advanceWatermarkToInfinity2 = TestStream.create(VarIntCoder.of()).addElements(1, new Integer[]{2, 3, 4}).advanceWatermarkToInfinity();
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply("CreateStrings", advanceWatermarkToInfinity).apply("WindowStrings", Window.triggering(AfterPane.elementCountAtLeast(2)).withAllowedLateness(Duration.ZERO).accumulatingFiredPanes())).containsInAnyOrder(new String[]{"foo", "bar"});
        PAssert.that(create.apply("CreateInts", advanceWatermarkToInfinity2).apply("WindowInts", Window.triggering(AfterPane.elementCountAtLeast(4)).withAllowedLateness(Duration.ZERO).accumulatingFiredPanes())).containsInAnyOrder(new Integer[]{1, 2, 3, 4});
        create.run();
    }

    @Test
    public void testElementAtPositiveInfinityThrows() {
        TestStream.Builder addElements = TestStream.create(VarIntCoder.of()).addElements(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)), new TimestampedValue[0]);
        this.thrown.expect(IllegalArgumentException.class);
        addElements.addElements(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE), new TimestampedValue[0]);
    }

    @Test
    public void testAdvanceWatermarkNonMonotonicThrows() {
        TestStream.Builder advanceWatermarkTo = TestStream.create(VarIntCoder.of()).advanceWatermarkTo(new Instant(0L));
        this.thrown.expect(IllegalArgumentException.class);
        advanceWatermarkTo.advanceWatermarkTo(new Instant(-1L));
    }

    @Test
    public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
        TestStream.Builder advanceWatermarkTo = TestStream.create(VarIntCoder.of()).advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
        this.thrown.expect(IllegalArgumentException.class);
        advanceWatermarkTo.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    @Test
    public void testUnsupportedRunnerThrows() {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(CrashingRunner.class);
        Pipeline create2 = Pipeline.create(create);
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("does not provide a required override");
        this.thrown.expectMessage(TestStream.class.getSimpleName());
        this.thrown.expectMessage(CrashingRunner.class.getSimpleName());
        create2.apply(TestStream.create(VarIntCoder.of()).advanceWatermarkToInfinity());
    }

    @Test
    public void testEncodeDecode() throws Exception {
        TestStream.Event add = TestStream.ElementEvent.add(TimestampedValue.of(1, new Instant()), new TimestampedValue[]{TimestampedValue.of(-10, new Instant()), TimestampedValue.of(Integer.MAX_VALUE, new Instant())});
        TestStream.Event advanceTo = TestStream.WatermarkEvent.advanceTo(new Instant(100L));
        TestStream.Event advanceBy = TestStream.ProcessingTimeEvent.advanceBy(Duration.millis(90548L));
        TestStream.EventCoder of = TestStream.EventCoder.of(VarIntCoder.of());
        CoderProperties.coderSerializable(of);
        CoderProperties.coderDecodeEncodeEqual(of, add);
        CoderProperties.coderDecodeEncodeEqual(of, advanceTo);
        CoderProperties.coderDecodeEncodeEqual(of, advanceBy);
    }
}
