package org.apache.apex.malhar.lib.window.impl;

import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.KeyValPair;
import com.google.common.base.Function;
import java.util.List;
import java.util.Set;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.accumulation.CoGroup;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/apex/malhar/lib/window/impl/WindowedMergeOperatorTest.class */
public class WindowedMergeOperatorTest {
    @Test
    public void extractTimestampTest() {
        WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator = createDefaultWindowedMergeOperator();
        Function<Integer, Long> function = new Function<Integer, Long>() { // from class: org.apache.apex.malhar.lib.window.impl.WindowedMergeOperatorTest.1
            public Long apply(Integer num) {
                return Long.valueOf(num.intValue() * 10);
            }
        };
        Assert.assertEquals(1000L, createDefaultWindowedMergeOperator.extractTimestamp(new Tuple.PlainTuple(100), function));
        Assert.assertEquals(2000L, createDefaultWindowedMergeOperator.extractTimestamp(new Tuple.PlainTuple(200), function));
        Assert.assertEquals(200L, createDefaultWindowedMergeOperator.extractTimestamp(new Tuple.TimestampedTuple(200L, 10), (Function) null));
    }

    @Test
    public void windowedMergeOperatorMergeTest() {
        WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator = createDefaultWindowedMergeOperator();
        Window.GlobalWindow globalWindow = Window.GlobalWindow.INSTANCE;
        createDefaultWindowedMergeOperator.setDataStorage(new InMemoryWindowedStorage());
        createDefaultWindowedMergeOperator.setWindowOption(new WindowOption.GlobalWindow());
        createDefaultWindowedMergeOperator.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
        createDefaultWindowedMergeOperator.processTuple(new Tuple.WindowedTuple(globalWindow, 100));
        Assert.assertEquals(1L, ((Set) ((List) createDefaultWindowedMergeOperator.dataStorage.get(globalWindow)).get(0)).size());
        createDefaultWindowedMergeOperator.processTuple2(new Tuple.WindowedTuple(globalWindow, 200));
        Assert.assertEquals(1L, ((Set) ((List) createDefaultWindowedMergeOperator.dataStorage.get(globalWindow)).get(1)).size());
        createDefaultWindowedMergeOperator.processTuple(new Tuple.WindowedTuple(globalWindow, 300));
        Assert.assertEquals(2L, ((Set) ((List) createDefaultWindowedMergeOperator.dataStorage.get(globalWindow)).get(0)).size());
        Assert.assertEquals(2L, ((List) createDefaultWindowedMergeOperator.accumulation.getOutput(createDefaultWindowedMergeOperator.dataStorage.get(globalWindow))).size());
    }

    @Test
    public void keyedWindowedMergeOperatorMergeTest() {
        KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultKeyedWindowedMergeOperator = createDefaultKeyedWindowedMergeOperator();
        Window.GlobalWindow globalWindow = Window.GlobalWindow.INSTANCE;
        createDefaultKeyedWindowedMergeOperator.setDataStorage(new InMemoryWindowedKeyedStorage());
        createDefaultKeyedWindowedMergeOperator.setWindowOption(new WindowOption.GlobalWindow());
        createDefaultKeyedWindowedMergeOperator.initializeWindowStates(AbstractWindowedOperator.GLOBAL_WINDOW_SINGLETON_SET);
        createDefaultKeyedWindowedMergeOperator.processTuple(new Tuple.WindowedTuple(globalWindow, new KeyValPair("A", 100)));
        Assert.assertEquals(1L, ((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "A")).get(0)).size());
        Assert.assertTrue(((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "A")).get(0)).contains(100));
        createDefaultKeyedWindowedMergeOperator.processTuple2(new Tuple.WindowedTuple(globalWindow, new KeyValPair("A", 200)));
        Assert.assertEquals(1L, ((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "A")).get(1)).size());
        Assert.assertTrue(((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "A")).get(1)).contains(200));
        createDefaultKeyedWindowedMergeOperator.processTuple2(new Tuple.WindowedTuple(globalWindow, new KeyValPair("B", 300)));
        Assert.assertEquals(1L, ((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "A")).get(1)).size());
        Assert.assertEquals(1L, ((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "B")).get(1)).size());
        Assert.assertTrue(((Set) ((List) createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "B")).get(1)).contains(300));
        Assert.assertEquals(2L, ((List) createDefaultKeyedWindowedMergeOperator.accumulation.getOutput(createDefaultKeyedWindowedMergeOperator.dataStorage.get(globalWindow, "A"))).size());
    }

    @Test
    public void windowedMergeOperatorWatermarkTest() {
        WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator = createDefaultWindowedMergeOperator();
        createDefaultWindowedMergeOperator.controlOutput.setSink(new CollectorTestSink());
        createDefaultWindowedMergeOperator.controlInput.process(new WatermarkImpl(1000000L));
        createDefaultWindowedMergeOperator.endWindow();
        Assert.assertEquals(-1L, createDefaultWindowedMergeOperator.currentWatermark);
        Assert.assertEquals(0L, r0.collectedTuples.size());
        createDefaultWindowedMergeOperator.controlInput2.process(new WatermarkImpl(200000L));
        createDefaultWindowedMergeOperator.endWindow();
        Assert.assertEquals(200000L, createDefaultWindowedMergeOperator.currentWatermark);
        Assert.assertEquals(1L, r0.collectedTuples.size());
        createDefaultWindowedMergeOperator.controlInput2.process(new WatermarkImpl(2100000L));
        createDefaultWindowedMergeOperator.endWindow();
        Assert.assertEquals(1000000L, createDefaultWindowedMergeOperator.currentWatermark);
        Assert.assertEquals(2L, r0.collectedTuples.size());
        createDefaultWindowedMergeOperator.controlInput.process(new WatermarkImpl(1100000L));
        createDefaultWindowedMergeOperator.endWindow();
        Assert.assertEquals(1100000L, createDefaultWindowedMergeOperator.currentWatermark);
        Assert.assertEquals(3L, r0.collectedTuples.size());
        createDefaultWindowedMergeOperator.controlInput.process(new WatermarkImpl(1100000L));
        createDefaultWindowedMergeOperator.endWindow();
        Assert.assertEquals(1100000L, createDefaultWindowedMergeOperator.currentWatermark);
        Assert.assertEquals(3L, r0.collectedTuples.size());
    }

    private WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultWindowedMergeOperator() {
        WindowedMergeOperatorImpl<Integer, Integer, List<Set<Integer>>, List<List<Integer>>> windowedMergeOperatorImpl = new WindowedMergeOperatorImpl<>();
        windowedMergeOperatorImpl.setDataStorage(new InMemoryWindowedStorage());
        windowedMergeOperatorImpl.setRetractionStorage(new InMemoryWindowedStorage());
        windowedMergeOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        windowedMergeOperatorImpl.setAccumulation(new CoGroup());
        return windowedMergeOperatorImpl;
    }

    private KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> createDefaultKeyedWindowedMergeOperator() {
        KeyedWindowedMergeOperatorImpl<String, Integer, Integer, List<Set<Integer>>, List<List<Integer>>> keyedWindowedMergeOperatorImpl = new KeyedWindowedMergeOperatorImpl<>();
        keyedWindowedMergeOperatorImpl.setDataStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedMergeOperatorImpl.setRetractionStorage(new InMemoryWindowedKeyedStorage());
        keyedWindowedMergeOperatorImpl.setWindowStateStorage(new InMemoryWindowedStorage());
        keyedWindowedMergeOperatorImpl.setAccumulation(new CoGroup());
        return keyedWindowedMergeOperatorImpl;
    }
}
