/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
import org.apache.flink.table.runtime.operators.rank.TopNFunctionTestBase;
import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;

/**
 * Harness-based tests for {@link RetractableTopNFunction}.
 *
 * <p>Every test feeds a fixed sequence of change-log records into a
 * {@link OneInputStreamOperatorTestHarness} wrapping the function and compares the harness
 * output with a hand-written list of expected records. Shared fixtures (TTL config, input row
 * type, comparator, key selector, equaliser and the two assertors) are inherited from
 * {@link TopNFunctionTestBase}.
 */
public class RetractableTopNFunctionTest extends TopNFunctionTestBase {

    /**
     * Builds the function under test from the fixtures inherited from the base class.
     *
     * @param rankType             rank flavour handed to the function
     * @param rankRange            rank range handed to the function
     * @param generateUpdateBefore whether the function is asked to produce UPDATE_BEFORE records
     * @param outputRankNumber     whether the function is asked to append the rank number
     * @return a freshly constructed {@link RetractableTopNFunction}
     */
    @Override
    protected AbstractTopNFunction createFunction(RankType rankType, RankRange rankRange, boolean generateUpdateBefore, boolean outputRankNumber) {
        return new RetractableTopNFunction(
                ttlConfig,
                inputRowType,
                comparableRecordComparator,
                sortKeySelector,
                rankType,
                rankRange,
                generatedEqualiser,
                generateUpdateBefore,
                outputRankNumber);
    }

    /**
     * Top-2 with rank number and without UPDATE_BEFORE generation: an UPDATE_BEFORE input for
     * ("book", 1, 12) arrives in the middle of the stream, and only INSERT / UPDATE_AFTER
     * records are expected on the output.
     */
    @Test
    public void testProcessRetractMessageWithNotGenerateUpdateBefore() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        // "book" partition, including one retraction of an earlier row.
        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 4L, 11));
        harness.processElement(StreamRecordUtils.updateBeforeRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 5L, 11));
        // "fruit" partition, inserts only.
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 1L, 12, 1L));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 4L, 11, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 5L, 11, 2L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 4L, 33, 1L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 3L, 44, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 5L, 22, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 4L, 33, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Same input as {@link #testProcessRetractMessageWithNotGenerateUpdateBefore()}, but with
     * UPDATE_BEFORE generation switched on: every expected UPDATE_AFTER is preceded by the
     * matching UPDATE_BEFORE.
     */
    @Test
    public void testProcessRetractMessageWithGenerateUpdateBefore() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 4L, 11));
        harness.processElement(StreamRecordUtils.updateBeforeRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 5L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 1L, 12, 1L));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 4L, 11, 1L));
        expected.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12, 2L));
        expected.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 5L, 11, 2L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 4L, 33, 1L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 3L, 44, 2L));
        expected.add(StreamRecordUtils.updateBeforeRecord("fruit", 4L, 33, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 5L, 22, 1L));
        expected.add(StreamRecordUtils.updateBeforeRecord("fruit", 3L, 44, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 4L, 33, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-2 with rank number: checks the output of an insert-only stream, then snapshots the
     * operator, restores the snapshot into a second harness and checks the output produced by
     * one further insert.
     */
    @Test
    public void testConstantRankRangeWithoutOffsetWithRowNumber() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 4L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));

        ArrayList<Object> expectedBeforeSnapshot = new ArrayList<>();
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("book", 1L, 12, 1L));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12, 1L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateAfterRecord("book", 4L, 11, 1L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 19, 2L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12, 2L));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("fruit", 4L, 33, 1L));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("fruit", 3L, 44, 2L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateBeforeRecord("fruit", 4L, 33, 1L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateAfterRecord("fruit", 5L, 22, 1L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateBeforeRecord("fruit", 3L, 44, 2L));
        expectedBeforeSnapshot.add(StreamRecordUtils.updateAfterRecord("fruit", 4L, 33, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expectedBeforeSnapshot, harness.getOutput());

        // Take the snapshot while the first harness is still open, then shut it down.
        OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();

        // Second harness: brand-new function instance, state restored from the snapshot.
        topN = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1L, 2L), true, true);
        harness = createTestHarness(topN);
        harness.setup();
        harness.initializeState(snapshot);
        harness.open();
        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));

        ArrayList<Object> expectedAfterRestore = new ArrayList<>();
        expectedAfterRestore.add(StreamRecordUtils.updateBeforeRecord("book", 4L, 11, 1L));
        expectedAfterRestore.add(StreamRecordUtils.updateAfterRecord("book", 1L, 10, 1L));
        expectedAfterRestore.add(StreamRecordUtils.updateBeforeRecord("book", 1L, 12, 2L));
        expectedAfterRestore.add(StreamRecordUtils.updateAfterRecord("book", 4L, 11, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expectedAfterRestore, harness.getOutput());
        harness.close();
    }

    /**
     * Top-2 without rank number: same snapshot/restore flow as
     * {@link #testConstantRankRangeWithoutOffsetWithRowNumber()}, with INSERT / DELETE records
     * expected on the output instead of rank updates.
     */
    @Test
    public void testConstantRankRangeWithoutOffsetWithoutRowNumber() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, true, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 4L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));

        ArrayList<Object> expectedBeforeSnapshot = new ArrayList<>();
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("book", 2L, 19));
        expectedBeforeSnapshot.add(StreamRecordUtils.deleteRecord("book", 2L, 19));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("book", 4L, 11));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        expectedBeforeSnapshot.add(StreamRecordUtils.deleteRecord("fruit", 3L, 44));
        expectedBeforeSnapshot.add(StreamRecordUtils.insertRecord("fruit", 5L, 22));
        assertorWithoutRowNumber.assertOutputEquals("output wrong.", expectedBeforeSnapshot, harness.getOutput());

        // Take the snapshot while the first harness is still open, then shut it down.
        OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();

        // Second harness: brand-new function instance, state restored from the snapshot.
        topN = createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1L, 2L), true, false);
        harness = createTestHarness(topN);
        harness.setup();
        harness.initializeState(snapshot);
        harness.open();
        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 10));

        ArrayList<Object> expectedAfterRestore = new ArrayList<>();
        expectedAfterRestore.add(StreamRecordUtils.deleteRecord("book", 1L, 12));
        expectedAfterRestore.add(StreamRecordUtils.insertRecord("book", 1L, 10));
        assertorWithoutRowNumber.assertOutputEquals("output wrong.", expectedAfterRestore, harness.getOutput());
        harness.close();
    }

    /**
     * Uses a {@link VariableRankRange} built with index 1, with rank number output enabled.
     * The expected output keeps two rows for "book" (second field 2) and one row for "fruit"
     * (second field 1).
     */
    @Test
    public void testVariableRankRangeWithRowNumber() throws Exception {
        RankRange variableRange = new VariableRankRange(1);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, variableRange, true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 12, 1L));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 12, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 2L, 11, 1L));
        expected.add(StreamRecordUtils.updateBeforeRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 2L, 12, 2L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 1L, 33, 1L));
        expected.add(StreamRecordUtils.updateBeforeRecord("fruit", 1L, 33, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 1L, 22, 1L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Same input as {@link #testVariableRankRangeWithRowNumber()} but without rank number
     * output; INSERT / DELETE records are expected.
     */
    @Test
    public void testVariableRankRangeWithoutRowNumber() throws Exception {
        RankRange variableRange = new VariableRankRange(1);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, variableRange, true, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 12));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 19));
        expected.add(StreamRecordUtils.deleteRecord("book", 2L, 19));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 11));
        expected.add(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        expected.add(StreamRecordUtils.deleteRecord("fruit", 1L, 33));
        expected.add(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        assertorWithoutRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-2 with rank number and UPDATE_BEFORE generation disabled, on an insert-only stream:
     * rank changes are expected as UPDATE_AFTER records only.
     */
    @Test
    public void testDisableGenerateUpdateBeforeWithRowNumber() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 4L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 1L, 12, 1L));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 4L, 11, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("book", 1L, 12, 2L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 4L, 33, 1L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 3L, 44, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 5L, 22, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 4L, 33, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-2 without rank number and UPDATE_BEFORE generation disabled, on an insert-only
     * stream: rows leaving the result are expected as DELETE, rows entering it as INSERT.
     */
    @Test
    public void testDisableGenerateUpdateBeforeWithoutRowNumber() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, false, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        harness.processElement(StreamRecordUtils.insertRecord("book", 4L, 11));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 1L, 12));
        expected.add(StreamRecordUtils.insertRecord("book", 2L, 19));
        expected.add(StreamRecordUtils.deleteRecord("book", 2L, 19));
        expected.add(StreamRecordUtils.insertRecord("book", 4L, 11));
        expected.add(StreamRecordUtils.insertRecord("fruit", 4L, 33));
        expected.add(StreamRecordUtils.insertRecord("fruit", 3L, 44));
        expected.add(StreamRecordUtils.deleteRecord("fruit", 3L, 44));
        expected.add(StreamRecordUtils.insertRecord("fruit", 5L, 22));
        assertorWithoutRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Moves the harness's state-TTL processing time forward between three batches of input
     * (0, 9000000 and 20000000). The records of the last batch are expected as plain INSERTs
     * at rank 1 — presumably because the earlier state has expired under the inherited TTL
     * config; confirm against {@link TopNFunctionTestBase}.
     */
    @Test
    public void testCleanIdleState() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        // Batch 1 at time 0.
        harness.setStateTtlProcessingTime(0L);
        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 5L, 22));

        // Batch 2 at time 9000000.
        harness.setStateTtlProcessingTime(9000000L);
        harness.processElement(StreamRecordUtils.updateBeforeRecord("book", 1L, 12));
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 4L, 11));

        // Batch 3 at time 20000000.
        harness.setStateTtlProcessingTime(20000000L);
        harness.processElement(StreamRecordUtils.insertRecord("fruit", 8L, 100));
        harness.processElement(StreamRecordUtils.insertRecord("book", 1L, 12));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("book", 1L, 12, 1L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 5L, 22, 1L));
        expected.add(StreamRecordUtils.deleteRecord("book", 1L, 12, 1L));
        expected.add(StreamRecordUtils.updateBeforeRecord("fruit", 5L, 22, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("fruit", 4L, 11, 1L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 5L, 22, 2L));
        expected.add(StreamRecordUtils.insertRecord("fruit", 8L, 100, 1L));
        expected.add(StreamRecordUtils.insertRecord("book", 1L, 12, 1L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-3 without rank number: six inserts for key "a" followed by an UPDATE_BEFORE for
     * ("a", 2, 2). The retracted row is expected as DELETE and ("a", 4, 2) as its INSERT
     * replacement.
     */
    @Test
    public void testConstantRankRangeWithoutRowNumber() throws Exception {
        RankRange topThree = new ConstantRankRange(1L, 3L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topThree, false, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 1));
        harness.processElement(StreamRecordUtils.insertRecord("a", 2L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 3L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 4L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 5L, 3));
        harness.processElement(StreamRecordUtils.insertRecord("a", 6L, 4));
        harness.processElement(StreamRecordUtils.updateBeforeRecord("a", 2L, 2));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 1));
        expected.add(StreamRecordUtils.insertRecord("a", 2L, 2));
        expected.add(StreamRecordUtils.insertRecord("a", 3L, 2));
        expected.add(StreamRecordUtils.deleteRecord("a", 2L, 2));
        expected.add(StreamRecordUtils.insertRecord("a", 4L, 2));
        assertorWithoutRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-3 with rank number: same input as {@link #testConstantRankRangeWithoutRowNumber()};
     * after the retraction, UPDATE_AFTER records carrying the new ranks 2 and 3 are expected.
     */
    @Test
    public void testConstantRankRangeWithRowNumber() throws Exception {
        RankRange topThree = new ConstantRankRange(1L, 3L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topThree, false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 1));
        harness.processElement(StreamRecordUtils.insertRecord("a", 2L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 3L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 4L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 5L, 3));
        harness.processElement(StreamRecordUtils.insertRecord("a", 6L, 4));
        harness.processElement(StreamRecordUtils.updateBeforeRecord("a", 2L, 2));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 1, 1L));
        expected.add(StreamRecordUtils.insertRecord("a", 2L, 2, 2L));
        expected.add(StreamRecordUtils.insertRecord("a", 3L, 2, 3L));
        expected.add(StreamRecordUtils.updateAfterRecord("a", 3L, 2, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("a", 4L, 2, 3L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-2 without rank number: five inserts for key "a" followed by three DELETEs. No output
     * is expected for the first DELETE, ("a", 4, 4); the other two each yield a DELETE plus an
     * INSERT of a replacement row.
     */
    @Test
    public void testRetractRecordOutOfRankRangeWithoutRowNumber() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, false, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 1));
        harness.processElement(StreamRecordUtils.insertRecord("a", 2L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 3L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 4L, 4));
        harness.processElement(StreamRecordUtils.insertRecord("a", 5L, 4));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 4L, 4));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 1L, 1));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 2L, 2));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 1));
        expected.add(StreamRecordUtils.insertRecord("a", 2L, 2));
        expected.add(StreamRecordUtils.deleteRecord("a", 1L, 1));
        expected.add(StreamRecordUtils.insertRecord("a", 3L, 2));
        expected.add(StreamRecordUtils.deleteRecord("a", 2L, 2));
        expected.add(StreamRecordUtils.insertRecord("a", 5L, 4));
        assertorWithoutRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-2 with rank number: same input as
     * {@link #testRetractRecordOutOfRankRangeWithoutRowNumber()}; the two effective DELETEs are
     * expected to surface as UPDATE_AFTER records carrying the shifted ranks.
     */
    @Test
    public void testRetractRecordOutOfRankRangeWithRowNumber() throws Exception {
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = createFunction(RankType.ROW_NUMBER, topTwo, false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 1));
        harness.processElement(StreamRecordUtils.insertRecord("a", 2L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 3L, 2));
        harness.processElement(StreamRecordUtils.insertRecord("a", 4L, 4));
        harness.processElement(StreamRecordUtils.insertRecord("a", 5L, 4));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 4L, 4));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 1L, 1));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 2L, 2));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 1, 1L));
        expected.add(StreamRecordUtils.insertRecord("a", 2L, 2, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("a", 2L, 2, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("a", 3L, 2, 2L));
        expected.add(StreamRecordUtils.updateAfterRecord("a", 3L, 2, 1L));
        expected.add(StreamRecordUtils.updateAfterRecord("a", 5L, 4, 2L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-1 without rank number over a four-column input row type: inserts two rows, deletes
     * both, then inserts two rows that differ only in the last column.
     */
    @Test
    public void testRetractAndThenDeleteRecordWithoutRowNumber() throws Exception {
        LogicalType[] fieldTypes = {new VarCharType(Integer.MAX_VALUE), new BigIntType(), new IntType(), new IntType()};
        RankRange topOne = new ConstantRankRange(1L, 1L);
        AbstractTopNFunction topN = new RetractableTopNFunction(
                ttlConfig,
                InternalTypeInfo.ofFields(fieldTypes),
                comparableRecordComparator,
                sortKeySelector,
                RankType.ROW_NUMBER,
                topOne,
                generatedEqualiser,
                true,
                false);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 10, 0));
        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 9, 0));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 1L, 10, 0));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 1L, 9, 0));
        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 10, 1));
        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 9, 1));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 10, 0));
        expected.add(StreamRecordUtils.deleteRecord("a", 1L, 10, 0));
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 9, 0));
        expected.add(StreamRecordUtils.deleteRecord("a", 1L, 9, 0));
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 10, 1));
        expected.add(StreamRecordUtils.deleteRecord("a", 1L, 10, 1));
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 9, 1));
        // NOTE(review): despite the "WithoutRowNumber" name, the four-column rows are checked
        // with assertorWithRowNumber — presumably because that assertor handles four fields;
        // confirm against TopNFunctionTestBase.
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }

    /**
     * Top-2 with rank number and a TTL config created from the value 1000: inserts a row at
     * time 0, advances the state-TTL processing time to 1001, inserts a second row and then
     * deletes the first one. Only the two INSERTs (both at rank 1) are expected on the output.
     */
    @Test
    public void testRetractAnStaledRecordWithRowNumber() throws Exception {
        StateTtlConfig shortTtl = StateConfigUtil.createTtlConfig(1000L);
        LogicalType[] fieldTypes = {new VarCharType(), new BigIntType(), new IntType()};
        RankRange topTwo = new ConstantRankRange(1L, 2L);
        AbstractTopNFunction topN = new RetractableTopNFunction(
                shortTtl,
                InternalTypeInfo.ofFields(fieldTypes),
                comparableRecordComparator,
                sortKeySelector,
                RankType.ROW_NUMBER,
                topTwo,
                generatedEqualiser,
                true,
                true);
        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(topN);
        harness.open();

        harness.setStateTtlProcessingTime(0L);
        harness.processElement(StreamRecordUtils.insertRecord("a", 1L, 10));
        // Step just past the configured TTL value before sending the remaining records.
        harness.setStateTtlProcessingTime(1001L);
        harness.processElement(StreamRecordUtils.insertRecord("a", 2L, 11));
        harness.processElement(StreamRecordUtils.deleteRecord("a", 1L, 10));
        harness.close();

        ArrayList<Object> expected = new ArrayList<>();
        expected.add(StreamRecordUtils.insertRecord("a", 1L, 10, 1L));
        expected.add(StreamRecordUtils.insertRecord("a", 2L, 11, 1L));
        assertorWithRowNumber.assertOutputEquals("output wrong.", expected, harness.getOutput());
    }
}

