/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
import org.apache.flink.table.runtime.operators.sort.BaseTemporalSortOperator;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.Test;

public class RowTimeSortOperatorTest {
    @Test
    public void testSortOnTwoFields() throws Exception {
        InternalTypeInfo inputRowType = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{new IntType(), new BigIntType(), new VarCharType(Integer.MAX_VALUE), new IntType()});
        int rowTimeIdx = 1;
        GeneratedRecordComparator gComparator = new GeneratedRecordComparator("", "", new Object[0]){
            private static final long serialVersionUID = -6067266199060901331L;

            public RecordComparator newInstance(ClassLoader classLoader) {
                return IntRecordComparator.INSTANCE;
            }
        };
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(inputRowType.toRowFieldTypes());
        RowTimeSortOperator operator = this.createSortOperator((InternalTypeInfo<RowData>)inputRowType, rowTimeIdx, gComparator);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((BaseTemporalSortOperator)operator);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(3, 3L, "Hello world", 3));
        testHarness.processElement(StreamRecordUtils.insertRecord(2, 2L, "Hello", 2));
        testHarness.processElement(StreamRecordUtils.insertRecord(6, 2L, "Luke Skywalker", 6));
        testHarness.processElement(StreamRecordUtils.insertRecord(5, 3L, "I am fine.", 5));
        testHarness.processElement(StreamRecordUtils.insertRecord(7, 1L, "Comment#1", 7));
        testHarness.processElement(StreamRecordUtils.insertRecord(9, 4L, "Comment#3", 9));
        testHarness.processElement(StreamRecordUtils.insertRecord(10, 4L, "Comment#4", 10));
        testHarness.processElement(StreamRecordUtils.insertRecord(8, 4L, "Comment#2", 8));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, 1L, "Hi", 2));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, 1L, "Hi", 1));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, 3L, "Helloworld, how are you?", 4));
        testHarness.processElement(StreamRecordUtils.insertRecord(4, 5L, "Hello, how are you?", 4));
        testHarness.processWatermark(new Watermark(4L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, 1L, "Hi", 2));
        expectedOutput.add(StreamRecordUtils.insertRecord(1, 1L, "Hi", 1));
        expectedOutput.add(StreamRecordUtils.insertRecord(7, 1L, "Comment#1", 7));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, 2L, "Hello", 2));
        expectedOutput.add(StreamRecordUtils.insertRecord(6, 2L, "Luke Skywalker", 6));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, 3L, "Hello world", 3));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, 3L, "Helloworld, how are you?", 4));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, 3L, "I am fine.", 5));
        expectedOutput.add(StreamRecordUtils.insertRecord(8, 4L, "Comment#2", 8));
        expectedOutput.add(StreamRecordUtils.insertRecord(9, 4L, "Comment#3", 9));
        expectedOutput.add(StreamRecordUtils.insertRecord(10, 4L, "Comment#4", 10));
        expectedOutput.add(new Watermark(4L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        expectedOutput.clear();
        operator = this.createSortOperator((InternalTypeInfo<RowData>)inputRowType, rowTimeIdx, gComparator);
        testHarness = this.createTestHarness((BaseTemporalSortOperator)operator);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(5, 3L, "I am fine.", 6));
        testHarness.processWatermark(new Watermark(5L));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, 5L, "Hello, how are you?", 4));
        expectedOutput.add(new Watermark(5L));
        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(11L));
        testHarness.processWatermark(new Watermark(12L));
        expectedOutput.add(new Watermark(11L));
        expectedOutput.add(new Watermark(12L));
        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testOnlySortOnRowTime() throws Exception {
        InternalTypeInfo inputRowType = InternalTypeInfo.ofFields((LogicalType[])new LogicalType[]{new BigIntType(), new BigIntType(), new VarCharType(Integer.MAX_VALUE), new IntType()});
        int rowTimeIdx = 0;
        RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(inputRowType.toRowFieldTypes());
        RowTimeSortOperator operator = this.createSortOperator((InternalTypeInfo<RowData>)inputRowType, rowTimeIdx, null);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((BaseTemporalSortOperator)operator);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(3L, 2L, "Hello world", 3));
        testHarness.processElement(StreamRecordUtils.insertRecord(2L, 2L, "Hello", 2));
        testHarness.processElement(StreamRecordUtils.insertRecord(6L, 3L, "Luke Skywalker", 6));
        testHarness.processElement(StreamRecordUtils.insertRecord(5L, 3L, "I am fine.", 5));
        testHarness.processElement(StreamRecordUtils.insertRecord(7L, 4L, "Comment#1", 7));
        testHarness.processElement(StreamRecordUtils.insertRecord(9L, 4L, "Comment#3", 9));
        testHarness.processElement(StreamRecordUtils.insertRecord(10L, 4L, "Comment#4", 10));
        testHarness.processElement(StreamRecordUtils.insertRecord(8L, 4L, "Comment#2", 8));
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1L, "Hi", 2));
        testHarness.processElement(StreamRecordUtils.insertRecord(1L, 1L, "Hi", 1));
        testHarness.processElement(StreamRecordUtils.insertRecord(4L, 3L, "Helloworld, how are you?", 4));
        testHarness.processWatermark(new Watermark(9L));
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, 1L, "Hi", 2));
        expectedOutput.add(StreamRecordUtils.insertRecord(1L, 1L, "Hi", 1));
        expectedOutput.add(StreamRecordUtils.insertRecord(2L, 2L, "Hello", 2));
        expectedOutput.add(StreamRecordUtils.insertRecord(3L, 2L, "Hello world", 3));
        expectedOutput.add(StreamRecordUtils.insertRecord(4L, 3L, "Helloworld, how are you?", 4));
        expectedOutput.add(StreamRecordUtils.insertRecord(5L, 3L, "I am fine.", 5));
        expectedOutput.add(StreamRecordUtils.insertRecord(6L, 3L, "Luke Skywalker", 6));
        expectedOutput.add(StreamRecordUtils.insertRecord(7L, 4L, "Comment#1", 7));
        expectedOutput.add(StreamRecordUtils.insertRecord(8L, 4L, "Comment#2", 8));
        expectedOutput.add(StreamRecordUtils.insertRecord(9L, 4L, "Comment#3", 9));
        expectedOutput.add(new Watermark(9L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.close();
        expectedOutput.clear();
        operator = this.createSortOperator((InternalTypeInfo<RowData>)inputRowType, rowTimeIdx, null);
        testHarness = this.createTestHarness((BaseTemporalSortOperator)operator);
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement(StreamRecordUtils.insertRecord(5L, 3L, "I am fine.", 6));
        testHarness.processWatermark(new Watermark(10L));
        expectedOutput.add(StreamRecordUtils.insertRecord(10L, 4L, "Comment#4", 10));
        expectedOutput.add(new Watermark(10L));
        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
        testHarness.processWatermark(new Watermark(11L));
        testHarness.processWatermark(new Watermark(12L));
        expectedOutput.add(new Watermark(11L));
        expectedOutput.add(new Watermark(12L));
        assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
    }

    private RowTimeSortOperator createSortOperator(InternalTypeInfo<RowData> inputRowType, int rowTimeIdx, GeneratedRecordComparator gComparator) {
        return new RowTimeSortOperator(inputRowType, rowTimeIdx, gComparator);
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(BaseTemporalSortOperator operator) throws Exception {
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)operator, (KeySelector)EmptyRowDataKeySelector.INSTANCE, (TypeInformation)EmptyRowDataKeySelector.INSTANCE.getProducedType());
        return testHarness;
    }
}

