/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Tuple1;

@RunWith(value=Parameterized.class)
public class GroupCombineITCase
extends MultipleProgramsTestBaseJUnit4 {
    private static String identityResult = "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n5,3,I am fine.\n6,3,Luke Skywalker\n7,4,Comment#1\n8,4,Comment#2\n9,4,Comment#3\n10,4,Comment#4\n11,5,Comment#5\n12,5,Comment#6\n13,5,Comment#7\n14,5,Comment#8\n15,5,Comment#9\n16,6,Comment#10\n17,6,Comment#11\n18,6,Comment#12\n19,6,Comment#13\n20,6,Comment#14\n21,6,Comment#15\n";

    public GroupCombineITCase(MultipleProgramsTestBaseJUnit4.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testAllGroupCombineIdentity() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.combineGroup((GroupCombineFunction)new IdentityFunction()).reduceGroup((GroupReduceFunction)new IdentityFunction());
        List result = reduceDs.collect();
        TestBaseUtils.compareResultAsTuples((List)result, (String)identityResult);
    }

    @Test
    public void testIdentity() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.combineGroup((GroupCombineFunction)new IdentityFunction()).reduceGroup((GroupReduceFunction)new IdentityFunction());
        List result = reduceDs.collect();
        TestBaseUtils.compareResultAsTuples((List)result, (String)identityResult);
    }

    @Test
    public void testIdentityWithGroupBy() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).combineGroup((GroupCombineFunction)new IdentityFunction()).reduceGroup((GroupReduceFunction)new IdentityFunction());
        List result = reduceDs.collect();
        TestBaseUtils.compareResultAsTuples((List)result, (String)identityResult);
    }

    @Test
    public void testIdentityWithGroupByAndSort() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup(1, Order.DESCENDING).combineGroup((GroupCombineFunction)new IdentityFunction()).groupBy(new int[]{1}).sortGroup(1, Order.DESCENDING).reduceGroup((GroupReduceFunction)new IdentityFunction());
        List result = reduceDs.collect();
        TestBaseUtils.compareResultAsTuples((List)result, (String)identityResult);
    }

    @Test
    public void testPartialReduceWithIdenticalInputOutputType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapOperator dsWrapped = ds.map((MapFunction)new Tuple3KvWrapper());
        List result = dsWrapped.groupBy(new int[]{0}).combineGroup((GroupCombineFunction)new Tuple3toTuple3GroupReduce()).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new Tuple3toTuple3GroupReduce()).map((MapFunction)new MapFunction<Tuple2<Long, Tuple3<Integer, Long, String>>, Tuple3<Integer, Long, String>>(){

            public Tuple3<Integer, Long, String> map(Tuple2<Long, Tuple3<Integer, Long, String>> value) throws Exception {
                return (Tuple3)value.f1;
            }
        }).collect();
        String expected = "1,1,combined\n5,4,combined\n15,9,combined\n34,16,combined\n65,25,combined\n111,36,combined\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testPartialReduceWithDifferentInputOutputType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapOperator dsWrapped = ds.map((MapFunction)new Tuple3KvWrapper());
        List result = dsWrapped.groupBy(new int[]{0}).combineGroup((GroupCombineFunction)new Tuple3toTuple2GroupReduce()).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new Tuple2toTuple2GroupReduce()).map((MapFunction)new MapFunction<Tuple2<Long, Tuple2<Integer, Long>>, Tuple2<Integer, Long>>(){

            public Tuple2<Integer, Long> map(Tuple2<Long, Tuple2<Integer, Long>> value) throws Exception {
                return (Tuple2)value.f1;
            }
        }).collect();
        String expected = "1,3\n5,20\n15,58\n34,52\n65,70\n111,96\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCheckPartitionShuffleGroupBy() throws Exception {
        Assume.assumeTrue((this.mode != MultipleProgramsTestBaseJUnit4.TestExecutionMode.COLLECTION ? 1 : 0) != 0);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        UnsortedGrouping partitionedDS = ds.partitionByHash(new int[]{0}).groupBy(new int[]{1});
        List result = partitionedDS.combineGroup((GroupCombineFunction)new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>(){

            public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
                int count = 0;
                long key = 0L;
                for (Tuple3<Integer, Long, String> value : values) {
                    key = (Long)value.f1;
                    ++count;
                }
                out.collect((Object)new Tuple2((Object)key, (Object)count));
            }
        }).collect();
        Object[] localExpected = new String[]{"(6,6)", "(5,5)(4,4)", "(3,3)", "(2,2)", "(1,1)"};
        Object[] resultAsStringArray = new String[result.size()];
        for (int i = 0; i < resultAsStringArray.length; ++i) {
            resultAsStringArray[i] = ((Tuple2)result.get(i)).toString();
        }
        Arrays.sort(resultAsStringArray);
        Assert.assertEquals((String)"The two arrays were identical.", (Object)false, (Object)Arrays.equals(localExpected, resultAsStringArray));
    }

    @Test
    public void testCheckPartitionShuffleDOP1() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        UnsortedGrouping partitionedDS = ds.partitionByHash(new int[]{0}).groupBy(new int[]{1});
        List result = partitionedDS.combineGroup((GroupCombineFunction)new GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Integer>>(){

            public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Long, Integer>> out) throws Exception {
                int count = 0;
                long key = 0L;
                for (Tuple3<Integer, Long, String> value : values) {
                    key = (Long)value.f1;
                    ++count;
                }
                out.collect((Object)new Tuple2((Object)key, (Object)count));
            }
        }).collect();
        String expected = "6,6\n5,5\n4,4\n3,3\n2,2\n1,1\n";
        TestBaseUtils.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testAPI() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator ds = CollectionDataSets.getStringDataSet(env).map((MapFunction)new MapFunction<String, org.apache.flink.api.java.tuple.Tuple1<String>>(){

            public org.apache.flink.api.java.tuple.Tuple1<String> map(String value) throws Exception {
                return new org.apache.flink.api.java.tuple.Tuple1((Object)value);
            }
        });
        ds.combineGroup((GroupCombineFunction)new GroupCombineFunctionExample()).output((OutputFormat)new DiscardingOutputFormat());
        ds.groupBy(new int[]{0}).combineGroup((GroupCombineFunction)new GroupCombineFunctionExample()).output((OutputFormat)new DiscardingOutputFormat());
        ds.groupBy(new int[]{0}).sortGroup(0, Order.ASCENDING).combineGroup((GroupCombineFunction)new GroupCombineFunctionExample()).output((OutputFormat)new DiscardingOutputFormat());
        env.execute();
    }

    private static interface KvGroupReduce<K, V, INT, OUT>
    extends CombineAndReduceGroup<Tuple2<K, V>, Tuple2<K, INT>, Tuple2<K, OUT>> {
    }

    private static interface CombineAndReduceGroup<IN, INT, OUT>
    extends GroupCombineFunction<IN, INT>,
    GroupReduceFunction<INT, OUT> {
    }

    private class Tuple3KvWrapper
    implements MapFunction<Tuple3<Integer, Long, String>, Tuple2<Long, Tuple3<Integer, Long, String>>> {
        private Tuple3KvWrapper() {
        }

        public Tuple2<Long, Tuple3<Integer, Long, String>> map(Tuple3<Integer, Long, String> value) throws Exception {
            return new Tuple2(value.f1, value);
        }
    }

    private static class Tuple2toTuple2GroupReduce
    implements KvGroupReduce<Long, Tuple2<Integer, Long>, Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
        private Tuple2toTuple2GroupReduce() {
        }

        public void combine(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
            int i = 0;
            long l = 0L;
            long key = 0L;
            for (Tuple2<Long, Tuple2<Integer, Long>> value : values) {
                key = (Long)value.f0;
                Tuple2 extracted = (Tuple2)value.f1;
                i += ((Integer)extracted.f0).intValue();
                l += ((Long)extracted.f1).longValue();
            }
            Tuple2 result = new Tuple2((Object)i, (Object)l);
            out.collect((Object)new Tuple2((Object)key, (Object)result));
        }

        public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
            this.combine(values, out);
        }
    }

    private static class Tuple3toTuple2GroupReduce
    implements KvGroupReduce<Long, Tuple3<Integer, Long, String>, Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
        private Tuple3toTuple2GroupReduce() {
        }

        public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
            int i = 0;
            long l = 0L;
            long key = 0L;
            for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
                key = (Long)value.f0;
                Tuple3 extracted = (Tuple3)value.f1;
                i += ((Integer)extracted.f0).intValue();
                l += (Long)extracted.f1 + (long)((String)extracted.f2).length();
            }
            Tuple2 result = new Tuple2((Object)i, (Object)l);
            out.collect((Object)new Tuple2((Object)key, (Object)result));
        }

        public void reduce(Iterable<Tuple2<Long, Tuple2<Integer, Long>>> values, Collector<Tuple2<Long, Tuple2<Integer, Long>>> out) throws Exception {
            new Tuple2toTuple2GroupReduce().reduce(values, out);
        }
    }

    private static class Tuple3toTuple3GroupReduce
    implements KvGroupReduce<Long, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private Tuple3toTuple3GroupReduce() {
        }

        public void combine(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
            int i = 0;
            long l = 0L;
            long key = 0L;
            for (Tuple2<Long, Tuple3<Integer, Long, String>> value : values) {
                key = (Long)value.f0;
                Tuple3 extracted = (Tuple3)value.f1;
                i += ((Integer)extracted.f0).intValue();
                l += ((Long)extracted.f1).longValue();
            }
            Tuple3 result = new Tuple3((Object)i, (Object)l, (Object)"combined");
            out.collect((Object)new Tuple2((Object)key, (Object)result));
        }

        public void reduce(Iterable<Tuple2<Long, Tuple3<Integer, Long, String>>> values, Collector<Tuple2<Long, Tuple3<Integer, Long, String>>> out) throws Exception {
            this.combine(values, out);
        }
    }

    private static class IdentityFunction
    implements GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>,
    GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private IdentityFunction() {
        }

        public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            for (Tuple3<Integer, Long, String> value : values) {
                out.collect((Object)new Tuple3(value.f0, value.f1, value.f2));
            }
        }

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            for (Tuple3<Integer, Long, String> value : values) {
                out.collect((Object)new Tuple3(value.f0, value.f1, value.f2));
            }
        }
    }

    public static class ScalaGroupCombineFunctionExample
    implements GroupCombineFunction<Tuple1<String>, Tuple1<String>> {
        public void combine(Iterable<Tuple1<String>> values, Collector<Tuple1<String>> out) throws Exception {
            for (Tuple1<String> value : values) {
                out.collect(value);
            }
        }
    }

    private static class GroupCombineFunctionExample
    implements GroupCombineFunction<org.apache.flink.api.java.tuple.Tuple1<String>, org.apache.flink.api.java.tuple.Tuple1<String>> {
        private GroupCombineFunctionExample() {
        }

        public void combine(Iterable<org.apache.flink.api.java.tuple.Tuple1<String>> values, Collector<org.apache.flink.api.java.tuple.Tuple1<String>> out) throws Exception {
            for (org.apache.flink.api.java.tuple.Tuple1<String> value : values) {
                out.collect(value);
            }
        }
    }
}

