/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.harness;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RowComparator;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.table.runtime.aggregate.CollectionRowComparator;
import org.apache.flink.table.runtime.aggregate.ProcTimeSortProcessFunction;
import org.apache.flink.table.runtime.aggregate.RowTimeSortProcessFunction;
import org.apache.flink.table.runtime.types.CRow;
import org.apache.flink.table.runtime.types.CRowTypeInfo;
import org.apache.flink.table.runtime.types.CRowTypeInfo$;
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo$;
import org.apache.flink.types.Row;
import org.junit.Test;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u00054A!\u0001\u0002\u0001\u001f\tq2k\u001c:u!J|7-Z:t\rVt7\r^5p]\"\u000b'O\\3tgR+7\u000f\u001e\u0006\u0003\u0007\u0011\tq\u0001[1s]\u0016\u001c8O\u0003\u0002\u0006\r\u00059!/\u001e8uS6,'BA\u0004\t\u0003\u0015!\u0018M\u00197f\u0015\tI!\"A\u0003gY&t7N\u0003\u0002\f\u0019\u00051\u0011\r]1dQ\u0016T\u0011!D\u0001\u0004_J<7\u0001A\n\u0003\u0001A\u0001\"!\u0005\u000b\u000e\u0003IQ\u0011aE\u0001\u0006g\u000e\fG.Y\u0005\u0003+I\u0011a!\u00118z%\u00164\u0007\"B\f\u0001\t\u0003A\u0012A\u0002\u001fj]&$h\bF\u0001\u001a!\tQ\u0002!D\u0001\u0003\u0011\u0015a\u0002\u0001\"\u0001\u001e\u0003\t\"Xm\u001d;T_J$\bK]8d)&lW\rS1s]\u0016\u001c8\u000fU1si&$\u0018n\u001c8fIR\ta\u0004\u0005\u0002\u0012?%\u0011\u0001E\u0005\u0002\u0005+:LG\u000f\u000b\u0002\u001cEA\u00111EJ\u0007\u0002I)\u0011Q\u0005D\u0001\u0006UVt\u0017\u000e^\u0005\u0003O\u0011\u0012A\u0001V3ti\")\u0011\u0006\u0001C\u0001;\u0005\tC/Z:u'>\u0014HOU8x)&lW\rS1s]\u0016\u001c8\u000fU1si&$\u0018n\u001c8fI\"\u0012\u0001FI\u0004\u0006Y\tA\t!L\u0001\u001f'>\u0014H\u000f\u0015:pG\u0016\u001c8OR;oGRLwN\u001c%be:,7o\u001d+fgR\u0004\"A\u0007\u0018\u0007\u000b\u0005\u0011\u0001\u0012A\u0018\u0014\u00059\u0002\u0002\"B\f/\t\u0003\tD#A\u0017\u0007\tMr\u0003\u0001\u000e\u0002\u0011)V\u0004H.\u001a*poN+G.Z2u_J\u001c2AM\u001b>!\t14(D\u00018\u0015\tA\u0014(\u0001\u0003mC:<'\"\u0001\u001e\u0002\t)\fg/Y\u0005\u0003y]\u0012aa\u00142kK\u000e$\b\u0003\u0002 E\r2k\u0011a\u0010\u0006\u0003\u0001\u0006\u000b\u0011BZ;oGRLwN\\:\u000b\u0005i\u0012%BA\"\t\u0003\r\t\u0007/[\u0005\u0003\u000b~\u00121bS3z'\u0016dWm\u0019;peB\u0011qIS\u0007\u0002\u0011*\u0011\u0011\nB\u0001\u0006if\u0004Xm]\u0005\u0003\u0017\"\u0013Aa\u0011*poB\u0011a'T\u0005\u0003\u001d^\u0012q!\u00138uK\u001e,'\u000f\u0003\u0005Qe\t\u0015\r\u0011\"\u0003R\u00035\u0019X\r\\3di>\u0014h)[3mIV\t!\u000b\u0005\u0002\u0012'&\u0011AK\u0005\u0002\u0004\u0013:$\b\u0002\u0003,3\u0005\u0003\u0005\u000b\u0011\u0002*\u0002\u001dM,G.Z2u_J4\u0015.\u001a7eA!)qC\rC\u00011R\u0011\u0011l\u0017\t\u00035Jj\u0011A\f\u0005\u0006!^\u0003\rA\u0015\u0005\u0006;J\"\tEX\u0001\u0007O\u0016$8*Z=\u0015\u00051{\u0006\"\u00021]\u0001\u00041\u0015!\u0002<bYV,\u0007")
public class SortProcessFunctionHarnessTest {
    @Test
    public void testSortProcTimeHarnessPartitioned() {
        RowTypeInfo rT = new RowTypeInfo((TypeInformation[])((Object[])new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO}), (String[])((Object[])new String[]{"a", "b", "c", "d", "e"}));
        int[] indexes = new int[]{1, 2};
        TypeComparator[] fieldComps = (TypeComparator[])((Object[])new TypeComparator[]{BasicTypeInfo.LONG_TYPE_INFO.createComparator(true, null), BasicTypeInfo.INT_TYPE_INFO.createComparator(false, null)});
        boolean[] booleanOrders = new boolean[]{true, false};
        RowComparator rowComp = new RowComparator(rT.getTotalFields(), indexes, fieldComps, new TypeSerializer[0], booleanOrders);
        CollectionRowComparator collectionRowComparator = new CollectionRowComparator((TypeComparator)rowComp);
        CRowTypeInfo inputCRowType = CRowTypeInfo$.MODULE$.apply((TypeInformation)rT);
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new ProcTimeSortProcessFunction(inputCRowType, collectionRowComparator));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)processFunction, (KeySelector)new TupleRowSelector(0), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.setProcessingTime(3L);
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(11L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.setProcessingTime(1005L);
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        testHarness.setProcessingTime(1008L);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(11L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(1L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(2L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(3L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(11L)}), true)));
        TestHarnessUtil.assertOutputEquals((String)"Output was not correctly sorted.", expectedOutput, (Queue)result);
        testHarness.close();
    }

    @Test
    public void testSortRowTimeHarnessPartitioned() {
        RowTypeInfo rT = new RowTypeInfo((TypeInformation[])((Object[])new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, TimeIndicatorTypeInfo$.MODULE$.ROWTIME_INDICATOR()}), (String[])((Object[])new String[]{"a", "b", "c", "d", "e"}));
        int[] indexes = new int[]{1, 2};
        TypeComparator[] fieldComps = (TypeComparator[])((Object[])new TypeComparator[]{BasicTypeInfo.LONG_TYPE_INFO.createComparator(true, null), BasicTypeInfo.INT_TYPE_INFO.createComparator(false, null)});
        boolean[] booleanOrders = new boolean[]{true, false};
        RowComparator rowComp = new RowComparator(rT.getTotalFields(), indexes, fieldComps, new TypeSerializer[0], booleanOrders);
        CollectionRowComparator collectionRowComparator = new CollectionRowComparator((TypeComparator)rowComp);
        CRowTypeInfo inputCRowType = CRowTypeInfo$.MODULE$.apply((TypeInformation)rT);
        LegacyKeyedProcessOperator processFunction = new LegacyKeyedProcessOperator((ProcessFunction)new RowTimeSortProcessFunction(inputCRowType, 4, (Option)new Some((Object)collectionRowComparator)));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)processFunction, (KeySelector)new TupleRowSelector(0), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        testHarness.open();
        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
        testHarness.processWatermark(3L);
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(11L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(1001L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(13L), Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(3), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(14L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(3), "aaa", Predef$.MODULE$.long2Long(2004L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2006L)}), true)));
        testHarness.processWatermark(2007L);
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(2008L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(14L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(3), "aaa", Predef$.MODULE$.long2Long(2019L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.long2Long(2008L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2010L)}), true)));
        testHarness.processElement(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(19L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2008L)}), true)));
        testHarness.processWatermark(2012L);
        ConcurrentLinkedQueue result = testHarness.getOutput();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(3L));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(11L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(1001L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(3), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(13L), Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(14L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2002L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(12L), Predef$.MODULE$.int2Integer(3), "aaa", Predef$.MODULE$.long2Long(2004L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2006L)}), true)));
        expectedOutput.add(new Watermark(2007L));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(19L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2008L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.int2Integer(2), "aaa", Predef$.MODULE$.long2Long(2008L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(20L), Predef$.MODULE$.int2Integer(1), "aaa", Predef$.MODULE$.long2Long(2008L)}), true)));
        expectedOutput.add(new StreamRecord((Object)new CRow(Row.of((Object[])new Object[]{Predef$.MODULE$.int2Integer(1), Predef$.MODULE$.long2Long(10L), Predef$.MODULE$.int2Integer(0), "aaa", Predef$.MODULE$.long2Long(2010L)}), true)));
        expectedOutput.add(new Watermark(2012L));
        TestHarnessUtil.assertOutputEquals((String)"Output was not correctly sorted.", expectedOutput, (Queue)result);
        testHarness.close();
    }

    public static class TupleRowSelector
    implements KeySelector<CRow, Integer> {
        private final int selectorField;

        private int selectorField() {
            return this.selectorField;
        }

        public Integer getKey(CRow value) {
            return (Integer)value.row().getField(this.selectorField());
        }

        public TupleRowSelector(int selectorField) {
            this.selectorField = selectorField;
        }
    }
}

