package org.apache.apex.malhar.lib.window;

import com.datatorrent.api.Attribute;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.validation.ValidationException;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
import org.apache.apex.malhar.lib.window.impl.WatermarkImpl;
import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
import org.apache.commons.lang3.mutable.MutableLong;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/lib/window/WindowedOperatorTest.class */
public class WindowedOperatorTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.apex.malhar.lib.window.WindowedOperatorTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/apex/malhar/lib/window/WindowedOperatorTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode = new int[TriggerOption.AccumulationMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[TriggerOption.AccumulationMode.ACCUMULATING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[TriggerOption.AccumulationMode.DISCARDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    private void verifyValidationFailure(WindowedOperatorImpl windowedOperatorImpl, String str) {
        try {
            windowedOperatorImpl.validate();
            Assert.fail("Should fail validation because " + str);
        } catch (ValidationException e) {
        }
    }

    private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator() {
        WindowedOperatorImpl<Long, MutableLong, Long> windowedOperatorImpl = new WindowedOperatorImpl<>();
        windowedOperatorImpl.setDataStorage(new InMemoryWindowedStorage());
        windowedOperatorImpl.setRetractionStorage(new InMemoryWindowedStorage());
        windowedOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        windowedOperatorImpl.setAccumulation(new SumAccumulation());
        return windowedOperatorImpl;
    }

    private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator() {
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> keyedWindowedOperatorImpl = new KeyedWindowedOperatorImpl<>();
        keyedWindowedOperatorImpl.setDataStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedOperatorImpl.setRetractionStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        keyedWindowedOperatorImpl.setAccumulation(new SumAccumulation());
        return keyedWindowedOperatorImpl;
    }

    @Test
    public void testValidation() throws Exception {
        WindowedOperatorImpl windowedOperatorImpl = new WindowedOperatorImpl();
        verifyValidationFailure(windowedOperatorImpl, "nothing is configured");
        windowedOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        verifyValidationFailure(windowedOperatorImpl, "data storage is not set");
        windowedOperatorImpl.setDataStorage(new InMemoryWindowedStorage());
        verifyValidationFailure(windowedOperatorImpl, "accumulation is not set");
        windowedOperatorImpl.setAccumulation(new SumAccumulation());
        windowedOperatorImpl.validate();
        windowedOperatorImpl.setTriggerOption(new TriggerOption().accumulatingAndRetractingFiredPanes());
        verifyValidationFailure(windowedOperatorImpl, "retracting storage is not set for ACCUMULATING_AND_RETRACTING");
        windowedOperatorImpl.setRetractionStorage(new InMemoryWindowedStorage());
        windowedOperatorImpl.validate();
        windowedOperatorImpl.setTriggerOption(new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes());
        verifyValidationFailure(windowedOperatorImpl, "DISCARDING is not valid for option firingOnlyUpdatedPanes");
        windowedOperatorImpl.setTriggerOption(new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes());
        windowedOperatorImpl.setRetractionStorage((WindowedStorage) null);
        verifyValidationFailure(windowedOperatorImpl, "retracting storage is not set for option firingOnlyUpdatedPanes");
    }

    @Test
    public void testWatermarkAndAllowedLateness() {
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap());
        WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator = createDefaultWindowedOperator();
        createDefaultWindowedOperator.controlOutput.setSink(new CollectorTestSink());
        createDefaultWindowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        createDefaultWindowedOperator.setAllowedLateness(Duration.millis(1000L));
        InMemoryWindowedStorage inMemoryWindowedStorage = new InMemoryWindowedStorage();
        InMemoryWindowedStorage inMemoryWindowedStorage2 = new InMemoryWindowedStorage();
        createDefaultWindowedOperator.setDataStorage(inMemoryWindowedStorage);
        createDefaultWindowedOperator.setWindowStateStorage(inMemoryWindowedStorage2);
        createDefaultWindowedOperator.setup(testIdOperatorContext);
        createDefaultWindowedOperator.beginWindow(1L);
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(100L, 2L));
        Assert.assertEquals("There should be exactly one window in the storage", 1L, inMemoryWindowedStorage.size());
        Assert.assertEquals("There should be exactly one window in the storage", 1L, inMemoryWindowedStorage2.size());
        Map.Entry entry = (Map.Entry) inMemoryWindowedStorage2.entrySet().iterator().next();
        Window window = (Window) entry.getKey();
        WindowState windowState = (WindowState) entry.getValue();
        Assert.assertEquals(-1L, windowState.watermarkArrivalTime);
        Assert.assertEquals(2L, ((MutableLong) inMemoryWindowedStorage.get(window)).longValue());
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(200L, 3L));
        Assert.assertEquals(5L, ((MutableLong) inMemoryWindowedStorage.get(window)).longValue());
        createDefaultWindowedOperator.processWatermark(new WatermarkImpl(1200L));
        createDefaultWindowedOperator.endWindow();
        Assert.assertTrue(windowState.watermarkArrivalTime > 0);
        Assert.assertEquals("We should get one watermark tuple", 1L, r0.getCount(false));
        createDefaultWindowedOperator.beginWindow(2L);
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(900L, 4L));
        Assert.assertEquals("Late but not too late", 9L, ((MutableLong) inMemoryWindowedStorage.get(window)).longValue());
        createDefaultWindowedOperator.processWatermark(new WatermarkImpl(3000L));
        createDefaultWindowedOperator.endWindow();
        Assert.assertEquals("We should get two watermark tuples", 2L, r0.getCount(false));
        createDefaultWindowedOperator.beginWindow(3L);
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(120L, 5L));
        Assert.assertEquals("The window should be dropped because it's too late", 0L, inMemoryWindowedStorage.size());
        Assert.assertEquals("The window should be dropped because it's too late", 0L, inMemoryWindowedStorage2.size());
        createDefaultWindowedOperator.endWindow();
    }

    private void testTrigger(TriggerOption.AccumulationMode accumulationMode) {
        WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator = createDefaultWindowedOperator();
        TriggerOption withEarlyFiringsAtEvery = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000L));
        switch (AnonymousClass1.$SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[accumulationMode.ordinal()]) {
            case 1:
                withEarlyFiringsAtEvery.accumulatingFiredPanes();
                break;
            case 2:
                withEarlyFiringsAtEvery.accumulatingAndRetractingFiredPanes();
                break;
            case 3:
                withEarlyFiringsAtEvery.discardingFiredPanes();
                break;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
        createDefaultWindowedOperator.setTriggerOption(withEarlyFiringsAtEvery);
        createDefaultWindowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        createDefaultWindowedOperator.output.setSink(collectorTestSink);
        createDefaultWindowedOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
        createDefaultWindowedOperator.beginWindow(1L);
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(100L, 2L));
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(200L, 3L));
        createDefaultWindowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
        createDefaultWindowedOperator.beginWindow(2L);
        createDefaultWindowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
        createDefaultWindowedOperator.beginWindow(3L);
        createDefaultWindowedOperator.endWindow();
        Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, collectorTestSink.collectedTuples.size());
        Assert.assertEquals(5L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(0)).getValue()).longValue());
        collectorTestSink.collectedTuples.clear();
        createDefaultWindowedOperator.beginWindow(4L);
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(400L, 4L));
        createDefaultWindowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
        createDefaultWindowedOperator.beginWindow(5L);
        createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(300L, 5L));
        createDefaultWindowedOperator.endWindow();
        switch (AnonymousClass1.$SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[accumulationMode.ordinal()]) {
            case 1:
                Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, collectorTestSink.collectedTuples.size());
                Assert.assertEquals(14L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(0)).getValue()).longValue());
                return;
            case 2:
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, collectorTestSink.collectedTuples.size());
                Assert.assertEquals(-5L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(0)).getValue()).longValue());
                Assert.assertEquals(14L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(1)).getValue()).longValue());
                return;
            case 3:
                Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, collectorTestSink.collectedTuples.size());
                Assert.assertEquals(9L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(0)).getValue()).longValue());
                return;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
    }

    @Test
    public void testTriggerWithDiscardingMode() {
        testTrigger(TriggerOption.AccumulationMode.DISCARDING);
    }

    @Test
    public void testTriggerWithAccumulatingMode() {
        testTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
    }

    @Test
    public void testTriggerWithAccumulatingAndRetractingMode() {
        testTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
    }

    @Test
    public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes() {
        for (boolean z : new boolean[]{true, false}) {
            WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator = createDefaultWindowedOperator();
            TriggerOption accumulatingFiredPanes = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000L)).accumulatingFiredPanes();
            if (z) {
                accumulatingFiredPanes.firingOnlyUpdatedPanes();
            }
            createDefaultWindowedOperator.setTriggerOption(accumulatingFiredPanes);
            createDefaultWindowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
            CollectorTestSink collectorTestSink = new CollectorTestSink();
            createDefaultWindowedOperator.output.setSink(collectorTestSink);
            createDefaultWindowedOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
            createDefaultWindowedOperator.beginWindow(1L);
            createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(100L, 2L));
            createDefaultWindowedOperator.processTuple(new Tuple.TimestampedTuple(200L, 3L));
            createDefaultWindowedOperator.endWindow();
            Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
            createDefaultWindowedOperator.beginWindow(2L);
            createDefaultWindowedOperator.endWindow();
            Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
            createDefaultWindowedOperator.beginWindow(3L);
            createDefaultWindowedOperator.endWindow();
            Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, collectorTestSink.collectedTuples.size());
            Assert.assertEquals(5L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(0)).getValue()).longValue());
            collectorTestSink.collectedTuples.clear();
            createDefaultWindowedOperator.beginWindow(4L);
            createDefaultWindowedOperator.endWindow();
            Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
            createDefaultWindowedOperator.beginWindow(5L);
            createDefaultWindowedOperator.endWindow();
            if (z) {
                Assert.assertTrue("There should not be any trigger since no panes have been updated", collectorTestSink.collectedTuples.isEmpty());
            } else {
                Assert.assertEquals("There should be exactly one tuple for the time trigger", 1L, collectorTestSink.collectedTuples.size());
                Assert.assertEquals(5L, ((Long) ((Tuple) collectorTestSink.collectedTuples.get(0)).getValue()).longValue());
            }
        }
    }

    @Test
    public void testGlobalWindowAssignment() {
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap());
        WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator = createDefaultWindowedOperator();
        createDefaultWindowedOperator.setWindowOption(new WindowOption.GlobalWindow());
        createDefaultWindowedOperator.setup(testIdOperatorContext);
        List windows = createDefaultWindowedOperator.getWindowedValue(new Tuple.TimestampedTuple(1100L, 2L)).getWindows();
        Assert.assertEquals(1L, windows.size());
        Assert.assertEquals(Window.GLOBAL_WINDOW, windows.get(0));
    }

    @Test
    public void testTimeWindowAssignment() {
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap());
        WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator = createDefaultWindowedOperator();
        createDefaultWindowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        createDefaultWindowedOperator.setup(testIdOperatorContext);
        List windows = createDefaultWindowedOperator.getWindowedValue(new Tuple.TimestampedTuple(1100L, 2L)).getWindows();
        Assert.assertEquals(1L, windows.size());
        Assert.assertEquals(1000L, ((Window) windows.get(0)).getBeginTimestamp());
        Assert.assertEquals(1000L, ((Window) windows.get(0)).getDurationMillis());
    }

    @Test
    public void testSlidingWindowAssignment() {
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap());
        WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator = createDefaultWindowedOperator();
        createDefaultWindowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000L), Duration.millis(200L)));
        createDefaultWindowedOperator.setup(testIdOperatorContext);
        Window[] windowArr = (Window[]) createDefaultWindowedOperator.getWindowedValue(new Tuple.TimestampedTuple(1600L, 2L)).getWindows().toArray(new Window[0]);
        Arrays.sort(windowArr, Window.DEFAULT_COMPARATOR);
        Assert.assertEquals(5L, windowArr.length);
        Assert.assertEquals(800L, windowArr[0].getBeginTimestamp());
        Assert.assertEquals(1000L, windowArr[0].getDurationMillis());
        Assert.assertEquals(1000L, windowArr[1].getBeginTimestamp());
        Assert.assertEquals(1000L, windowArr[1].getDurationMillis());
        Assert.assertEquals(1200L, windowArr[2].getBeginTimestamp());
        Assert.assertEquals(1000L, windowArr[2].getDurationMillis());
        Assert.assertEquals(1400L, windowArr[3].getBeginTimestamp());
        Assert.assertEquals(1000L, windowArr[3].getDurationMillis());
        Assert.assertEquals(1600L, windowArr[4].getBeginTimestamp());
        Assert.assertEquals(1000L, windowArr[4].getDurationMillis());
    }

    @Test
    public void testSessionWindows() {
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap());
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator = createDefaultKeyedWindowedOperator();
        createDefaultKeyedWindowedOperator.setWindowOption(new WindowOption.SessionWindows(Duration.millis(2000L)));
        createDefaultKeyedWindowedOperator.setTriggerOption(new TriggerOption().withEarlyFiringsAtEvery(1L).accumulatingAndRetractingFiredPanes().firingOnlyUpdatedPanes());
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        createDefaultKeyedWindowedOperator.output.setSink(collectorTestSink);
        createDefaultKeyedWindowedOperator.setup(testIdOperatorContext);
        createDefaultKeyedWindowedOperator.beginWindow(1L);
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(1100L, new KeyValPair("a", 2L)));
        Assert.assertEquals(1L, collectorTestSink.getCount(false));
        Tuple.WindowedTuple windowedTuple = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(0);
        Assert.assertEquals(1L, windowedTuple.getWindows().size());
        Window.SessionWindow sessionWindow = (Window.SessionWindow) windowedTuple.getWindows().get(0);
        Assert.assertEquals(1100L, sessionWindow.getBeginTimestamp());
        Assert.assertEquals(1L, sessionWindow.getDurationMillis());
        Assert.assertEquals("a", sessionWindow.getKey());
        Assert.assertEquals("a", ((KeyValPair) windowedTuple.getValue()).getKey());
        Assert.assertEquals(2L, ((Long) ((KeyValPair) windowedTuple.getValue()).getValue()).longValue());
        collectorTestSink.clear();
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(2000L, new KeyValPair("a", 3L)));
        Assert.assertEquals(2L, collectorTestSink.getCount(false));
        Tuple.WindowedTuple windowedTuple2 = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(0);
        Assert.assertEquals(1L, windowedTuple2.getWindows().size());
        Assert.assertEquals(sessionWindow, windowedTuple2.getWindows().get(0));
        Assert.assertEquals("a", ((KeyValPair) windowedTuple2.getValue()).getKey());
        Assert.assertEquals(-2L, ((Long) ((KeyValPair) windowedTuple2.getValue()).getValue()).longValue());
        Tuple.WindowedTuple windowedTuple3 = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(1);
        Window.SessionWindow sessionWindow2 = (Window.SessionWindow) windowedTuple3.getWindows().get(0);
        Assert.assertEquals(1100L, sessionWindow2.getBeginTimestamp());
        Assert.assertEquals(901L, sessionWindow2.getDurationMillis());
        Assert.assertEquals("a", ((KeyValPair) windowedTuple3.getValue()).getKey());
        Assert.assertEquals(5L, ((Long) ((KeyValPair) windowedTuple3.getValue()).getValue()).longValue());
        collectorTestSink.clear();
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(5000L, new KeyValPair("a", 4L)));
        Assert.assertEquals(1L, collectorTestSink.getCount(false));
        Tuple.WindowedTuple windowedTuple4 = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(0);
        Assert.assertEquals(1L, windowedTuple4.getWindows().size());
        Window.SessionWindow sessionWindow3 = (Window.SessionWindow) windowedTuple4.getWindows().get(0);
        Assert.assertEquals(5000L, sessionWindow3.getBeginTimestamp());
        Assert.assertEquals(1L, sessionWindow3.getDurationMillis());
        Assert.assertEquals("a", ((KeyValPair) windowedTuple4.getValue()).getKey());
        Assert.assertEquals(4L, ((Long) ((KeyValPair) windowedTuple4.getValue()).getValue()).longValue());
        collectorTestSink.clear();
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(3500L, new KeyValPair("a", 3L)));
        Assert.assertEquals(3L, collectorTestSink.getCount(false));
        Tuple.WindowedTuple windowedTuple5 = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(0);
        Assert.assertEquals(1L, windowedTuple5.getWindows().size());
        Assert.assertEquals(sessionWindow2, windowedTuple5.getWindows().get(0));
        Assert.assertEquals("a", ((KeyValPair) windowedTuple5.getValue()).getKey());
        Assert.assertEquals(-5L, ((Long) ((KeyValPair) windowedTuple5.getValue()).getValue()).longValue());
        Tuple.WindowedTuple windowedTuple6 = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(1);
        Assert.assertEquals(1L, windowedTuple6.getWindows().size());
        Assert.assertEquals(sessionWindow3, windowedTuple6.getWindows().get(0));
        Assert.assertEquals("a", ((KeyValPair) windowedTuple6.getValue()).getKey());
        Assert.assertEquals(-4L, ((Long) ((KeyValPair) windowedTuple6.getValue()).getValue()).longValue());
        Tuple.WindowedTuple windowedTuple7 = (Tuple.WindowedTuple) collectorTestSink.collectedTuples.get(2);
        Assert.assertEquals(1L, windowedTuple7.getWindows().size());
        Window.SessionWindow sessionWindow4 = (Window.SessionWindow) windowedTuple7.getWindows().get(0);
        Assert.assertEquals(1100L, sessionWindow4.getBeginTimestamp());
        Assert.assertEquals(3901L, sessionWindow4.getDurationMillis());
        Assert.assertEquals("a", ((KeyValPair) windowedTuple7.getValue()).getKey());
        Assert.assertEquals(12L, ((Long) ((KeyValPair) windowedTuple7.getValue()).getValue()).longValue());
        createDefaultKeyedWindowedOperator.endWindow();
    }

    @Test
    public void testKeyedAccumulation() {
        OperatorContextTestHelper.TestIdOperatorContext testIdOperatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap());
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator = createDefaultKeyedWindowedOperator();
        createDefaultKeyedWindowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        InMemoryWindowedKeyedStorage inMemoryWindowedKeyedStorage = new InMemoryWindowedKeyedStorage();
        createDefaultKeyedWindowedOperator.setDataStorage(inMemoryWindowedKeyedStorage);
        createDefaultKeyedWindowedOperator.setup(testIdOperatorContext);
        createDefaultKeyedWindowedOperator.beginWindow(1L);
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(100L, new KeyValPair("a", 2L)));
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(200L, new KeyValPair("a", 3L)));
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(300L, new KeyValPair("b", 4L)));
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(150L, new KeyValPair("b", 5L)));
        createDefaultKeyedWindowedOperator.endWindow();
        Assert.assertEquals(1L, inMemoryWindowedKeyedStorage.size());
        Assert.assertEquals(5L, ((MutableLong) inMemoryWindowedKeyedStorage.get(new Window.TimeWindow(0L, 1000L), "a")).longValue());
        Assert.assertEquals(9L, ((MutableLong) inMemoryWindowedKeyedStorage.get(new Window.TimeWindow(0L, 1000L), "b")).longValue());
    }

    private void testKeyedTrigger(TriggerOption.AccumulationMode accumulationMode) {
        KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator = createDefaultKeyedWindowedOperator();
        TriggerOption withEarlyFiringsAtEvery = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000L));
        switch (AnonymousClass1.$SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[accumulationMode.ordinal()]) {
            case 1:
                withEarlyFiringsAtEvery.accumulatingFiredPanes();
                break;
            case 2:
                withEarlyFiringsAtEvery.accumulatingAndRetractingFiredPanes();
                break;
            case 3:
                withEarlyFiringsAtEvery.discardingFiredPanes();
                break;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
        createDefaultKeyedWindowedOperator.setTriggerOption(withEarlyFiringsAtEvery);
        createDefaultKeyedWindowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000L)));
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        createDefaultKeyedWindowedOperator.output.setSink(collectorTestSink);
        createDefaultKeyedWindowedOperator.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
        createDefaultKeyedWindowedOperator.beginWindow(1L);
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(100L, new KeyValPair("a", 2L)));
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(200L, new KeyValPair("b", 3L)));
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(400L, new KeyValPair("b", 5L)));
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(300L, new KeyValPair("a", 4L)));
        createDefaultKeyedWindowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
        createDefaultKeyedWindowedOperator.beginWindow(2L);
        createDefaultKeyedWindowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
        createDefaultKeyedWindowedOperator.beginWindow(3L);
        createDefaultKeyedWindowedOperator.endWindow();
        Assert.assertEquals("There should be exactly two tuple for the time trigger", 2L, collectorTestSink.collectedTuples.size());
        HashMap hashMap = new HashMap();
        for (Tuple tuple : collectorTestSink.collectedTuples) {
            hashMap.put(((KeyValPair) tuple.getValue()).getKey(), ((KeyValPair) tuple.getValue()).getValue());
        }
        Assert.assertEquals(6L, ((Long) hashMap.get("a")).longValue());
        Assert.assertEquals(8L, ((Long) hashMap.get("b")).longValue());
        collectorTestSink.collectedTuples.clear();
        createDefaultKeyedWindowedOperator.beginWindow(4L);
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(400L, new KeyValPair("a", 8L)));
        createDefaultKeyedWindowedOperator.endWindow();
        Assert.assertTrue("No trigger should be fired yet", collectorTestSink.collectedTuples.isEmpty());
        createDefaultKeyedWindowedOperator.beginWindow(5L);
        createDefaultKeyedWindowedOperator.processTuple(new Tuple.TimestampedTuple(300L, new KeyValPair("b", 9L)));
        createDefaultKeyedWindowedOperator.endWindow();
        HashMap hashMap2 = new HashMap();
        switch (AnonymousClass1.$SwitchMap$org$apache$apex$malhar$lib$window$TriggerOption$AccumulationMode[accumulationMode.ordinal()]) {
            case 1:
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, collectorTestSink.collectedTuples.size());
                for (Tuple tuple2 : collectorTestSink.collectedTuples) {
                    hashMap2.put(((KeyValPair) tuple2.getValue()).getKey(), ((KeyValPair) tuple2.getValue()).getValue());
                }
                Assert.assertEquals(14L, ((Long) hashMap2.get("a")).longValue());
                Assert.assertEquals(17L, ((Long) hashMap2.get("b")).longValue());
                return;
            case 2:
                Assert.assertEquals("There should be exactly four tuples for the time trigger", 4L, collectorTestSink.collectedTuples.size());
                for (Tuple tuple3 : collectorTestSink.collectedTuples) {
                    String str = (String) ((KeyValPair) tuple3.getValue()).getKey();
                    long longValue = ((Long) ((KeyValPair) tuple3.getValue()).getValue()).longValue();
                    hashMap2.put(longValue < 0 ? "R" + str : str, Long.valueOf(longValue));
                }
                Assert.assertEquals(-6L, ((Long) hashMap2.get("Ra")).longValue());
                Assert.assertEquals(-8L, ((Long) hashMap2.get("Rb")).longValue());
                Assert.assertEquals(14L, ((Long) hashMap2.get("a")).longValue());
                Assert.assertEquals(17L, ((Long) hashMap2.get("b")).longValue());
                return;
            case 3:
                Assert.assertEquals("There should be exactly two tuples for the time trigger", 2L, collectorTestSink.collectedTuples.size());
                for (Tuple tuple4 : collectorTestSink.collectedTuples) {
                    hashMap2.put(((KeyValPair) tuple4.getValue()).getKey(), ((KeyValPair) tuple4.getValue()).getValue());
                }
                Assert.assertEquals(8L, ((Long) hashMap2.get("a")).longValue());
                Assert.assertEquals(9L, ((Long) hashMap2.get("b")).longValue());
                return;
            default:
                throw new RuntimeException("Unknown accumulation mode: " + accumulationMode);
        }
    }

    @Test
    public void testKeyedTriggerWithDiscardingMode() {
        testKeyedTrigger(TriggerOption.AccumulationMode.DISCARDING);
    }

    @Test
    public void testKeyedTriggerWithAccumulatingMode() {
        testKeyedTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
    }

    @Test
    public void testKeyedTriggerWithAccumulatingAndRetractingMode() {
        testKeyedTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
    }
}
