/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.window;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.WindowTableFunctionOperator;
import org.apache.flink.table.runtime.operators.window.assigners.CumulativeWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class WindowTableFunctionOperatorTest {
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
    private final ZoneId shiftTimeZone;
    private static final RowType INPUT_ROW_TYPE = new RowType(Arrays.asList(new RowType.RowField("f0", (LogicalType)new VarCharType(Integer.MAX_VALUE)), new RowType.RowField("f1", (LogicalType)new IntType()), new RowType.RowField("f2", (LogicalType)new TimestampType(3))));
    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
    private static final int ROW_TIME_INDEX = 2;
    private static final LogicalType[] OUTPUT_TYPES = new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new IntType(), new TimestampType(3), new TimestampType(3), new TimestampType(3), new TimestampType(3)};
    private static final TypeSerializer<RowData> OUT_SERIALIZER = new RowDataSerializer(OUTPUT_TYPES);
    private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(OUTPUT_TYPES, new GenericRowRecordSortComparator(4, (LogicalType)new TimestampType()));

    public WindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
    }

    @Parameterized.Parameters(name="TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    @Test
    public void testTumblingWindows() throws Exception {
        TumblingWindowAssigner assigner = TumblingWindowAssigner.of((Duration)Duration.ofSeconds(3L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((WindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(this.insertRecord("key1", 1, 20L));
        testHarness.processElement(this.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(6000L), 5999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(this.insertRecord("key2", 1, 80L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testHopWindows() throws Exception {
        SlidingWindowAssigner assigner = SlidingWindowAssigner.of((Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((WindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(this.insertRecord("key1", 1, 20L));
        testHarness.processElement(this.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(-2000L), this.localMills(1000L), 999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(-1000L), this.localMills(2000L), 1999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(1000L), this.localMills(4000L), 3999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(2000L), this.localMills(5000L), 4999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(6000L), 5999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(this.insertRecord("key2", 1, 80L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(-2000L), this.localMills(1000L), 999L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(-1000L), this.localMills(2000L), 1999L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testCumulativeWindows() throws Exception {
        CumulativeWindowAssigner assigner = CumulativeWindowAssigner.of((Duration)Duration.ofSeconds(3L), (Duration)Duration.ofSeconds(1L));
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((WindowAssigner<TimeWindow>)assigner, this.shiftTimeZone);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement(this.insertRecord("key1", 1, 20L));
        testHarness.processElement(this.insertRecord("key2", 1, 3999L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(1000L), 999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(2000L), 1999L));
        expectedOutput.add(this.insertRecord("key1", 1, 20L, this.localMills(0L), this.localMills(3000L), 2999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(4000L), 3999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(5000L), 4999L));
        expectedOutput.add(this.insertRecord("key2", 1, 3999L, this.localMills(3000L), this.localMills(6000L), 5999L));
        expectedOutput.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(this.insertRecord("key2", 1, 80L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(1000L), 999L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(2000L), 1999L));
        expectedOutput.add(this.insertRecord("key2", 1, 80L, this.localMills(0L), this.localMills(3000L), 2999L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(WindowAssigner<TimeWindow> windowAssigner, ZoneId shiftTimeZone) throws Exception {
        WindowTableFunctionOperator operator = new WindowTableFunctionOperator(windowAssigner, 2, shiftTimeZone);
        return new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (TypeSerializer)INPUT_ROW_SER);
    }

    private StreamRecord<RowData> insertRecord(String f0, int f1, Long ... f2) {
        Object[] fields = new Object[2 + f2.length];
        fields[0] = f0;
        fields[1] = f1;
        for (int idx = 0; idx < f2.length; ++idx) {
            fields[2 + idx] = TimestampData.fromEpochMillis((long)f2[idx]);
        }
        return new StreamRecord((Object)StreamRecordUtils.row(fields));
    }

    private long localMills(long epochMills) {
        return TimeWindowUtil.toUtcTimestampMills((long)epochMills, (ZoneId)this.shiftTimeZone);
    }
}

