package org.apache.beam.sdk.transforms.windowing;

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

/**
 * Tests for {@link Window}: how its window function, trigger, accumulation mode, allowed
 * lateness and output-time function end up in the resulting {@link WindowingStrategy}, which
 * configurations are rejected, and what the transform reports as display data.
 *
 * <p>The class is {@link Serializable} and keeps its {@link ExpectedException} rule
 * {@code transient}; presumably this is because the anonymous {@link DoFn}s below capture the
 * enclosing test instance and are serialized with the pipeline (confirm against the runner).
 */
@RunWith(JUnit4.class)
public class WindowTest implements Serializable {

    /** Lets individual tests declare the exception type, message and cause they expect. */
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /**
     * {@code Window.into(fn)} alone sets the window function and leaves the trigger as
     * {@link DefaultTrigger} and the mode as {@code DISCARDING_FIRED_PANES}.
     */
    @Test
    public void testWindowIntoSetWindowfn() {
        WindowingStrategy<?, ?> strategy = TestPipeline.create()
                .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10L))))
                .getWindowingStrategy();

        Assert.assertTrue(strategy.getWindowFn() instanceof FixedWindows);
        Assert.assertTrue(strategy.getTrigger().getSpec() instanceof DefaultTrigger);
        Assert.assertEquals(
                WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES, strategy.getMode());
    }

    /**
     * A fully chained {@code Window.into(...).triggering(...).accumulatingFiredPanes()} is
     * reflected in the strategy's window function, trigger spec and accumulation mode.
     */
    @Test
    public void testWindowIntoTriggersAndAccumulating() {
        FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10L));
        Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));

        // The explicit <String> is required: as a method receiver, Window.into(...) gets no
        // target type, so T would otherwise not be inferred from the input collection.
        WindowingStrategy<?, ?> strategy = TestPipeline.create()
                .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
                .apply(Window.<String>into(fixed10)
                        .triggering(trigger)
                        .accumulatingFiredPanes()
                        .withAllowedLateness(Duration.ZERO))
                .getWindowingStrategy();

        Assert.assertEquals(fixed10, strategy.getWindowFn());
        Assert.assertEquals(trigger, strategy.getTrigger().getSpec());
        Assert.assertEquals(
                WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
    }

    /**
     * Mode, lateness, trigger and window function set by four separate {@link Window}
     * applications are all present in the final strategy.
     */
    @Test
    public void testWindowPropagatesEachPart() {
        FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10L));
        Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));

        WindowingStrategy<?, ?> strategy = TestPipeline.create()
                .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
                .apply("Mode", Window.<String>accumulatingFiredPanes())
                .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1L)))
                .apply("Trigger", Window.<String>triggering(trigger))
                .apply("Window", Window.<String>into(fixed10))
                .getWindowingStrategy();

        Assert.assertEquals(fixed10, strategy.getWindowFn());
        Assert.assertEquals(trigger, strategy.getTrigger().getSpec());
        Assert.assertEquals(
                WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
        Assert.assertEquals(Duration.standardDays(1L), strategy.getAllowedLateness());
    }

    /**
     * Re-windowing with a second {@code Window.into} replaces the window function but keeps the
     * allowed lateness configured by the first one.
     */
    @Test
    public void testWindowIntoPropagatesLateness() {
        FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10L));
        FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25L));

        WindowingStrategy<?, ?> strategy = TestPipeline.create()
                .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
                .apply("WindowInto10", Window.<String>into(fixed10)
                        .withAllowedLateness(Duration.standardDays(1L))
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
                        .accumulatingFiredPanes())
                .apply("WindowInto25", Window.<String>into(fixed25))
                .getWindowingStrategy();

        Assert.assertEquals(Duration.standardDays(1L), strategy.getAllowedLateness());
        Assert.assertEquals(fixed25, strategy.getWindowFn());
    }

    /** A transform built with {@code Window.into} is named {@code "Window.Into()"}. */
    @Test
    public void testWindowGetName() {
        Assert.assertEquals(
                "Window.Into()",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(10L))).getName());
    }

    /**
     * {@code Window.into} rejects a window function whose window coder fails
     * {@code verifyDeterministic()}, wrapping the original exception as the cause.
     *
     * @throws Coder.NonDeterministicException declared only because stubbing
     *     {@code verifyDeterministic()} on the mock names a method that throws it
     */
    @Test
    public void testNonDeterministicWindowCoder() throws Coder.NonDeterministicException {
        FixedWindows mockWindowFn = Mockito.mock(FixedWindows.class);
        // Class literals cannot carry type arguments, so go through the raw type once here
        // instead of leaving the mock (and everything touching it) raw.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Class<Coder<IntervalWindow>> coderClass = (Class) Coder.class;
        Coder<IntervalWindow> mockCoder = Mockito.mock(coderClass);
        Mockito.when(mockWindowFn.windowCoder()).thenReturn(mockCoder);

        Coder.NonDeterministicException toBeThrown =
                new Coder.NonDeterministicException(mockCoder, "Its just not deterministic.");
        Mockito.doThrow(toBeThrown).when(mockCoder).verifyDeterministic();

        thrown.expect(IllegalArgumentException.class);
        thrown.expectCause(Matchers.sameInstance(toBeThrown));
        thrown.expectMessage("Window coders must be deterministic");
        Window.into(mockWindowFn);
    }

    /**
     * Applying a trigger when no accumulation mode has been set fails with an
     * {@link IllegalArgumentException} mentioning the accumulation mode.
     */
    @Test
    public void testMissingMode() {
        FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10L));
        Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("requires that the accumulation mode");
        TestPipeline.create()
                .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
                .apply("Window", Window.<String>into(fixed10))
                .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1L)))
                .apply("Trigger", Window.<String>triggering(trigger));
    }

    /**
     * Applying a trigger when no allowed lateness has been set fails with an
     * {@link IllegalArgumentException} mentioning the allowed lateness.
     */
    @Test
    public void testMissingLateness() {
        FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10L));
        Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("requires that the allowed lateness");
        TestPipeline.create()
                .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
                .apply("Mode", Window.<String>accumulatingFiredPanes())
                .apply("Window", Window.<String>into(fixed10))
                .apply("Trigger", Window.<String>triggering(trigger));
    }

    /**
     * Without an explicit output-time function, the grouped output of two elements in the same
     * ten-minute window is expected to carry that window's maximum timestamp.
     */
    @Test
    @Category(RunnableOnService.class)
    public void testOutputTimeFnDefault() {
        TestPipeline pipeline = TestPipeline.create();

        pipeline
                .apply(Create.timestamped(
                        TimestampedValue.of(KV.of(0, "hello"), new Instant(0L)),
                        TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10L))))
                .apply(Window.<KV<Integer, String>>into(
                        FixedWindows.of(Duration.standardMinutes(10L))))
                .apply(GroupByKey.<Integer, String>create())
                .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
                    // NOTE(review): presumably this implements DoFn's per-element hook; no
                    // annotation is visible here - confirm whether @Override applies.
                    public void processElement(
                            DoFn<KV<Integer, Iterable<String>>, Void>.ProcessContext context)
                            throws Exception {
                        IntervalWindow firstWindow = new IntervalWindow(
                                new Instant(0L),
                                new Instant(0L).plus(Duration.standardMinutes(10L)));
                        Assert.assertThat(
                                context.timestamp(),
                                Matchers.equalTo(firstWindow.maxTimestamp()));
                    }
                }));

        pipeline.run();
    }

    /**
     * With {@code OutputTimeFns.outputAtEndOfWindow()}, the grouped output of two elements in the
     * first ten-minute window is expected to be stamped one millisecond before the ten-minute mark.
     */
    @Test
    @Category(RunnableOnService.class)
    public void testOutputTimeFnEndOfWindow() {
        TestPipeline pipeline = TestPipeline.create();

        pipeline
                .apply(Create.timestamped(
                        TimestampedValue.of(KV.of(0, "hello"), new Instant(0L)),
                        TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10L))))
                .apply(Window.<KV<Integer, String>>into(
                                FixedWindows.of(Duration.standardMinutes(10L)))
                        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
                .apply(GroupByKey.<Integer, String>create())
                .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
                    // NOTE(review): presumably this implements DoFn's per-element hook; no
                    // annotation is visible here - confirm whether @Override applies.
                    public void processElement(
                            DoFn<KV<Integer, Iterable<String>>, Void>.ProcessContext context)
                            throws Exception {
                        // 10 min * 60 s * 1000 ms - 1 ms = 599999 ms.
                        Assert.assertThat(
                                context.timestamp(),
                                Matchers.equalTo(new Instant(10 * 60 * 1000 - 1)));
                    }
                }));

        pipeline.run();
    }

    /**
     * A fully configured {@link Window} exposes display items for its window function, trigger,
     * accumulation mode, allowed lateness, closing behavior and output-time function, and
     * includes the window function's own display data.
     */
    @Test
    public void testDisplayData() {
        FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5L));
        AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow();
        Duration allowedLateness = Duration.standardMinutes(10L);
        Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
        OutputTimeFn<?> outputTimeFn = OutputTimeFns.outputAtEndOfWindow();

        DisplayData displayData = DisplayData.from(
                Window.into(windowFn)
                        .triggering(triggerBuilder)
                        .accumulatingFiredPanes()
                        .withAllowedLateness(allowedLateness, closingBehavior)
                        .withOutputTimeFn(outputTimeFn));

        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("windowFn", windowFn.getClass()));
        Assert.assertThat(displayData, DisplayDataMatchers.includesDisplayDataFrom(windowFn));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem("trigger", triggerBuilder.toString()));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem(
                        "accumulationMode",
                        WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES.toString()));
        Assert.assertThat(
                displayData, DisplayDataMatchers.hasDisplayItem("allowedLateness", allowedLateness));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem("closingBehavior", closingBehavior.toString()));
        Assert.assertThat(
                displayData,
                DisplayDataMatchers.hasDisplayItem("outputTimeFn", outputTimeFn.getClass()));
    }

    /**
     * Properties that were never set on a {@link Window} do not show up in its display data:
     * a mode-only transform has none of the other keys, and a window-fn-only transform has no
     * {@code accumulationMode}.
     */
    @Test
    public void testDisplayDataExcludesUnspecifiedProperties() {
        // Naming the matcher fixes T = String and avoids a cast at the hasKey(...) call site.
        Matcher<String> unspecifiedKey = Matchers.isOneOf(
                "windowFn", "trigger", "outputTimeFn", "allowedLateness", "closingBehavior");
        Assert.assertThat(
                DisplayData.from(Window.discardingFiredPanes()),
                Matchers.not(DisplayDataMatchers.hasDisplayItem(
                        DisplayDataMatchers.hasKey(unspecifiedKey))));

        Assert.assertThat(
                DisplayData.from(Window.into(new GlobalWindows())),
                Matchers.not(DisplayDataMatchers.hasDisplayItem(
                        DisplayDataMatchers.hasKey("accumulationMode"))));
    }

    /**
     * Setting the trigger to {@link DefaultTrigger} and the allowed lateness to
     * {@code BoundedWindow.TIMESTAMP_MAX_VALUE} milliseconds yields no {@code trigger} or
     * {@code allowedLateness} display items.
     */
    @Test
    public void testDisplayDataExcludesDefaults() {
        DisplayData displayData = DisplayData.from(
                Window.into(new GlobalWindows())
                        .triggering(DefaultTrigger.of())
                        .withAllowedLateness(
                                Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())));

        Assert.assertThat(displayData, Matchers.not(DisplayDataMatchers.hasDisplayItem("trigger")));
        Assert.assertThat(
                displayData, Matchers.not(DisplayDataMatchers.hasDisplayItem("allowedLateness")));
    }
}
