package org.apache.flink.table.runtime.operators.window;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.operators.window.assigners.CumulativeWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests for {@link WindowTableFunctionOperator} using tumbling, sliding (hop) and cumulative
 * window assigners, in both event-time and processing-time mode.
 *
 * <p>The suite is parameterized over the shift time zone handed to the operator; window start and
 * end values in the expected rows are computed through {@link #localMills(long)} so the same
 * expectations are used for every zone.
 */
@RunWith(Parameterized.class)
public class WindowTableFunctionOperatorTest {

    /** Index of the row-time field ({@code f2}) inside {@link #INPUT_ROW_TYPE}. */
    private static final int ROW_TIME_INDEX = 2;

    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");

    /** Input schema: {@code (f0 VARCHAR, f1 INT, f2 TIMESTAMP(3))}. */
    private static final RowType INPUT_ROW_TYPE =
            new RowType(
                    Arrays.asList(
                            new RowType.RowField("f0", new VarCharType(Integer.MAX_VALUE)),
                            new RowType.RowField("f1", new IntType()),
                            new RowType.RowField("f2", new TimestampType(3))));

    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);

    /** Output schema: the three input fields followed by three additional timestamp columns. */
    private static final LogicalType[] OUTPUT_TYPES = {
        new VarCharType(Integer.MAX_VALUE),
        new IntType(),
        new TimestampType(3),
        new TimestampType(3),
        new TimestampType(3),
        new TimestampType(3)
    };

    private static final TypeSerializer<RowData> OUT_SERIALIZER =
            new RowDataSerializer(OUTPUT_TYPES);

    // NOTE(review): the comparator is built with index 4, which is the second of the three
    // appended timestamp columns (passed as the window end in the expected rows below) -
    // presumably this is the sort key; confirm against GenericRowRecordSortComparator.
    private static final RowDataHarnessAssertor ASSERTER =
            new RowDataHarnessAssertor(
                    OUTPUT_TYPES, new GenericRowRecordSortComparator(4, new TimestampType()));

    /** Failure message used by every output assertion in this class. */
    private static final String OUTPUT_MISMATCH_MESSAGE = "Output was not correct.";

    /** Time zone passed to the operator under test and used to compute expected window bounds. */
    private final ZoneId shiftTimeZone;

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param zoneId the shift time zone supplied by {@link #runMode()}
     */
    public WindowTableFunctionOperatorTest(ZoneId zoneId) {
        this.shiftTimeZone = zoneId;
    }

    /**
     * Supplies the parameter sets for the runner: one run with UTC and one with Asia/Shanghai.
     *
     * @return the constructor arguments for each parameterized run
     */
    @Parameterized.Parameters(name = "TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] {SHANGHAI_ZONE_ID});
    }

    /** Event-time tumbling windows of 3 seconds: each input row yields exactly one output row. */
    @Test
    public void testTumblingWindows() throws Exception {
        // try-with-resources guarantees the harness is closed even if an assertion fails.
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        TumblingWindowAssigner.of(Duration.ofSeconds(3L)), shiftTimeZone)) {
            testHarness.setup(OUT_SERIALIZER);
            testHarness.open();

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.processElement(insertRecord("key1", 1, 20L));
            testHarness.processElement(insertRecord("key2", 1, 3999L));
            testHarness.processWatermark(new Watermark(999L));

            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(0L), localMills(3000L), 2999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(3000L), localMills(6000L), 5999L));
            expectedOutput.add(new Watermark(999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());

            // A row whose timestamp lies behind the already emitted watermark is still expected
            // in the output.
            testHarness.processElement(insertRecord("key2", 1, 80L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(0L), localMills(3000L), 2999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Processing-time tumbling windows of 3 seconds: the expected windows follow the harness
     * processing time (20 and 3999), not the {@code Long.MAX_VALUE} value carried in {@code f2}.
     */
    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        TumblingWindowAssigner.of(Duration.ofSeconds(3L)).withProcessingTime(),
                        shiftTimeZone)) {
            testHarness.setup(OUT_SERIALIZER);
            testHarness.open();

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.setProcessingTime(20L);
            testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
            testHarness.setProcessingTime(3999L);
            testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));

            expectedOutput.add(
                    insertRecord(
                            "key1", 1, Long.MAX_VALUE, localMills(0L), localMills(3000L), 2999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(3000L),
                            localMills(6000L),
                            5999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Event-time sliding windows (size 3 seconds, slide 1 second): each input row is expected
     * once per overlapping window, i.e. three times.
     */
    @Test
    public void testHopWindows() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        SlidingWindowAssigner.of(Duration.ofSeconds(3L), Duration.ofSeconds(1L)),
                        shiftTimeZone)) {
            testHarness.setup(OUT_SERIALIZER);
            testHarness.open();

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.processElement(insertRecord("key1", 1, 20L));
            testHarness.processElement(insertRecord("key2", 1, 3999L));
            testHarness.processWatermark(new Watermark(999L));

            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(-2000L), localMills(1000L), 999L));
            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(-1000L), localMills(2000L), 1999L));
            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(0L), localMills(3000L), 2999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(1000L), localMills(4000L), 3999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(2000L), localMills(5000L), 4999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(3000L), localMills(6000L), 5999L));
            expectedOutput.add(new Watermark(999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());

            // A row whose timestamp lies behind the already emitted watermark is still expected
            // in the output.
            testHarness.processElement(insertRecord("key2", 1, 80L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(-2000L), localMills(1000L), 999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(-1000L), localMills(2000L), 1999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(0L), localMills(3000L), 2999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Processing-time sliding windows (size 3 seconds, slide 1 second): the expected windows
     * follow the harness processing time (20 and 3999).
     */
    @Test
    public void testProcessingTimeHopWindows() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        SlidingWindowAssigner.of(Duration.ofSeconds(3L), Duration.ofSeconds(1L))
                                .withProcessingTime(),
                        shiftTimeZone)) {
            testHarness.setup(OUT_SERIALIZER);
            testHarness.open();

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.setProcessingTime(20L);
            testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
            testHarness.setProcessingTime(3999L);
            testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));

            expectedOutput.add(
                    insertRecord(
                            "key1",
                            1,
                            Long.MAX_VALUE,
                            localMills(-2000L),
                            localMills(1000L),
                            999L));
            expectedOutput.add(
                    insertRecord(
                            "key1",
                            1,
                            Long.MAX_VALUE,
                            localMills(-1000L),
                            localMills(2000L),
                            1999L));
            expectedOutput.add(
                    insertRecord(
                            "key1", 1, Long.MAX_VALUE, localMills(0L), localMills(3000L), 2999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(1000L),
                            localMills(4000L),
                            3999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(2000L),
                            localMills(5000L),
                            4999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(3000L),
                            localMills(6000L),
                            5999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Event-time cumulative windows (max size 3 seconds, step 1 second): each input row is
     * expected once per cumulative step, all sharing the same window start.
     */
    @Test
    public void testCumulativeWindows() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        CumulativeWindowAssigner.of(
                                Duration.ofSeconds(3L), Duration.ofSeconds(1L)),
                        shiftTimeZone)) {
            testHarness.setup(OUT_SERIALIZER);
            testHarness.open();

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.processElement(insertRecord("key1", 1, 20L));
            testHarness.processElement(insertRecord("key2", 1, 3999L));
            testHarness.processWatermark(new Watermark(999L));

            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(0L), localMills(1000L), 999L));
            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(0L), localMills(2000L), 1999L));
            expectedOutput.add(
                    insertRecord("key1", 1, 20L, localMills(0L), localMills(3000L), 2999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(3000L), localMills(4000L), 3999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(3000L), localMills(5000L), 4999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 3999L, localMills(3000L), localMills(6000L), 5999L));
            expectedOutput.add(new Watermark(999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());

            // A row whose timestamp lies behind the already emitted watermark is still expected
            // in the output.
            testHarness.processElement(insertRecord("key2", 1, 80L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(0L), localMills(1000L), 999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(0L), localMills(2000L), 1999L));
            expectedOutput.add(
                    insertRecord("key2", 1, 80L, localMills(0L), localMills(3000L), 2999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Processing-time cumulative windows (max size 3 seconds, step 1 second): the expected
     * windows follow the harness processing time (20 and 3999).
     */
    @Test
    public void testProcessingCumulativeWindows() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
                createTestHarness(
                        CumulativeWindowAssigner.of(
                                        Duration.ofSeconds(3L), Duration.ofSeconds(1L))
                                .withProcessingTime(),
                        shiftTimeZone)) {
            testHarness.setup(OUT_SERIALIZER);
            testHarness.open();

            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.setProcessingTime(20L);
            testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
            testHarness.setProcessingTime(3999L);
            testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));

            expectedOutput.add(
                    insertRecord(
                            "key1", 1, Long.MAX_VALUE, localMills(0L), localMills(1000L), 999L));
            expectedOutput.add(
                    insertRecord(
                            "key1", 1, Long.MAX_VALUE, localMills(0L), localMills(2000L), 1999L));
            expectedOutput.add(
                    insertRecord(
                            "key1", 1, Long.MAX_VALUE, localMills(0L), localMills(3000L), 2999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(3000L),
                            localMills(4000L),
                            3999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(3000L),
                            localMills(5000L),
                            4999L));
            expectedOutput.add(
                    insertRecord(
                            "key2",
                            1,
                            Long.MAX_VALUE,
                            localMills(3000L),
                            localMills(6000L),
                            5999L));
            ASSERTER.assertOutputEqualsSorted(
                    OUTPUT_MISMATCH_MESSAGE, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Builds a test harness around a new {@link WindowTableFunctionOperator}.
     *
     * <p>The harness is returned un-opened; callers are responsible for {@code setup}, {@code
     * open} and closing it.
     *
     * @param windowAssigner the assigner handed to the operator
     * @param zoneId the shift time zone handed to the operator
     * @return a harness using {@link #INPUT_ROW_SER} as input serializer
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
            WindowAssigner<TimeWindow> windowAssigner, ZoneId zoneId) throws Exception {
        OneInputStreamOperator<RowData, RowData> operator =
                new WindowTableFunctionOperator(windowAssigner, ROW_TIME_INDEX, zoneId);
        return new OneInputStreamOperatorTestHarness<>(operator, INPUT_ROW_SER);
    }

    /**
     * Wraps a row into a {@link StreamRecord}.
     *
     * <p>The row consists of {@code f0}, {@code f1} and one timestamp column per entry of {@code
     * timestamps}, each converted with {@link TimestampData#fromEpochMillis(long)}. Input rows
     * pass a single timestamp; expected output rows pass four.
     *
     * @param f0 value of the first (string) column
     * @param f1 value of the second (int) column
     * @param timestamps epoch-millisecond values for the trailing timestamp columns
     * @return a stream record holding the row built by {@link StreamRecordUtils#row(Object...)}
     */
    private StreamRecord<RowData> insertRecord(String f0, int f1, Long... timestamps) {
        Object[] fields = new Object[2 + timestamps.length];
        fields[0] = f0;
        fields[1] = f1;
        for (int idx = 0; idx < timestamps.length; idx++) {
            fields[2 + idx] = TimestampData.fromEpochMillis(timestamps[idx]);
        }
        return new StreamRecord<>(StreamRecordUtils.row(fields));
    }

    /**
     * Converts the given millisecond value via {@link TimeWindowUtil#toUtcTimestampMills(long,
     * ZoneId)} using this run's shift time zone.
     *
     * @param epochMills the value to convert
     * @return the converted value, used as window start/end in the expected rows
     */
    private long localMills(long epochMills) {
        return TimeWindowUtil.toUtcTimestampMills(epochMills, shiftTimeZone);
    }
}
