package org.apache.apex.malhar.lib.window;

import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import javax.validation.ValidationException;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
import org.apache.apex.malhar.lib.window.ControlTuple;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.apex.malhar.lib.window.impl.FixedDiffEventTimeWatermarkGen;
import org.apache.apex.malhar.lib.window.impl.InMemorySessionWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.SpillableSessionWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
import org.apache.commons.lang3.mutable.MutableLong;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/apex/malhar/lib/window/WindowedOperatorTest.class */
public class WindowedOperatorTest {
    /**
     * Reference event time shared by all tests: the wall-clock time at class initialisation minus
     * 31,536,000,000 ms (365 days), truncated down to a whole second.
     */
    public static final long BASE = ((System.currentTimeMillis() - 31536000000L) / 1000) * 1000;

    /**
     * Value injected by the {@link Parameterized} runner. When true, the operator factory methods
     * of this class back data and retraction storage with spillable implementations; otherwise
     * in-memory implementations are used.
     */
    @Parameterized.Parameter
    public Boolean useSpillable;
    // Storage handles assigned by the operator factory methods so that tests can inspect them.
    private WindowedStorage.WindowedPlainStorage<WindowState> windowStateStorage;
    private WindowedStorage.WindowedPlainStorage<MutableLong> plainDataStorage;
    private WindowedStorage.WindowedPlainStorage<Long> plainRetractionStorage;
    private WindowedStorage.WindowedKeyedStorage<String, MutableLong> keyedDataStorage;
    private WindowedStorage.WindowedKeyedStorage<String, Long> keyedRetractionStorage;
    // Only assigned when useSpillable is true; shared by the spillable storages of one operator.
    private SpillableComplexComponentImpl sccImpl;

    /** JUnit rule supplying the {@code timeStore} and {@code operatorContext} used by the tests. */
    @Rule
    public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.apex.malhar.lib.window.WindowedOperatorTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/apex/malhar/lib/window/WindowedOperatorTest$1.class */
    /**
     * Switch-map holder recovered by the decompiler: a table indexed by
     * {@link TriggerOption.AccumulationMode#ordinal()} that yields the {@code case} label used by
     * the {@code switch} statements in this test class.
     *
     * <p>Mapping: ACCUMULATING -> 1, ACCUMULATING_AND_RETRACTING -> 2, DISCARDING -> 3. Slots that
     * are never assigned keep the array default of 0, which matches no {@code case} and therefore
     * reaches the {@code default} branch of the switches.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant present at run time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode = new int[TriggerOption.AccumulationMode.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at run time
            // (NoSuchFieldError) only leaves its own slot at 0 and does not abort the others.
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[TriggerOption.AccumulationMode.ACCUMULATING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[TriggerOption.AccumulationMode.DISCARDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0.
            }
        }
    }

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner: one run with
     * {@code useSpillable == false} followed by one run with {@code useSpillable == true}.
     *
     * @return a fixed-size list holding the two single-element argument arrays
     */
    @Parameterized.Parameters
    public static Collection<Object[]> testParameters() {
        Object[][] runs = {
            {false},
            {true},
        };
        return Arrays.asList(runs);
    }

    /**
     * Asserts that {@code validate()} on the given operator throws a {@link ValidationException}.
     *
     * @param windowedOperatorImpl the operator whose configuration is expected to be invalid
     * @param str the reason validation should fail; appended to the assertion failure message
     */
    private void verifyValidationFailure(WindowedOperatorImpl windowedOperatorImpl, String str) {
        boolean rejected = false;
        try {
            windowedOperatorImpl.validate();
        } catch (ValidationException expected) {
            // This is the outcome the caller is asking for.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Should fail validation because " + str);
        }
    }

    /**
     * Builds a non-keyed windowed operator that sums {@code Long} inputs, and records the storage
     * instances it uses in the fields of this test so they can be inspected afterwards.
     *
     * <p>Window state is always held in memory. Data and retraction storage are spillable (sharing
     * one {@link SpillableComplexComponentImpl} that is also registered as an operator component)
     * when {@code useSpillable} is true, and in-memory otherwise.
     *
     * @return the configured operator; window and trigger options are left to the caller
     */
    private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator() {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = new WindowedOperatorImpl<>();

        // Both flavours keep the window state in memory.
        this.windowStateStorage = new InMemoryWindowedStorage();

        if (!this.useSpillable) {
            this.plainDataStorage = new InMemoryWindowedStorage();
            this.plainRetractionStorage = new InMemoryWindowedStorage();
        } else {
            this.sccImpl = new SpillableComplexComponentImpl(this.testMeta.timeStore);

            SpillableWindowedPlainStorage dataStore = new SpillableWindowedPlainStorage();
            dataStore.setSpillableComplexComponent(this.sccImpl);
            this.plainDataStorage = dataStore;

            SpillableWindowedPlainStorage retractionStore = new SpillableWindowedPlainStorage();
            retractionStore.setSpillableComplexComponent(this.sccImpl);
            this.plainRetractionStorage = retractionStore;

            operator.addComponent("SpillableComplexComponent", this.sccImpl);
        }

        operator.setDataStorage(this.plainDataStorage);
        operator.setRetractionStorage(this.plainRetractionStorage);
        operator.setWindowStateStorage(this.windowStateStorage);
        operator.setAccumulation(new SumAccumulation());
        return operator;
    }

    /**
     * Builds a keyed windowed operator that sums {@code Long} values per {@code String} key, and
     * records the storage instances it uses in the fields of this test.
     *
     * <p>Window state is always held in memory. Data and retraction storage are spillable (sharing
     * one {@link SpillableComplexComponentImpl} that is also registered as an operator component)
     * when {@code useSpillable} is true, and in-memory otherwise.
     *
     * @param z when true, the data storage is a session-windowed storage implementation; when
     *          false, a regular keyed windowed storage is used
     * @return the configured operator; window and trigger options are left to the caller
     */
    private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator(boolean z) {
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> operator = new KeyedWindowedOperatorImpl<>();

        // Both flavours keep the window state in memory.
        this.windowStateStorage = new InMemoryWindowedStorage();

        if (!this.useSpillable) {
            if (z) {
                this.keyedDataStorage = new InMemorySessionWindowedStorage();
            } else {
                this.keyedDataStorage = new InMemoryWindowedKeyedStorage();
            }
            this.keyedRetractionStorage = new InMemoryWindowedKeyedStorage();
        } else {
            this.sccImpl = new SpillableComplexComponentImpl(this.testMeta.timeStore);

            if (!z) {
                SpillableWindowedKeyedStorage keyedStore = new SpillableWindowedKeyedStorage();
                keyedStore.setSpillableComplexComponent(this.sccImpl);
                this.keyedDataStorage = keyedStore;
            } else {
                SpillableSessionWindowedStorage sessionStore = new SpillableSessionWindowedStorage();
                sessionStore.setSpillableComplexComponent(this.sccImpl);
                this.keyedDataStorage = sessionStore;
            }

            SpillableWindowedKeyedStorage retractionStore = new SpillableWindowedKeyedStorage();
            retractionStore.setSpillableComplexComponent(this.sccImpl);
            this.keyedRetractionStorage = retractionStore;

            operator.addComponent("SpillableComplexComponent", this.sccImpl);
        }

        operator.setDataStorage(this.keyedDataStorage);
        operator.setRetractionStorage(this.keyedRetractionStorage);
        operator.setWindowStateStorage(this.windowStateStorage);
        operator.setAccumulation(new SumAccumulation());
        return operator;
    }

    /**
     * Walks an operator from a completely unconfigured state to valid configurations, checking
     * after each step that {@code validate()} rejects or accepts the setup as expected.
     */
    @Test
    public void testValidation() throws Exception {
        WindowedOperatorImpl operator = new WindowedOperatorImpl();

        // Mandatory pieces are added one at a time; validation must fail until all are present.
        verifyValidationFailure(operator, "nothing is configured");
        operator.setWindowStateStorage(new InMemoryWindowedStorage());
        verifyValidationFailure(operator, "data storage is not set");
        operator.setDataStorage(new InMemoryWindowedStorage());
        verifyValidationFailure(operator, "accumulation is not set");
        operator.setAccumulation(new SumAccumulation());
        operator.validate();

        // Retracting mode additionally needs a retraction storage.
        operator.setTriggerOption(new TriggerOption().accumulatingAndRetractingFiredPanes());
        verifyValidationFailure(operator, "retracting storage is not set for ACCUMULATING_AND_RETRACTING");
        operator.setRetractionStorage(new InMemoryWindowedStorage());
        operator.validate();

        // firingOnlyUpdatedPanes cannot be combined with DISCARDING ...
        operator.setTriggerOption(new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes());
        verifyValidationFailure(operator, "DISCARDING is not valid for option firingOnlyUpdatedPanes");

        // ... and requires a retraction storage even in plain ACCUMULATING mode.
        operator.setTriggerOption(new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes());
        operator.setRetractionStorage((WindowedStorage) null);
        verifyValidationFailure(operator, "retracting storage is not set for option firingOnlyUpdatedPanes");
    }

    /**
     * Exercises explicit watermarks together with an allowed lateness of one second on
     * one-second time windows.
     *
     * <p>Checks that tuples accumulate into a single window, that each processed watermark is
     * forwarded on the control port, that a late tuple arriving within the allowed lateness is
     * still accumulated, and that the window is gone from both storages once it is too late.
     */
    @Test
    public void testWatermarkAndAllowedLateness() {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        // Keep a handle on the control sink so the forwarded watermarks can be counted below.
        CollectorTestSink controlSink = new CollectorTestSink();
        operator.controlOutput.setSink(controlSink);
        operator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        operator.setAllowedLateness(Duration.millis(1000L));
        operator.setup(this.testMeta.operatorContext);

        operator.beginWindow(1L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 100, 2L));
        Assert.assertEquals("There should be exactly one window in the storage", 1L, this.plainDataStorage.size());
        Assert.assertEquals("There should be exactly one window in the storage", 1L, this.windowStateStorage.size());
        Map.Entry stateEntry = (Map.Entry) this.windowStateStorage.entries().iterator().next();
        Window window = (Window) stateEntry.getKey();
        WindowState windowState = (WindowState) stateEntry.getValue();
        // No watermark has been seen for this window yet.
        Assert.assertEquals(-1L, windowState.watermarkArrivalTime);
        Assert.assertEquals(2L, ((MutableLong) this.plainDataStorage.get(window)).longValue());
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 200, 3L));
        Assert.assertEquals(5L, ((MutableLong) this.plainDataStorage.get(window)).longValue());
        operator.processWatermark(new WatermarkImpl(BASE + 1200));
        operator.endWindow();
        Assert.assertTrue(windowState.watermarkArrivalTime >= 0);
        Assert.assertEquals("We should get one watermark tuple", 1L, controlSink.getCount(false));

        operator.beginWindow(2L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 900, 4L));
        Assert.assertEquals("Late but not too late", 9L, ((MutableLong) this.plainDataStorage.get(window)).longValue());
        operator.processWatermark(new WatermarkImpl(BASE + 3000));
        operator.endWindow();
        Assert.assertEquals("We should get two watermark tuples", 2L, controlSink.getCount(false));

        operator.beginWindow(3L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 120, 5L));
        Assert.assertEquals("The window should be dropped because it's too late", 0L, this.plainDataStorage.size());
        Assert.assertEquals("The window should be dropped because it's too late", 0L, this.windowStateStorage.size());
        operator.endWindow();
        operator.teardown();
    }

    /**
     * Verifies implicit watermark generation with a {@link FixedDiffEventTimeWatermarkGen}
     * configured with a difference of 100 ms: no watermark is emitted for a streaming window
     * without tuples, and after each window that carried a tuple one watermark is emitted whose
     * timestamp is the tuple's event time minus 100 ms.
     */
    @Test
    public void testImplicitWatermarks() {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        CollectorTestSink controlSink = new CollectorTestSink();
        operator.controlOutput.setSink(controlSink);
        operator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        operator.setAllowedLateness(Duration.millis(1000L));
        operator.setImplicitWatermarkGenerator(new FixedDiffEventTimeWatermarkGen(100L));
        operator.setup(this.testMeta.operatorContext);

        operator.beginWindow(1L);
        operator.endWindow();
        Assert.assertEquals("We should get no watermark tuple", 0L, controlSink.getCount(false));

        operator.beginWindow(2L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 100, 2L));
        operator.endWindow();
        Assert.assertEquals("We should get one watermark tuple", 1L, controlSink.getCount(false));
        // Expected value goes first so that a failure message reports the two sides correctly.
        Assert.assertEquals("Check Watermark value", BASE, ((ControlTuple.Watermark) controlSink.collectedTuples.get(0)).getTimestamp());

        operator.beginWindow(3L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 900, 4L));
        operator.endWindow();
        Assert.assertEquals("We should get two watermark tuples", 2L, controlSink.getCount(false));
        Assert.assertEquals("Check Watermark value", BASE + 800, ((ControlTuple.Watermark) controlSink.collectedTuples.get(1)).getTimestamp());

        // Release operator resources, as the sibling tests do.
        operator.teardown();
    }

    /**
     * Shared scenario for early time-based triggers (every 1000 ms) on one-second time windows
     * of a non-keyed operator.
     *
     * <p>Two tuples (2 and 3) are fed first and the first firing is expected to carry 5. Two more
     * tuples (4 and 5) are then fed into the same time window, and the second firing depends on
     * the accumulation mode:
     * <ul>
     *   <li>ACCUMULATING: a single tuple with the running total 14;</li>
     *   <li>ACCUMULATING_AND_RETRACTING: a retraction of -5 followed by 14;</li>
     *   <li>DISCARDING: a single tuple with only the new contribution, 9.</li>
     * </ul>
     *
     * @param accumulationMode the accumulation mode to configure on the trigger
     * @throws RuntimeException if the mode is not one of the three handled constants
     */
    private void testTrigger(TriggerOption.AccumulationMode accumulationMode) {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000L));
        // Switch on the enum itself rather than through an ordinal-indexed lookup table.
        switch (accumulationMode) {
            case ACCUMULATING:
                triggerOption.accumulatingFiredPanes();
                break;
            case ACCUMULATING_AND_RETRACTING:
                triggerOption.accumulatingAndRetractingFiredPanes();
                break;
            case DISCARDING:
                triggerOption.discardingFiredPanes();
                break;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
        operator.setTriggerOption(triggerOption);
        operator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        CollectorTestSink sink = new CollectorTestSink();
        operator.output.setSink(sink);
        operator.setup(this.testMeta.operatorContext);

        operator.beginWindow(1L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 100, 2L));
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 200, 3L));
        operator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        operator.beginWindow(2L);
        operator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        operator.beginWindow(3L);
        operator.endWindow();
        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, sink.collectedTuples.size());
        Assert.assertEquals(5L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
        sink.collectedTuples.clear();

        operator.beginWindow(4L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 400, 4L));
        operator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        operator.beginWindow(5L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 300, 5L));
        operator.endWindow();
        switch (accumulationMode) {
            case ACCUMULATING:
                Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, sink.collectedTuples.size());
                Assert.assertEquals(14L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
                break;
            case ACCUMULATING_AND_RETRACTING:
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, sink.collectedTuples.size());
                Assert.assertEquals(-5L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
                Assert.assertEquals(14L, ((Long) ((Tuple) sink.collectedTuples.get(1)).getValue()).longValue());
                break;
            case DISCARDING:
                Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, sink.collectedTuples.size());
                Assert.assertEquals(9L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
                break;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
        operator.teardown();
    }

    /** Runs the shared {@code testTrigger} scenario with the DISCARDING accumulation mode. */
    @Test
    public void testTriggerWithDiscardingMode() {
        testTrigger(TriggerOption.AccumulationMode.DISCARDING);
    }

    /** Runs the shared {@code testTrigger} scenario with the ACCUMULATING accumulation mode. */
    @Test
    public void testTriggerWithAccumulatingMode() {
        testTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
    }

    /** Runs the shared {@code testTrigger} scenario with the ACCUMULATING_AND_RETRACTING accumulation mode. */
    @Test
    public void testTriggerWithAccumulatingAndRetractingMode() {
        testTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
    }

    /**
     * Runs {@code testTrigger2} without the firing-only-updated-panes option (first argument
     * false) and with accumulating panes (second argument false).
     */
    @Test
    public void testTriggerWithAccumulatingModeFiringAllPanes() {
        testTrigger2(false, false);
    }

    /**
     * Runs {@code testTrigger2} without the firing-only-updated-panes option (first argument
     * false) and with accumulating-and-retracting panes (second argument true).
     */
    @Test
    public void testTriggerWithAccumulatingAndRetractingModeFiringAllPanes() {
        testTrigger2(false, true);
    }

    /**
     * Runs {@code testTrigger2} with the firing-only-updated-panes option (first argument true)
     * and with accumulating panes (second argument false).
     */
    @Test
    public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes() {
        testTrigger2(true, false);
    }

    /**
     * Runs {@code testTrigger2} with the firing-only-updated-panes option (first argument true)
     * and with accumulating-and-retracting panes (second argument true).
     */
    @Test
    public void testTriggerWithAccumulatingAndRetractingModeFiringOnlyUpdatedPanes() {
        testTrigger2(true, true);
    }

    /**
     * Shared scenario checking what a repeated early time trigger (every 1000 ms) emits when no
     * new tuples arrive between two firings.
     *
     * <p>Tuples 2 and 3 are fed once and the first firing is expected to carry 5. No further
     * tuples are processed before the second firing, whose expected content is:
     * <ul>
     *   <li>nothing, when only updated panes are fired;</li>
     *   <li>a retraction of -5 followed by 5, when all panes are fired in retracting mode;</li>
     *   <li>a single 5, when all panes are fired in accumulating mode.</li>
     * </ul>
     *
     * @param z whether to enable {@code firingOnlyUpdatedPanes()} on the trigger
     * @param z2 true for accumulating-and-retracting fired panes, false for accumulating only
     */
    private void testTrigger2(boolean z, boolean z2) {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000L));
        if (!z2) {
            triggerOption.accumulatingFiredPanes();
        } else {
            triggerOption.accumulatingAndRetractingFiredPanes();
        }
        if (z) {
            triggerOption.firingOnlyUpdatedPanes();
        }
        operator.setTriggerOption(triggerOption);
        operator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        CollectorTestSink sink = new CollectorTestSink();
        operator.output.setSink(sink);
        operator.setup(this.testMeta.operatorContext);

        operator.beginWindow(1L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 100, 2L));
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 200, 3L));
        operator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        operator.beginWindow(2L);
        operator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        operator.beginWindow(3L);
        operator.endWindow();
        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, sink.collectedTuples.size());
        Assert.assertEquals(5L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
        sink.collectedTuples.clear();

        // From here on no tuples are fed, so the pane is never updated again.
        operator.beginWindow(4L);
        operator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        operator.beginWindow(5L);
        operator.endWindow();
        if (!z) {
            if (z2) {
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, sink.collectedTuples.size());
                Assert.assertEquals(-5L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
                Assert.assertEquals(5L, ((Long) ((Tuple) sink.collectedTuples.get(1)).getValue()).longValue());
            } else {
                Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, sink.collectedTuples.size());
                Assert.assertEquals(5L, ((Long) ((Tuple) sink.collectedTuples.get(0)).getValue()).longValue());
            }
        } else {
            Assert.assertTrue("There should not be any trigger since no panes have been updated", sink.collectedTuples.isEmpty());
        }
        operator.teardown();
    }

    /**
     * Checks that with the global window option a timestamped tuple is assigned to exactly one
     * window, namely {@code Window.GlobalWindow.INSTANCE}.
     */
    @Test
    public void testGlobalWindowAssignment() {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        operator.setWindowOption(new WindowOption.GlobalWindow());
        operator.setup(this.testMeta.operatorContext);

        Tuple.TimestampedTuple input = new Tuple.TimestampedTuple(BASE + 1100, 2L);
        Collection assigned = operator.getWindowedValue(input).getWindows();

        Assert.assertEquals(1L, assigned.size());
        Object onlyWindow = assigned.iterator().next();
        Assert.assertEquals(Window.GlobalWindow.INSTANCE, onlyWindow);
        operator.teardown();
    }

    /**
     * Checks that with one-second time windows a tuple stamped {@code BASE + 1100} is assigned to
     * exactly one window, starting at {@code BASE + 1000} and lasting 1000 ms.
     */
    @Test
    public void testTimeWindowAssignment() {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        operator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        operator.setup(this.testMeta.operatorContext);
        Collection windows = operator.getWindowedValue(new Tuple.TimestampedTuple(BASE + 1100, 2L)).getWindows();
        Assert.assertEquals(1L, windows.size());
        Window window = (Window) windows.iterator().next();
        Assert.assertEquals(BASE + 1000, window.getBeginTimestamp());
        Assert.assertEquals(1000L, window.getDurationMillis());
        // Release operator resources, matching the other window-assignment tests.
        operator.teardown();
    }

    /**
     * Checks that with sliding windows of 1000 ms advancing every 200 ms a tuple stamped
     * {@code BASE + 1600} is assigned to five windows of 1000 ms each, returned in order of
     * begin timestamp from {@code BASE + 800} to {@code BASE + 1600} in 200 ms steps.
     */
    @Test
    public void testSlidingWindowAssignment() {
        WindowedOperatorImpl<Long, MutableLong, Long> operator = createDefaultWindowedOperator();
        operator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000L), Duration.millis(200L)));
        operator.setup(this.testMeta.operatorContext);

        Tuple.TimestampedTuple input = new Tuple.TimestampedTuple(BASE + 1600, 2L);
        Window[] assigned = (Window[]) operator.getWindowedValue(input).getWindows().toArray(new Window[0]);
        Assert.assertEquals(5L, assigned.length);

        // Window i is expected to begin 200 ms after window i - 1, starting at BASE + 800.
        for (int i = 0; i < assigned.length; i++) {
            Assert.assertEquals(BASE + 800 + 200L * i, assigned[i].getBeginTimestamp());
            Assert.assertEquals(1000L, assigned[i].getDurationMillis());
        }
        operator.teardown();
    }

    /**
     * Exercises session windows with a 2000 ms gap on a keyed operator that fires after every
     * tuple in accumulating-and-retracting mode, all for key "a".
     *
     * <p>Four steps are checked: a first tuple opens a session; a second tuple within the gap
     * retracts the old pane and emits an extended session; a third, distant tuple opens a
     * separate session; a fourth tuple between the two retracts both and emits a single
     * session spanning them.
     */
    @Test
    public void testSessionWindows() {
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> operator = createDefaultKeyedWindowedOperator(true);
        operator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000L)));
        operator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1L).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
        CollectorTestSink sink = new CollectorTestSink();
        operator.output.setSink(sink);
        operator.setup(this.testMeta.operatorContext);
        operator.beginWindow(1L);

        // Step 1: the first tuple creates a session starting at its own timestamp.
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 1100, new KeyValPair("a", 2L)));
        Assert.assertEquals(1L, sink.getCount(false));
        Tuple.WindowedTuple firstPane = (Tuple.WindowedTuple) sink.collectedTuples.get(0);
        Assert.assertEquals(1L, firstPane.getWindows().size());
        Window.SessionWindow initialSession = (Window.SessionWindow) firstPane.getWindows().iterator().next();
        Assert.assertEquals(BASE + 1100, initialSession.getBeginTimestamp());
        Assert.assertEquals(2000L, initialSession.getDurationMillis());
        Assert.assertEquals("a", initialSession.getKey());
        KeyValPair firstValue = (KeyValPair) firstPane.getValue();
        Assert.assertEquals("a", firstValue.getKey());
        Assert.assertEquals(2L, ((Long) firstValue.getValue()).longValue());
        sink.clear();

        // Step 2: a tuple 900 ms later extends the session; the old pane is retracted first.
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 2000, new KeyValPair("a", 3L)));
        Assert.assertEquals(2L, sink.getCount(false));
        Tuple.WindowedTuple retraction = (Tuple.WindowedTuple) sink.collectedTuples.get(0);
        Assert.assertEquals(1L, retraction.getWindows().size());
        Assert.assertEquals(initialSession, retraction.getWindows().iterator().next());
        KeyValPair retractedValue = (KeyValPair) retraction.getValue();
        Assert.assertEquals("a", retractedValue.getKey());
        Assert.assertEquals(-2L, ((Long) retractedValue.getValue()).longValue());
        Tuple.WindowedTuple extendedPane = (Tuple.WindowedTuple) sink.collectedTuples.get(1);
        Window.SessionWindow extendedSession = (Window.SessionWindow) extendedPane.getWindows().iterator().next();
        Assert.assertEquals(BASE + 1100, extendedSession.getBeginTimestamp());
        Assert.assertEquals(2900L, extendedSession.getDurationMillis());
        KeyValPair extendedValue = (KeyValPair) extendedPane.getValue();
        Assert.assertEquals("a", extendedValue.getKey());
        Assert.assertEquals(5L, ((Long) extendedValue.getValue()).longValue());
        sink.clear();

        // Step 3: a tuple at BASE + 5000 ends up in a session of its own.
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 5000, new KeyValPair("a", 4L)));
        Assert.assertEquals(1L, sink.getCount(false));
        Tuple.WindowedTuple separatePane = (Tuple.WindowedTuple) sink.collectedTuples.get(0);
        Assert.assertEquals(1L, separatePane.getWindows().size());
        Window.SessionWindow separateSession = (Window.SessionWindow) separatePane.getWindows().iterator().next();
        Assert.assertEquals(BASE + 5000, separateSession.getBeginTimestamp());
        Assert.assertEquals(2000L, separateSession.getDurationMillis());
        KeyValPair separateValue = (KeyValPair) separatePane.getValue();
        Assert.assertEquals("a", separateValue.getKey());
        Assert.assertEquals(4L, ((Long) separateValue.getValue()).longValue());
        sink.clear();

        // Step 4: a tuple at BASE + 3500 joins both sessions into one.
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 3500, new KeyValPair("a", 3L)));
        Assert.assertEquals(3L, sink.getCount(false));
        // The first two tuples are the retractions; sort them by window so that the checks
        // below do not depend on the order in which they were emitted.
        TreeMap retractionsByWindow = new TreeMap();
        Tuple.WindowedTuple retractionA = (Tuple.WindowedTuple) sink.collectedTuples.get(0);
        Assert.assertEquals(1L, retractionA.getWindows().size());
        retractionsByWindow.put(retractionA.getWindows().iterator().next(), retractionA.getValue());
        Tuple.WindowedTuple retractionB = (Tuple.WindowedTuple) sink.collectedTuples.get(1);
        Assert.assertEquals(1L, retractionB.getWindows().size());
        retractionsByWindow.put(retractionB.getWindows().iterator().next(), retractionB.getValue());
        Iterator sortedRetractions = retractionsByWindow.entrySet().iterator();
        Map.Entry earlier = (Map.Entry) sortedRetractions.next();
        Assert.assertEquals(extendedSession, earlier.getKey());
        Assert.assertEquals("a", ((KeyValPair) earlier.getValue()).getKey());
        Assert.assertEquals(-5L, ((Long) ((KeyValPair) earlier.getValue()).getValue()).longValue());
        Map.Entry later = (Map.Entry) sortedRetractions.next();
        Assert.assertEquals(separateSession, later.getKey());
        Assert.assertEquals("a", ((KeyValPair) later.getValue()).getKey());
        Assert.assertEquals(-4L, ((Long) ((KeyValPair) later.getValue()).getValue()).longValue());
        // The third tuple carries the combined session and the total of all four inputs.
        Tuple.WindowedTuple mergedPane = (Tuple.WindowedTuple) sink.collectedTuples.get(2);
        Assert.assertEquals(1L, mergedPane.getWindows().size());
        Window.SessionWindow mergedSession = (Window.SessionWindow) mergedPane.getWindows().iterator().next();
        Assert.assertEquals(BASE + 1100, mergedSession.getBeginTimestamp());
        Assert.assertEquals(5900L, mergedSession.getDurationMillis());
        KeyValPair mergedValue = (KeyValPair) mergedPane.getValue();
        Assert.assertEquals("a", mergedValue.getKey());
        Assert.assertEquals(12L, ((Long) mergedValue.getValue()).longValue());

        operator.endWindow();
        operator.teardown();
    }

    /**
     * Feeds four keyed tuples that all fall into the one-second time window starting at
     * {@code BASE} and checks that a single window is stored with per-key sums of 5 for "a"
     * and 9 for "b".
     */
    @Test
    public void testKeyedAccumulation() {
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> operator = createDefaultKeyedWindowedOperator(false);
        operator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        operator.setup(this.testMeta.operatorContext);

        operator.beginWindow(1L);
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 100, new KeyValPair("a", 2L)));
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 200, new KeyValPair("a", 3L)));
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 300, new KeyValPair("b", 4L)));
        operator.processTuple(new Tuple.TimestampedTuple(BASE + 150, new KeyValPair("b", 5L)));
        operator.endWindow();

        Assert.assertEquals(1L, this.keyedDataStorage.size());
        MutableLong sumForA = (MutableLong) this.keyedDataStorage.get(new Window.TimeWindow(BASE, 1000L), "a");
        Assert.assertEquals(5L, sumForA.longValue());
        MutableLong sumForB = (MutableLong) this.keyedDataStorage.get(new Window.TimeWindow(BASE, 1000L), "b");
        Assert.assertEquals(9L, sumForB.longValue());
        operator.teardown();
    }

    /**
     * Exercises the keyed windowed operator with an early-firing trigger configured via
     * {@code withEarlyFiringsAtEvery(Duration.millis(1000L))} and the given accumulation mode.
     *
     * <p>Scenario, as asserted below:
     * <ol>
     *   <li>Streaming window 1 receives a=2, b=3, b=5, a=4; nothing is emitted after streaming
     *       windows 1 and 2.</li>
     *   <li>After streaming window 3, exactly two tuples are emitted: a=6 and b=8.</li>
     *   <li>Streaming window 4 adds a=8 (nothing emitted), streaming window 5 adds b=9; the
     *       output after window 5 depends on the mode:
     *       <ul>
     *         <li>ACCUMULATING: a=14, b=17</li>
     *         <li>ACCUMULATING_AND_RETRACTING: retractions a=-6, b=-8 plus a=14, b=17</li>
     *         <li>DISCARDING: a=8, b=9</li>
     *       </ul>
     *   </li>
     * </ol>
     *
     * @param accumulationMode the accumulation mode to apply to the trigger option
     * @throws RuntimeException if {@code accumulationMode} is not one of the three handled modes
     */
    private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode) {
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false);
        TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000L));
        // Switch on the enum constant itself rather than an ordinal-indexed lookup table.
        switch (accumulationMode) {
            case ACCUMULATING:
                triggerOption.accumulatingFiredPanes();
                break;
            case ACCUMULATING_AND_RETRACTING:
                triggerOption.accumulatingAndRetractingFiredPanes();
                break;
            case DISCARDING:
                triggerOption.discardingFiredPanes();
                break;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
        windowedOperator.setTriggerOption(triggerOption);
        windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        CollectorTestSink sink = new CollectorTestSink();
        windowedOperator.output.setSink(sink);
        windowedOperator.setup(this.testMeta.operatorContext);

        // Streaming window 1: initial data for both keys, no output expected yet.
        windowedOperator.beginWindow(1L);
        windowedOperator.processTuple(new Tuple.TimestampedTuple(BASE + 100, new KeyValPair("a", 2L)));
        windowedOperator.processTuple(new Tuple.TimestampedTuple(BASE + 200, new KeyValPair("b", 3L)));
        windowedOperator.processTuple(new Tuple.TimestampedTuple(BASE + 400, new KeyValPair("b", 5L)));
        windowedOperator.processTuple(new Tuple.TimestampedTuple(BASE + 300, new KeyValPair("a", 4L)));
        windowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        // Streaming window 2: still no output expected.
        windowedOperator.beginWindow(2L);
        windowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        // Streaming window 3: the first firing is expected, one tuple per key.
        windowedOperator.beginWindow(3L);
        windowedOperator.endWindow();
        Assert.assertEquals("There should be exactly two tuple for the time trigger", 2L, sink.collectedTuples.size());
        Map<String, Long> firstFiring = collectValuesByKey(sink);
        Assert.assertEquals(6L, firstFiring.get("a").longValue());
        Assert.assertEquals(8L, firstFiring.get("b").longValue());
        sink.collectedTuples.clear();

        // Streaming window 4: more data for "a", no output expected.
        windowedOperator.beginWindow(4L);
        windowedOperator.processTuple(new Tuple.TimestampedTuple(BASE + 400, new KeyValPair("a", 8L)));
        windowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());

        // Streaming window 5: more data for "b"; the second firing is checked per mode below.
        windowedOperator.beginWindow(5L);
        windowedOperator.processTuple(new Tuple.TimestampedTuple(BASE + 300, new KeyValPair("b", 9L)));
        windowedOperator.endWindow();

        switch (accumulationMode) {
            case ACCUMULATING: {
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, sink.collectedTuples.size());
                Map<String, Long> secondFiring = collectValuesByKey(sink);
                Assert.assertEquals(14L, secondFiring.get("a").longValue());
                Assert.assertEquals(17L, secondFiring.get("b").longValue());
                break;
            }
            case ACCUMULATING_AND_RETRACTING: {
                Assert.assertEquals("There should be exactly four tuples for the time trigger", 4L, sink.collectedTuples.size());
                // Negative values are stored under an "R"-prefixed key so that they do not
                // collide with the positive value emitted for the same key.
                Map<String, Long> secondFiring = new HashMap<>();
                for (Object emitted : sink.collectedTuples) {
                    KeyValPair pair = (KeyValPair) ((Tuple) emitted).getValue();
                    String key = (String) pair.getKey();
                    long value = ((Long) pair.getValue()).longValue();
                    secondFiring.put(value < 0 ? "R" + key : key, Long.valueOf(value));
                }
                Assert.assertEquals(-6L, secondFiring.get("Ra").longValue());
                Assert.assertEquals(-8L, secondFiring.get("Rb").longValue());
                Assert.assertEquals(14L, secondFiring.get("a").longValue());
                Assert.assertEquals(17L, secondFiring.get("b").longValue());
                break;
            }
            case DISCARDING: {
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, sink.collectedTuples.size());
                Map<String, Long> secondFiring = collectValuesByKey(sink);
                Assert.assertEquals(8L, secondFiring.get("a").longValue());
                Assert.assertEquals(9L, secondFiring.get("b").longValue());
                break;
            }
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
        windowedOperator.teardown();
    }

    /**
     * Indexes the key/value payloads of all tuples currently held by the sink by their key.
     * If several tuples share a key, the last one collected wins.
     */
    private static Map<String, Long> collectValuesByKey(CollectorTestSink sink) {
        Map<String, Long> valuesByKey = new HashMap<>();
        // The sink is used as a raw type, so its elements are read as Object and cast.
        for (Object emitted : sink.collectedTuples) {
            KeyValPair pair = (KeyValPair) ((Tuple) emitted).getValue();
            valuesByKey.put((String) pair.getKey(), (Long) pair.getValue());
        }
        return valuesByKey;
    }

    /** Runs the shared keyed-trigger scenario with the DISCARDING accumulation mode. */
    @Test
    public void testKeyedTriggerWithDiscardingMode() {
        final TriggerOption.AccumulationMode mode = TriggerOption.AccumulationMode.DISCARDING;
        testKeyedTrigger(mode);
    }

    /** Runs the shared keyed-trigger scenario with the ACCUMULATING accumulation mode. */
    @Test
    public void testKeyedTriggerWithAccumulatingMode() {
        final TriggerOption.AccumulationMode mode = TriggerOption.AccumulationMode.ACCUMULATING;
        testKeyedTrigger(mode);
    }

    /** Runs the shared keyed-trigger scenario with the ACCUMULATING_AND_RETRACTING accumulation mode. */
    @Test
    public void testKeyedTriggerWithAccumulatingAndRetractingMode() {
        final TriggerOption.AccumulationMode mode = TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING;
        testKeyedTrigger(mode);
    }
}
