/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window.grouping;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowWriter;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.grouping.HeapWindowsGrouping;
import org.apache.flink.table.runtime.util.RowIterator;
import org.junit.Assert;
import org.junit.Test;

public class HeapWindowsGroupingTest {
    @Test
    public void testJumpingWindowCase() throws IOException {
        Long[] ts = new Long[]{1L, 2L, 3L, 4L, 7L, 8L, 11L, 13L, 81L, 93L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)0L, (long)3L), TimeWindow.of((long)10L, (long)13L), TimeWindow.of((long)80L, (long)83L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Arrays.asList(1L, 2L));
        expected.add(Collections.singletonList(11L));
        expected.add(Collections.singletonList(81L));
        this.verify(5000, ts, 3L, 10L, expected, expectedWindows);
    }

    @Test
    public void testNegativeCase() throws IOException {
        Long[] ts = new Long[]{-16L, -6L, -1L, 10L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)-20L, (long)-10L), TimeWindow.of((long)-10L, (long)0L), TimeWindow.of((long)10L, (long)20L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Collections.singletonList(-16L));
        expected.add(Arrays.asList(-6L, -1L));
        expected.add(Collections.singletonList(10L));
        this.verify(5000, ts, 10L, 10L, expected, expectedWindows);
        expectedWindows = Arrays.asList(TimeWindow.of((long)-20L, (long)-9L), TimeWindow.of((long)-10L, (long)1L), TimeWindow.of((long)0L, (long)11L), TimeWindow.of((long)10L, (long)21L));
        expected.clear();
        expected.add(Collections.singletonList(-16L));
        expected.add(Arrays.asList(-6L, -1L));
        expected.add(Collections.singletonList(10L));
        expected.add(Collections.singletonList(10L));
        this.verify(5000, ts, 11L, 10L, expected, expectedWindows);
    }

    @Test
    public void testNullValueCase() throws IOException {
        Long[] ts = new Long[]{null, 0L, 0L, 0L, 1L, 2L, 3L, 3L, 4L, 5L, 6L, 7L, null, 7L, 8L, 9L, 11L, 12L, 12L, 12L, null};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)-2L, (long)2L), TimeWindow.of((long)0L, (long)4L), TimeWindow.of((long)2L, (long)6L), TimeWindow.of((long)4L, (long)8L), TimeWindow.of((long)6L, (long)10L), TimeWindow.of((long)8L, (long)12L), TimeWindow.of((long)10L, (long)14L), TimeWindow.of((long)12L, (long)16L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Arrays.asList(0L, 0L, 0L, 1L));
        expected.add(Arrays.asList(0L, 0L, 0L, 1L, 2L, 3L, 3L));
        expected.add(Arrays.asList(2L, 3L, 3L, 4L, 5L));
        expected.add(Arrays.asList(4L, 5L, 6L, 7L, 7L));
        expected.add(Arrays.asList(6L, 7L, 7L, 8L, 9L));
        expected.add(Arrays.asList(8L, 9L, 11L));
        expected.add(Arrays.asList(11L, 12L, 12L, 12L));
        expected.add(Arrays.asList(12L, 12L, 12L));
        this.verify(5000, ts, 4L, 2L, expected, expectedWindows);
        Long[] ts1 = new Long[]{null, null, 1L, 2L, 3L, 3L, 4L, 5L, 6L, 7L, null, 7L, 8L, 9L, 11L, 12L, 12L, 12L, null};
        expectedWindows = Arrays.asList(TimeWindow.of((long)-2L, (long)2L), TimeWindow.of((long)0L, (long)4L), TimeWindow.of((long)2L, (long)6L), TimeWindow.of((long)4L, (long)8L), TimeWindow.of((long)6L, (long)10L), TimeWindow.of((long)8L, (long)12L), TimeWindow.of((long)10L, (long)14L), TimeWindow.of((long)12L, (long)16L));
        expected.clear();
        expected.add(Collections.singletonList(1L));
        expected.add(Arrays.asList(1L, 2L, 3L, 3L));
        expected.add(Arrays.asList(2L, 3L, 3L, 4L, 5L));
        expected.add(Arrays.asList(4L, 5L, 6L, 7L, 7L));
        expected.add(Arrays.asList(6L, 7L, 7L, 8L, 9L));
        expected.add(Arrays.asList(8L, 9L, 11L));
        expected.add(Arrays.asList(11L, 12L, 12L, 12L));
        expected.add(Arrays.asList(12L, 12L, 12L));
        this.verify(5000, ts1, 4L, 2L, expected, expectedWindows);
    }

    @Test
    public void testFirstOverlappingWindow() throws IOException {
        Long[] ts = new Long[]{1L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)-3L, (long)2L), TimeWindow.of((long)-2L, (long)3L), TimeWindow.of((long)-1L, (long)4L), TimeWindow.of((long)0L, (long)5L), TimeWindow.of((long)1L, (long)6L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Collections.singletonList(1L));
        expected.add(Collections.singletonList(1L));
        expected.add(Collections.singletonList(1L));
        expected.add(Collections.singletonList(1L));
        expected.add(Collections.singletonList(1L));
        this.verify(5000, ts, 5L, 1L, expected, expectedWindows);
    }

    @Test
    public void testCommonCase() throws IOException {
        Long[] ts = new Long[]{0L, 0L, 0L, 1L, 2L, 3L, 3L, 4L, 5L, 6L, 7L, 7L, 8L, 9L, 11L, 12L, 12L, 12L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)-2L, (long)2L), TimeWindow.of((long)0L, (long)4L), TimeWindow.of((long)2L, (long)6L), TimeWindow.of((long)4L, (long)8L), TimeWindow.of((long)6L, (long)10L), TimeWindow.of((long)8L, (long)12L), TimeWindow.of((long)10L, (long)14L), TimeWindow.of((long)12L, (long)16L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Arrays.asList(0L, 0L, 0L, 1L));
        expected.add(Arrays.asList(0L, 0L, 0L, 1L, 2L, 3L, 3L));
        expected.add(Arrays.asList(2L, 3L, 3L, 4L, 5L));
        expected.add(Arrays.asList(4L, 5L, 6L, 7L, 7L));
        expected.add(Arrays.asList(6L, 7L, 7L, 8L, 9L));
        expected.add(Arrays.asList(8L, 9L, 11L));
        expected.add(Arrays.asList(11L, 12L, 12L, 12L));
        expected.add(Arrays.asList(12L, 12L, 12L));
        this.verify(5000, ts, 4L, 2L, expected, expectedWindows);
        expected.clear();
        expectedWindows = Arrays.asList(TimeWindow.of((long)-3L, (long)1L), TimeWindow.of((long)0L, (long)4L), TimeWindow.of((long)3L, (long)7L), TimeWindow.of((long)6L, (long)10L), TimeWindow.of((long)9L, (long)13L), TimeWindow.of((long)12L, (long)16L));
        expected.add(Arrays.asList(0L, 0L, 0L));
        expected.add(Arrays.asList(0L, 0L, 0L, 1L, 2L, 3L, 3L));
        expected.add(Arrays.asList(3L, 3L, 4L, 5L, 6L));
        expected.add(Arrays.asList(6L, 7L, 7L, 8L, 9L));
        expected.add(Arrays.asList(9L, 11L, 12L, 12L, 12L));
        expected.add(Arrays.asList(12L, 12L, 12L));
        this.verify(5000, ts, 4L, 3L, expected, expectedWindows);
        expected.clear();
        expectedWindows = Arrays.asList(TimeWindow.of((long)0L, (long)5L), TimeWindow.of((long)5L, (long)10L), TimeWindow.of((long)10L, (long)15L));
        expected.add(Arrays.asList(0L, 0L, 0L, 1L, 2L, 3L, 3L, 4L));
        expected.add(Arrays.asList(5L, 6L, 7L, 7L, 8L, 9L));
        expected.add(Arrays.asList(11L, 12L, 12L, 12L));
        this.verify(5000, ts, 5L, 5L, expected, expectedWindows);
    }

    @Test
    public void testSparseCase() throws IOException {
        Long[] ts = new Long[]{0L, 7L, 33L, 76L, 77L, 77L, 98L, 99L, 100L, 999L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)-2L, (long)2L), TimeWindow.of((long)0L, (long)4L), TimeWindow.of((long)4L, (long)8L), TimeWindow.of((long)6L, (long)10L), TimeWindow.of((long)30L, (long)34L), TimeWindow.of((long)32L, (long)36L), TimeWindow.of((long)74L, (long)78L), TimeWindow.of((long)76L, (long)80L), TimeWindow.of((long)96L, (long)100L), TimeWindow.of((long)98L, (long)102L), TimeWindow.of((long)100L, (long)104L), TimeWindow.of((long)996L, (long)1000L), TimeWindow.of((long)998L, (long)1002L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Collections.singletonList(0L));
        expected.add(Collections.singletonList(0L));
        expected.add(Collections.singletonList(7L));
        expected.add(Collections.singletonList(7L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Arrays.asList(76L, 77L, 77L));
        expected.add(Arrays.asList(76L, 77L, 77L));
        expected.add(Arrays.asList(98L, 99L));
        expected.add(Arrays.asList(98L, 99L, 100L));
        expected.add(Collections.singletonList(100L));
        expected.add(Collections.singletonList(999L));
        expected.add(Collections.singletonList(999L));
        this.verify(5000, ts, 4L, 2L, expected, expectedWindows);
        expectedWindows = Arrays.asList(TimeWindow.of((long)-6L, (long)3L), TimeWindow.of((long)-3L, (long)6L), TimeWindow.of((long)0L, (long)9L), TimeWindow.of((long)3L, (long)12L), TimeWindow.of((long)6L, (long)15L), TimeWindow.of((long)27L, (long)36L), TimeWindow.of((long)30L, (long)39L), TimeWindow.of((long)33L, (long)42L), TimeWindow.of((long)69L, (long)78L), TimeWindow.of((long)72L, (long)81L), TimeWindow.of((long)75L, (long)84L), TimeWindow.of((long)90L, (long)99L), TimeWindow.of((long)93L, (long)102L), TimeWindow.of((long)96L, (long)105L), TimeWindow.of((long)99L, (long)108L), TimeWindow.of((long)993L, (long)1002L), TimeWindow.of((long)996L, (long)1005L), TimeWindow.of((long)999L, (long)1008L));
        expected = new ArrayList();
        expected.add(Collections.singletonList(0L));
        expected.add(Collections.singletonList(0L));
        expected.add(Arrays.asList(0L, 7L));
        expected.add(Collections.singletonList(7L));
        expected.add(Collections.singletonList(7L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Arrays.asList(76L, 77L, 77L));
        expected.add(Arrays.asList(76L, 77L, 77L));
        expected.add(Arrays.asList(76L, 77L, 77L));
        expected.add(Collections.singletonList(98L));
        expected.add(Arrays.asList(98L, 99L, 100L));
        expected.add(Arrays.asList(98L, 99L, 100L));
        expected.add(Arrays.asList(99L, 100L));
        expected.add(Collections.singletonList(999L));
        expected.add(Collections.singletonList(999L));
        expected.add(Collections.singletonList(999L));
        this.verify(5000, ts, 9L, 3L, expected, expectedWindows);
    }

    @Test
    public void testSingleInputCase() throws IOException {
        Long[] ts = new Long[]{33L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)30L, (long)34L), TimeWindow.of((long)32L, (long)36L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        this.verify(5000, ts, 4L, 2L, expected, expectedWindows);
        expectedWindows = Arrays.asList(TimeWindow.of((long)15L, (long)34L), TimeWindow.of((long)18L, (long)37L), TimeWindow.of((long)21L, (long)40L), TimeWindow.of((long)24L, (long)43L), TimeWindow.of((long)27L, (long)46L), TimeWindow.of((long)30L, (long)49L), TimeWindow.of((long)33L, (long)52L));
        expected.clear();
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        expected.add(Collections.singletonList(33L));
        this.verify(5000, ts, 19L, 3L, expected, expectedWindows);
    }

    @Test(expected=IOException.class)
    public void testOOM() throws IOException {
        Long[] ts = new Long[]{33L, 33L, 33L, 33L, 33L, 33L};
        this.verify(5, ts, 4L, 2L, new ArrayList<List<Long>>(), new ArrayList<TimeWindow>());
    }

    @Test
    public void testAdvanceWatermarkFirst() throws IOException {
        Long[] ts = new Long[]{16L};
        List<TimeWindow> expectedWindows = Arrays.asList(TimeWindow.of((long)12L, (long)20L), TimeWindow.of((long)16L, (long)24L));
        ArrayList<List<Long>> expected = new ArrayList<List<Long>>();
        expected.add(Collections.singletonList(16L));
        expected.add(Collections.singletonList(16L));
        ArrayList<List<Long>> actual = new ArrayList<List<Long>>();
        ArrayList<TimeWindow> windows = new ArrayList<TimeWindow>();
        HeapWindowsGrouping grouping = new HeapWindowsGrouping(5000, 0L, 8L, 4L, 0, false);
        TestInputIterator iterator = new TestInputIterator(ts);
        grouping.addInputToBuffer((BinaryRow)iterator.getRow());
        grouping.advanceWatermarkToTriggerAllWindows();
        this.processTriggerWindow(actual, windows, grouping);
        Assert.assertEquals(expectedWindows, windows);
        Assert.assertEquals(expected, actual);
    }

    @Test(expected=IllegalStateException.class)
    public void testInvalidWindowTrigger() throws IOException {
        Long[] ts = new Long[]{8L};
        TestInputIterator iterator = new TestInputIterator(ts);
        HeapWindowsGrouping grouping = new HeapWindowsGrouping(5000, 0L, 8L, 4L, 0, false);
        grouping.addInputToBuffer((BinaryRow)iterator.getRow());
        System.out.println("valid window trigger");
        RowIterator iter = grouping.buildTriggerWindowElementsIterator();
        TimeWindow window = grouping.getTriggerWindow();
        ArrayList<Long> buffer = new ArrayList<Long>();
        while (iter.advanceNext()) {
            buffer.add(((BinaryRow)iter.getRow()).getLong(0));
        }
        Assert.assertEquals((Object)TimeWindow.of((long)0L, (long)8L), (Object)window);
        Assert.assertEquals(Collections.emptyList(), buffer);
        System.out.println("try invalid window trigger");
        grouping.buildTriggerWindowElementsIterator();
    }

    private void verify(int limit, Long[] ts, long windowSize, long slideSize, List<List<Long>> expected, List<TimeWindow> expectedWindows) throws IOException {
        ArrayList<List<Long>> actual = new ArrayList<List<Long>>();
        ArrayList<TimeWindow> windows = new ArrayList<TimeWindow>();
        HeapWindowsGrouping grouping = new HeapWindowsGrouping(limit, 0L, windowSize, slideSize, 0, false);
        TestInputIterator iterator = new TestInputIterator(ts);
        while (iterator.advanceNext()) {
            BinaryRow input = (BinaryRow)iterator.getRow();
            grouping.addInputToBuffer(input);
            this.processTriggerWindow(actual, windows, grouping);
        }
        grouping.advanceWatermarkToTriggerAllWindows();
        this.processTriggerWindow(actual, windows, grouping);
        Assert.assertEquals(expectedWindows, windows);
        Assert.assertEquals(expected, actual);
    }

    private void processTriggerWindow(List<List<Long>> actual, List<TimeWindow> windows, HeapWindowsGrouping grouping) {
        while (grouping.hasTriggerWindow()) {
            RowIterator iter = grouping.buildTriggerWindowElementsIterator();
            TimeWindow window = grouping.getTriggerWindow();
            ArrayList<Long> buffer = new ArrayList<Long>();
            while (iter.advanceNext()) {
                buffer.add(((BinaryRow)iter.getRow()).getLong(0));
            }
            if (buffer.isEmpty()) continue;
            actual.add(buffer);
            windows.add(window);
        }
    }

    private class TestInputIterator
    implements RowIterator<BinaryRow> {
        private BinaryRow row = new BinaryRow(1);
        private BinaryRowWriter writer = new BinaryRowWriter(this.row);
        private List<Long> assignedWindowStart;
        private int count;

        TestInputIterator(Long[] assignedWindowStart) {
            this.assignedWindowStart = Arrays.asList(assignedWindowStart);
            this.assignedWindowStart.sort((o1, o2) -> {
                if (o1 == null && o2 == null) {
                    return 0;
                }
                if (o1 == null) {
                    return -1;
                }
                if (o2 == null) {
                    return 1;
                }
                return (int)(o1 - o2);
            });
            this.count = 0;
        }

        public boolean advanceNext() {
            return this.count < this.assignedWindowStart.size();
        }

        public BinaryRow getRow() {
            this.writer.reset();
            if (this.assignedWindowStart.get(this.count) == null) {
                this.writer.setNullAt(0);
            } else {
                this.writer.writeLong(0, this.assignedWindowStart.get(this.count).longValue());
            }
            this.writer.complete();
            ++this.count;
            return this.row;
        }
    }
}

