/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank;

import java.util.ArrayList;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
import org.apache.flink.table.runtime.operators.rank.AppendOnlyFirstNFunction;
import org.apache.flink.table.runtime.operators.rank.ConstantRankRange;
import org.apache.flink.table.runtime.operators.rank.RankRange;
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.operators.rank.TopNFunctionTestBase;
import org.apache.flink.table.runtime.operators.rank.VariableRankRange;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.junit.Test;

public class AppendOnlyFirstNFunctionTest
extends TopNFunctionTestBase {
    @Override
    AbstractTopNFunction createFunction(RankType rankType, RankRange rankRange, boolean generateUpdateBefore, boolean outputRankNumber) {
        return new AppendOnlyFirstNFunction(this.ttlConfig, this.inputRowType, generatedSortKeyComparator, this.sortKeySelector, rankType, rankRange, generateUpdateBefore, outputRankNumber);
    }

    @Override
    @Test
    public void testDisableGenerateUpdateBefore() throws Exception {
        AbstractTopNFunction func = this.createFunction(RankType.ROW_NUMBER, (RankRange)new ConstantRankRange(1L, 2L), false, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 12));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 19));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        this.assertorWithoutRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Override
    @Test
    public void testDisableGenerateUpdateBeforeAndOutputRankNumber() throws Exception {
        AbstractTopNFunction func = this.createFunction(RankType.ROW_NUMBER, (RankRange)new ConstantRankRange(1L, 2L), false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 12, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 33, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 44, 2L));
        this.assertorWithRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Override
    @Test
    public void testOutputRankNumberWithConstantRankRange() throws Exception {
        AbstractTopNFunction func = this.createFunction(RankType.ROW_NUMBER, (RankRange)new ConstantRankRange(1L, 2L), true, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 12, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 33, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 44, 2L));
        this.assertorWithRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Override
    @Test
    public void testConstantRankRangeWithOffset() throws Exception {
        AbstractTopNFunction func = this.createFunction(RankType.ROW_NUMBER, (RankRange)new ConstantRankRange(2L, 2L), true, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 12));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 19));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        this.assertorWithRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Override
    @Test
    public void testConstantRankRangeWithoutOffset() throws Exception {
        AbstractTopNFunction func = this.createFunction(RankType.ROW_NUMBER, (RankRange)new ConstantRankRange(1L, 2L), true, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 12));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 19));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        this.assertorWithoutRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Override
    @Test
    public void testOutputRankNumberWithVariableRankRange() throws Exception {
        AbstractTopNFunction func = this.createFunction(RankType.ROW_NUMBER, (RankRange)new VariableRankRange(1), false, true);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness(func);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 12));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 19));
        testHarness.processElement(StreamRecordUtils.insertRecord("book", 2L, 11));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 33));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 44));
        testHarness.processElement(StreamRecordUtils.insertRecord("fruit", 1L, 22));
        testHarness.close();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 12, 1L));
        expectedOutput.add(StreamRecordUtils.insertRecord("book", 2L, 19, 2L));
        expectedOutput.add(StreamRecordUtils.insertRecord("fruit", 1L, 33, 1L));
        this.assertorWithRowNumber.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }
}

