/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for {@code DataSet.sortPartition(...)}.
 *
 * <p>Every test follows the same recipe: build a data set, sort each partition, run an
 * {@link OrderCheckMapper} over every partition to verify that adjacent records are in the
 * requested order, collapse the per-partition verdicts with {@code distinct()}, and finally
 * assert that the only remaining record is {@code (true)}.
 */
@RunWith(Parameterized.class)
public class SortPartitionITCase extends MultipleProgramsTestBaseJUnit4 {

    /** Text form of the single record left after {@code distinct()} when every partition is sorted. */
    private static final String ALL_PARTITIONS_SORTED = "(true)\n";

    /**
     * Creates the test case for one execution mode supplied by the parameterized runner.
     *
     * @param mode execution mode forwarded to the base class
     */
    public SortPartitionITCase(MultipleProgramsTestBaseJUnit4.TestExecutionMode mode) {
        super(mode);
    }

    /**
     * Asserts that the collected per-partition verdicts consist of exactly one {@code (true)}.
     *
     * @param result distinct verdicts emitted by the {@link OrderCheckMapper} instances
     * @throws Exception if the comparison utility fails
     */
    private static void assertAllPartitionsSorted(List<Tuple1<Boolean>> result) throws Exception {
        TestBaseUtils.compareResultAsText(result, ALL_PARTITIONS_SORTED);
    }

    /** Sorts partitions by tuple position 1 in descending order. */
    @Test
    public void testSortPartitionByKeyField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple3<Integer, Long, String>>())
                        .setParallelism(4)
                        .sortPartition(1, Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple3<Integer, Long, String>>(
                                        new Tuple3Checker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /** Sorts partitions by position 4 ascending, then by position 2 descending. */
    @Test
    public void testSortPartitionByTwoKeyFields() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds =
                CollectionDataSets.get5TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>())
                        .setParallelism(2)
                        .sortPartition(4, Order.ASCENDING)
                        .sortPartition(2, Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(
                                        new Tuple5Checker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /** Sorts partitions by the field expression {@code "f1"} in descending order. */
    @Test
    public void testSortPartitionByFieldExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple3<Integer, Long, String>>())
                        .setParallelism(4)
                        .sortPartition("f1", Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple3<Integer, Long, String>>(
                                        new Tuple3Checker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /** Sorts partitions by {@code "f4"} ascending, then by {@code "f2"} descending. */
    @Test
    public void testSortPartitionByTwoFieldExpressions() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds =
                CollectionDataSets.get5TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple5<Integer, Long, Integer, String, Long>>())
                        .setParallelism(2)
                        .sortPartition("f4", Order.ASCENDING)
                        .sortPartition("f2", Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple5<Integer, Long, Integer, String, Long>>(
                                        new Tuple5Checker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /** Sorts partitions by the nested expression {@code "f0.f1"} ascending, then {@code "f1"} descending. */
    @Test
    public void testSortPartitionByNestedFieldExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
                CollectionDataSets.getGroupSortedNestedTupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple2<Tuple2<Integer, Integer>, String>>())
                        .setParallelism(3)
                        .sortPartition("f0.f1", Order.ASCENDING)
                        .sortPartition("f1", Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple2<Tuple2<Integer, Integer>, String>>(
                                        new NestedTupleChecker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /**
     * Sorts POJO partitions by {@code "nestedTupleWithCustom.f1.myString"} ascending, then by
     * {@code "number"} descending.
     */
    @Test
    public void testSortPartitionPojoByNestedFieldExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<CollectionDataSets.POJO>())
                        // The identity map runs with parallelism 1 although the environment uses 3.
                        .setParallelism(1)
                        .sortPartition("nestedTupleWithCustom.f1.myString", Order.ASCENDING)
                        .sortPartition("number", Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<CollectionDataSets.POJO>(new PojoChecker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /**
     * Applies {@code sortPartition} directly to the source data set (no identity map in between)
     * and sets the parallelism on the sort operator itself.
     */
    @Test
    public void testSortPartitionParallelismChange() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.sortPartition(1, Order.DESCENDING)
                        .setParallelism(3)
                        .mapPartition(
                                new OrderCheckMapper<Tuple3<Integer, Long, String>>(
                                        new Tuple3Checker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /** Sorts partitions ascending by a key selector that extracts field {@code f1}. */
    @Test
    public void testSortPartitionWithKeySelector1() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple3<Integer, Long, String>>())
                        .setParallelism(4)
                        .sortPartition(
                                new KeySelector<Tuple3<Integer, Long, String>, Long>() {
                                    @Override
                                    public Long getKey(Tuple3<Integer, Long, String> value)
                                            throws Exception {
                                        return value.f1;
                                    }
                                },
                                Order.ASCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple3<Integer, Long, String>>(
                                        new Tuple3AscendingChecker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /**
     * Sorts partitions descending by a key selector that builds a composite {@code (f0, f1)} key.
     * The order is verified on {@code f1} only, via {@link Tuple3Checker}.
     */
    @Test
    public void testSortPartitionWithKeySelector2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

        List<Tuple1<Boolean>> result =
                ds.map(new IdMapper<Tuple3<Integer, Long, String>>())
                        .setParallelism(4)
                        .sortPartition(
                                new KeySelector<
                                        Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
                                    @Override
                                    public Tuple2<Integer, Long> getKey(
                                            Tuple3<Integer, Long, String> value) throws Exception {
                                        return new Tuple2<Integer, Long>(value.f0, value.f1);
                                    }
                                },
                                Order.DESCENDING)
                        .mapPartition(
                                new OrderCheckMapper<Tuple3<Integer, Long, String>>(
                                        new Tuple3Checker()))
                        .distinct()
                        .collect();

        assertAllPartitionsSorted(result);
    }

    /**
     * Identity map function; returns every record unchanged.
     *
     * @param <T> record type
     */
    private static class IdMapper<T> implements MapFunction<T, T> {
        private IdMapper() {
        }

        @Override
        public T map(T value) throws Exception {
            return value;
        }
    }

    /**
     * Emits one {@code Tuple1<Boolean>} per partition: {@code true} when every pair of adjacent
     * records satisfies the configured {@link OrderChecker} (or the partition is empty),
     * {@code false} as soon as the first out-of-order pair is found.
     *
     * @param <T> record type
     */
    private static class OrderCheckMapper<T> implements MapPartitionFunction<T, Tuple1<Boolean>> {
        /** Pairwise order predicate; stays {@code null} when the no-arg constructor is used. */
        OrderChecker<T> checker;

        /** Creates a mapper without a checker; {@link #checker} must be set before use. */
        public OrderCheckMapper() {
        }

        /**
         * Creates a mapper that validates adjacent records with the given checker.
         *
         * @param checker pairwise order predicate
         */
        public OrderCheckMapper(OrderChecker<T> checker) {
            this.checker = checker;
        }

        @Override
        public void mapPartition(Iterable<T> values, Collector<Tuple1<Boolean>> out)
                throws Exception {
            Iterator<T> it = values.iterator();
            if (it.hasNext()) {
                T last = it.next();
                while (it.hasNext()) {
                    T next = it.next();
                    if (!this.checker.inOrder(last, next)) {
                        // First violation decides the verdict; the rest is not inspected.
                        out.collect(new Tuple1<Boolean>(false));
                        return;
                    }
                    last = next;
                }
            }
            // Empty partitions and fully ordered partitions both count as sorted.
            out.collect(new Tuple1<Boolean>(true));
        }
    }

    /**
     * Accepts POJO pairs ordered by {@code nestedTupleWithCustom.f1.myString} ascending and, for
     * equal strings, by {@code number} descending.
     */
    private static class PojoChecker implements OrderChecker<CollectionDataSets.POJO> {
        private PojoChecker() {
        }

        @Override
        public boolean inOrder(CollectionDataSets.POJO t1, CollectionDataSets.POJO t2) {
            int byString =
                    t1.nestedTupleWithCustom.f1.myString.compareTo(
                            t2.nestedTupleWithCustom.f1.myString);
            return byString < 0 || (byString == 0 && t1.number >= t2.number);
        }
    }

    /**
     * Accepts nested-tuple pairs ordered by {@code f0.f1} ascending and, for equal values, by
     * {@code f1} descending.
     */
    private static class NestedTupleChecker
            implements OrderChecker<Tuple2<Tuple2<Integer, Integer>, String>> {
        private NestedTupleChecker() {
        }

        @Override
        public boolean inOrder(
                Tuple2<Tuple2<Integer, Integer>, String> t1,
                Tuple2<Tuple2<Integer, Integer>, String> t2) {
            return t1.f0.f1 < t2.f0.f1
                    || (t1.f0.f1.equals(t2.f0.f1) && t1.f1.compareTo(t2.f1) >= 0);
        }
    }

    /**
     * Accepts 5-tuple pairs ordered by {@code f4} ascending and, for equal values, by {@code f2}
     * descending.
     */
    private static class Tuple5Checker
            implements OrderChecker<Tuple5<Integer, Long, Integer, String, Long>> {
        private Tuple5Checker() {
        }

        @Override
        public boolean inOrder(
                Tuple5<Integer, Long, Integer, String, Long> t1,
                Tuple5<Integer, Long, Integer, String, Long> t2) {
            return t1.f4 < t2.f4 || (t1.f4.equals(t2.f4) && t1.f2 >= t2.f2);
        }
    }

    /** Accepts 3-tuple pairs whose {@code f1} values are non-decreasing. */
    private static class Tuple3AscendingChecker
            implements OrderChecker<Tuple3<Integer, Long, String>> {
        private Tuple3AscendingChecker() {
        }

        @Override
        public boolean inOrder(
                Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
            return t1.f1 <= t2.f1;
        }
    }

    /** Accepts 3-tuple pairs whose {@code f1} values are non-increasing. */
    private static class Tuple3Checker implements OrderChecker<Tuple3<Integer, Long, String>> {
        private Tuple3Checker() {
        }

        @Override
        public boolean inOrder(
                Tuple3<Integer, Long, String> t1, Tuple3<Integer, Long, String> t2) {
            return t1.f1 >= t2.f1;
        }
    }

    /**
     * Serializable predicate deciding whether two adjacent records are in the expected order.
     *
     * @param <T> record type
     */
    private static interface OrderChecker<T> extends Serializable {
        /**
         * @param var1 the earlier record
         * @param var2 the record that directly follows {@code var1}
         * @return {@code true} if the pair respects the expected order
         */
        public boolean inOrder(T var1, T var2);
    }
}

