/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.javaApiOperators;

import java.sql.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.ProjectOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.math.BigInt;

@RunWith(value=Parameterized.class)
public class GroupReduceITCase
extends MultipleProgramsTestBase {
    public GroupReduceITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testCorrectnessofGroupReduceOnTupleContainingPrimitiveByteArrayWithKeyFieldSelectors() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<byte[], Integer>> ds = CollectionDataSets.getTuple2WithByteArrayDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new ByteArrayGroupReduce());
        List result = reduceDs.collect();
        String expected = "0\n1\n2\n3\n4\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new Tuple3GroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1\n5,2\n15,3\n34,4\n65,5\n111,6\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelectors() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{4, 0}).reduceGroup((GroupReduceFunction)new Tuple5GroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,0,P-),1\n2,3,0,P-),1\n2,2,0,P-),2\n3,9,0,P-),2\n3,6,0,P-),3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,0,P-),1\n5,29,0,P-),2\n5,25,0,P-),3\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup(2, Order.ASCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,Hello-Hello world\n15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n34,4,Comment#1-Comment#2-Comment#3-Comment#4\n65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy((KeySelector)new KeySelector1()).reduceGroup((GroupReduceFunction)new Tuple3GroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1\n5,2\n15,3\n34,4\n65,5\n111,6\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy((KeySelector)new KeySelector2()).reduceGroup((GroupReduceFunction)new CustomTypeGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,0,Hello!\n2,3,Hello!\n3,12,Hello!\n4,30,Hello!\n5,60,Hello!\n6,105,Hello!\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfAllGroupReduceForTuples() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.reduceGroup((GroupReduceFunction)new AllAddingTuple3GroupReduce());
        List result = reduceDs.collect();
        String expected = "231,91,Hello World\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfAllGroupReduceForCustomTypes() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        GroupReduceOperator reduceDs = ds.reduceGroup((GroupReduceFunction)new AllAddingCustomTypeGroupReduce());
        List result = reduceDs.collect();
        String expected = "91,210,Hello!";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceWithBroadcastSet() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        SingleInputUdfOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
        List result = reduceDs.collect();
        String expected = "1,1,55\n5,2,55\n15,3,55\n34,4,55\n65,5,55\n111,6,55\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceIfUDFReturnsInputObjectsMultipleTimesWhileChangingThem() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new InputReturningTuple3GroupReduce());
        List result = reduceDs.collect();
        String expected = "11,1,Hi!\n21,1,Hi again!\n12,2,Hi!\n22,2,Hi again!\n13,2,Hi!\n23,2,Hi again!\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine() throws Exception {
        Assume.assumeTrue((this.mode != MultipleProgramsTestBase.TestExecutionMode.COLLECTION ? 1 : 0) != 0);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy((KeySelector)new KeySelector3()).reduceGroup((GroupReduceFunction)new CustomTypeGroupReduceWithCombine());
        List result = reduceDs.collect();
        String expected = "1,0,test1\n2,3,test2\n3,12,test3\n4,30,test4\n5,60,test5\n6,105,test6\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnTuplesWithCombine() throws Exception {
        Assume.assumeTrue((this.mode != MultipleProgramsTestBase.TestExecutionMode.COLLECTION ? 1 : 0) != 0);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new Tuple3GroupReduceWithCombine());
        List result = reduceDs.collect();
        String expected = "1,test1\n5,test2\n15,test3\n34,test4\n65,test5\n111,test6\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfAllGroupReduceForTuplesWithCombine() throws Exception {
        Assume.assumeTrue((this.mode != MultipleProgramsTestBase.TestExecutionMode.COLLECTION ? 1 : 0) != 0);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Operator ds = CollectionDataSets.get3TupleDataSet(env).map(new IdentityMapper()).setParallelism(4);
        Configuration cfg = new Configuration();
        cfg.setString("INPUT_SHIP_STRATEGY", "SHIP_REPARTITION");
        SingleInputUdfOperator reduceDs = ds.reduceGroup((GroupReduceFunction)new Tuple3AllGroupReduceWithCombine()).withParameters(cfg);
        List result = reduceDs.collect();
        String expected = "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup(2, Order.DESCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,Hello world-Hello\n15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n34,4,Comment#4-Comment#3-Comment#2-Comment#1\n65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy((KeySelector)new KeySelector4()).reduceGroup((GroupReduceFunction)new Tuple5GroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,0,P-),1\n2,3,0,P-),1\n2,2,0,P-),2\n3,9,0,P-),2\n3,6,0,P-),3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,0,P-),1\n5,29,0,P-),2\n5,25,0,P-),3\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup(0, Order.ASCENDING).reduceGroup((GroupReduceFunction)new OrderCheckingCombinableReduce());
        List result = reduceDs.collect();
        String expected = "1,1,Hi\n2,2,Hello\n4,3,Hello world, how are you?\n7,4,Comment#1\n11,5,Comment#5\n16,6,Comment#10\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testDeepNesting() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CrazyNested> ds = CollectionDataSets.getCrazyNestedDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal"}).reduceGroup((GroupReduceFunction)new GroupReducer1());
        List result = reduceDs.collect();
        String expected = "aa,1\nbb,2\ncc,3\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testPojoExtendingFromTupleWithCustomFields() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.FromTupleWithCTor> ds = CollectionDataSets.getPojoExtendingFromTuple(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"special", "f2"}).reduceGroup((GroupReduceFunction)new GroupReducer2());
        List result = reduceDs.collect();
        String expected = "3\n2\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testPojoContainigWritableAndTuples() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.PojoContainingTupleAndWritable> ds = CollectionDataSets.getPojoContainingTupleAndWritable(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"hadoopFan", "theTuple.*"}).reduceGroup((GroupReduceFunction)new GroupReducer3());
        List result = reduceDs.collect();
        String expected = "1\n5\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testTupleContainingPojosAndRegularFields() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, CollectionDataSets.CrazyNested, CollectionDataSets.POJO>> ds = CollectionDataSets.getTupleContainingPojos(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"f0", "f1.*"}).reduceGroup((GroupReduceFunction)new GroupReducer4());
        List result = reduceDs.collect();
        String expected = "3\n1\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testStringBasedDefinitionOnGroupSort() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup("f2", Order.DESCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,Hello world-Hello\n15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n34,4,Comment#4-Comment#3-Comment#2-Comment#1\n65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testIntBasedDefinitionOnGroupSortForFullNestedTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"f1"}).sortGroup(0, Order.DESCENDING).reduceGroup((GroupReduceFunction)new NestedTupleReducer());
        List result = reduceDs.collect();
        String expected = "a--(2,1)-(1,3)-(1,2)-\nb--(2,2)-\nc--(4,9)-(3,6)-(3,3)-\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testIntBasedDefinitionOnGroupSortForPartialNestedTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"f1"}).sortGroup("f0.f0", Order.ASCENDING).sortGroup("f0.f1", Order.ASCENDING).reduceGroup((GroupReduceFunction)new NestedTupleReducer());
        List result = reduceDs.collect();
        String expected = "a--(1,2)-(1,3)-(2,1)-\nb--(2,2)-\nc--(3,3)-(3,6)-(4,9)-\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testStringBasedDefinitionOnGroupSortForPartialNestedTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"f1"}).sortGroup("f0.f0", Order.DESCENDING).reduceGroup((GroupReduceFunction)new NestedTupleReducer());
        List result = reduceDs.collect();
        String expected = "a--(2,1)-(1,3)-(1,2)-\nb--(2,2)-\nc--(4,9)-(3,3)-(3,6)-\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeys() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds = CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"f1"}).sortGroup("f0.f0", Order.DESCENDING).sortGroup("f0.f1", Order.DESCENDING).reduceGroup((GroupReduceFunction)new NestedTupleReducer());
        List result = reduceDs.collect();
        String expected = "a--(2,1)-(1,3)-(1,2)-\nb--(2,2)-\nc--(4,9)-(3,6)-(3,3)-\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<CollectionDataSets.PojoContainingTupleAndWritable> ds = CollectionDataSets.getGroupSortedPojoContainingTupleAndWritable(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"hadoopFan"}).sortGroup("theTuple.f0", Order.DESCENDING).sortGroup("theTuple.f1", Order.DESCENDING).reduceGroup((GroupReduceFunction)new GroupReducer5());
        List result = reduceDs.collect();
        String expected = "1---(10,100)-\n2---(30,600)-(30,400)-(30,200)-(20,201)-(20,200)-\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testTupleKeySelectorGroupSort() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new LongFieldExtractor(1)).sortGroup(new StringFieldExtractor(2), Order.DESCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,Hello world-Hello\n15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n34,4,Comment#4-Comment#3-Comment#2-Comment#1\n65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testPojoKeySelectorGroupSort() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy((KeySelector)new TwoTuplePojoExtractor()).sortGroup((KeySelector)new StringPojoExtractor(), Order.DESCENDING).reduceGroup((GroupReduceFunction)new CustomTypeSortedGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,0,Hi\n2,3,Hello world-Hello\n3,12,Luke Skywalker-I am fine.-Hello world, how are you?\n4,30,Comment#4-Comment#3-Comment#2-Comment#1\n5,60,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n6,105,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testTupleKeySelectorSortWithCombine() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new LongFieldExtractor(1)).sortGroup(new StringFieldExtractor(2), Order.DESCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduceWithCombine());
        List result = reduceDs.collect();
        if (this.mode != MultipleProgramsTestBase.TestExecutionMode.COLLECTION) {
            String expected = "1,Hi\n5,Hello world-Hello\n15,Luke Skywalker-I am fine.-Hello world, how are you?\n34,Comment#4-Comment#3-Comment#2-Comment#1\n65,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n111,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
            GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
        }
    }

    @Test
    public void testTupleKeySelectorSortCombineOnTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        GroupReduceOperator reduceDs = ds.groupBy(new IntFieldExtractor(0)).sortGroup((KeySelector)new FiveToTwoTupleExtractor(), Order.DESCENDING).reduceGroup((GroupReduceFunction)new Tuple5SortedGroupReduce());
        List result = reduceDs.collect();
        String expected = "1,1,0,Hallo,1\n2,5,0,Hallo Welt-Hallo Welt wie,1\n3,15,0,BCD-ABC-Hallo Welt wie gehts?,2\n4,34,0,FGH-CDE-EFG-DEF,1\n5,65,0,IJK-HIJ-KLM-JKL-GHI,1\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testGroupingWithPojoContainingMultiplePojos() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<CollectionDataSets.PojoWithMultiplePojos> ds = CollectionDataSets.getPojoWithMultiplePojos(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"p2.a2"}).reduceGroup((GroupReduceFunction)new GroupReducer6());
        List result = reduceDs.collect();
        String expected = "b\nccc\nee\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testJavaCollectionsWithinPojos() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"key"}).reduceGroup((GroupReduceFunction)new GroupReducer7());
        List result = reduceDs.collect();
        String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testGroupByGenericType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"bigInt"}).reduceGroup((GroupReduceFunction)new GroupReducer8());
        List result = reduceDs.collect();
        ExecutionConfig ec = env.getConfig();
        Assert.assertTrue((boolean)ec.getRegisteredKryoTypes().contains(BigInt.class));
        Assert.assertTrue((boolean)ec.getRegisteredKryoTypes().contains(Date.class));
        Object expected = null;
        String localExpected = "[call\nFor key 92233720368547758070 we got:\nPojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\nFor key 92233720368547758070 we got:\nPojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}]";
        Assert.assertEquals((Object)localExpected, (Object)result.toString());
    }

    @Test
    public void testGroupReduceSelectorKeysWithSemProps() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        GroupReduceOperator reduceDs = ((GroupReduceOperator)ds.groupBy((KeySelector)new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Long>(){

            public Long getKey(Tuple5<Integer, Long, Integer, String, Long> v) throws Exception {
                return (long)((Integer)v.f0).intValue() * (Long)v.f1 - (long)((Integer)v.f2).intValue() * (Long)v.f4;
            }
        }).reduceGroup((GroupReduceFunction)new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>>(){

            public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) throws Exception {
                for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
                    out.collect(v);
                }
            }
        }).withForwardedFields(new String[]{"0"})).groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>(){

            public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
                int k = 0;
                long s = 0L;
                for (Tuple5<Integer, Long, Integer, String, Long> v : values) {
                    k = (Integer)v.f0;
                    s += ((Long)v.f1).longValue();
                }
                out.collect((Object)new Tuple2((Object)k, (Object)s));
            }
        });
        List result = reduceDs.collect();
        String expected = "1,1\n2,5\n3,15\n4,34\n5,65\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testGroupReduceWithAtomicValue() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource ds = env.fromElements((Object[])new Integer[]{1, 1, 2, 3, 4});
        GroupReduceOperator reduceDs = ds.groupBy(new String[]{"*"}).reduceGroup((GroupReduceFunction)new GroupReduceFunction<Integer, Integer>(){

            public void reduce(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect((Object)values.iterator().next());
            }
        });
        List result = reduceDs.collect();
        String expected = "1\n2\n3\n4";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testJodatimeDateTimeWithKryo() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource ds = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1, (Object)DateTime.now())});
        ProjectOperator reduceDs = ds.groupBy(new String[]{"f1"}).sum(0).project(new int[]{0});
        List result = reduceDs.collect();
        String expected = "1\n";
        GroupReduceITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testDateNullException() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource in = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0, (Object)new java.util.Date(1230000000L)), new Tuple2((Object)1, null), new Tuple2((Object)2, (Object)new java.util.Date(1230000000L))});
        GroupReduceOperator r = in.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new GroupReduceFunction<Tuple2<Integer, java.util.Date>, String>(){

            public void reduce(Iterable<Tuple2<Integer, java.util.Date>> values, Collector<String> out) throws Exception {
                for (Tuple2<Integer, java.util.Date> e : values) {
                    out.collect((Object)Integer.toString((Integer)e.f0));
                }
            }
        });
        List result = r.collect();
        String expected = "0\n1\n2\n";
        GroupReduceITCase.compareResultAsText((List)result, (String)expected);
    }

    private static int countElements(Iterable<?> iterable) {
        int c = 0;
        for (Object o : iterable) {
            ++c;
        }
        return c;
    }

    public static final class IdentityMapper<T>
    extends RichMapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    @RichGroupReduceFunction.Combinable
    public static class OrderCheckingCombinableReduce
    extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
            Tuple3<Integer, Long, String> t = it.next();
            int i = (Integer)t.f0;
            out.collect(t);
            while (it.hasNext()) {
                t = it.next();
                if (i <= (Integer)t.f0 && !((String)t.f2).equals("INVALID-ORDER!")) continue;
                t.f2 = "INVALID-ORDER!";
                out.collect(t);
            }
        }

        public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            Iterator<Tuple3<Integer, Long, String>> it = values.iterator();
            Tuple3<Integer, Long, String> t = it.next();
            int i = (Integer)t.f0;
            out.collect(t);
            while (it.hasNext()) {
                t = it.next();
                if (i <= (Integer)t.f0) continue;
                t.f2 = "INVALID-ORDER!";
                out.collect(t);
            }
        }
    }

    @RichGroupReduceFunction.Combinable
    public static class CustomTypeGroupReduceWithCombine
    extends RichGroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterable<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) throws Exception {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType();
            for (CollectionDataSets.CustomType c : values) {
                o.myInt = c.myInt;
                o.myLong += c.myLong;
                o.myString = "test" + c.myInt;
            }
            out.collect((Object)o);
        }

        public void reduce(Iterable<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "");
            for (CollectionDataSets.CustomType c : values) {
                o.myInt = c.myInt;
                o.myLong += c.myLong;
                o.myString = c.myString;
            }
            out.collect((Object)o);
        }
    }

    @RichGroupReduceFunction.Combinable
    public static class Tuple3AllGroupReduceWithCombine
    extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            Tuple3 o = new Tuple3((Object)0, (Object)0L, (Object)"");
            for (Tuple3<Integer, Long, String> t : values) {
                Tuple3 tuple3 = o;
                Integer.valueOf((Integer)tuple3.f0 + (Integer)t.f0);
                tuple3.f0 = tuple3.f0;
                tuple3 = o;
                Long.valueOf((Long)tuple3.f1 + (Long)t.f1);
                tuple3.f1 = tuple3.f1;
                o.f2 = o.f2 + "test";
            }
            out.collect((Object)o);
        }

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
            int i = 0;
            String s = "";
            for (Tuple3<Integer, Long, String> t : values) {
                i = (int)((long)i + ((long)((Integer)t.f0).intValue() + (Long)t.f1));
                s = s + (String)t.f2;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)s));
        }
    }

    @RichGroupReduceFunction.Combinable
    public static class Tuple3SortedGroupReduceWithCombine
    extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            int sum = 0;
            long key = 0L;
            StringBuilder concat = new StringBuilder();
            for (Tuple3<Integer, Long, String> next : values) {
                sum += ((Integer)next.f0).intValue();
                key = (Long)next.f1;
                concat.append((String)next.f2).append("-");
            }
            if (concat.length() > 0) {
                concat.setLength(concat.length() - 1);
            }
            out.collect((Object)new Tuple3((Object)sum, (Object)key, (Object)concat.toString()));
        }

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
            int i = 0;
            String s = "";
            for (Tuple3<Integer, Long, String> t : values) {
                i += ((Integer)t.f0).intValue();
                s = (String)t.f2;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)s));
        }
    }

    @RichGroupReduceFunction.Combinable
    public static class Tuple3GroupReduceWithCombine
    extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            Tuple3 o = new Tuple3((Object)0, (Object)0L, (Object)"");
            for (Tuple3<Integer, Long, String> t : values) {
                Tuple3 tuple3 = o;
                Integer.valueOf((Integer)tuple3.f0 + (Integer)t.f0);
                tuple3.f0 = tuple3.f0;
                o.f1 = t.f1;
                o.f2 = "test" + o.f1;
            }
            out.collect((Object)o);
        }

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
            int i = 0;
            String s = "";
            for (Tuple3<Integer, Long, String> t : values) {
                i += ((Integer)t.f0).intValue();
                s = (String)t.f2;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)s));
        }
    }

    public static class BCTuple3GroupReduce
    extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private String f2Replace = "";

        public void open(Configuration config) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.f2Replace = sum + "";
        }

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            int i = 0;
            long l = 0L;
            for (Tuple3<Integer, Long, String> t : values) {
                i += ((Integer)t.f0).intValue();
                l = (Long)t.f1;
            }
            out.collect((Object)new Tuple3((Object)i, (Object)l, (Object)this.f2Replace));
        }
    }

    public static class AllAddingCustomTypeGroupReduce
    implements GroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "Hello!");
            for (CollectionDataSets.CustomType next : values) {
                o.myInt += next.myInt;
                o.myLong += next.myLong;
            }
            out.collect((Object)o);
        }
    }

    public static class AllAddingTuple3GroupReduce
    implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            int i = 0;
            long l = 0L;
            for (Tuple3<Integer, Long, String> t : values) {
                i += ((Integer)t.f0).intValue();
                l += ((Long)t.f1).longValue();
            }
            out.collect((Object)new Tuple3((Object)i, (Object)l, (Object)"Hello World"));
        }
    }

    public static class InputReturningTuple3GroupReduce
    implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            for (Tuple3<Integer, Long, String> t : values) {
                if ((Integer)t.f0 >= 4) continue;
                t.f2 = "Hi!";
                Tuple3<Integer, Long, String> tuple3 = t;
                Integer.valueOf((Integer)tuple3.f0 + 10);
                tuple3.f0 = tuple3.f0;
                out.collect(t);
                tuple3 = t;
                Integer.valueOf((Integer)tuple3.f0 + 10);
                tuple3.f0 = tuple3.f0;
                t.f2 = "Hi again!";
                out.collect(t);
            }
        }
    }

    public static class CustomTypeSortedGroupReduce
    implements GroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) {
            Iterator<CollectionDataSets.CustomType> iter = values.iterator();
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType();
            CollectionDataSets.CustomType c = iter.next();
            StringBuilder concat = new StringBuilder(c.myString);
            o.myInt = c.myInt;
            o.myLong = c.myLong;
            while (iter.hasNext()) {
                CollectionDataSets.CustomType next = iter.next();
                concat.append("-").append(next.myString);
                o.myLong += next.myLong;
            }
            o.myString = concat.toString();
            out.collect((Object)o);
        }
    }

    public static class CustomTypeGroupReduce
    implements GroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) {
            Iterator<CollectionDataSets.CustomType> iter = values.iterator();
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType();
            CollectionDataSets.CustomType c = iter.next();
            o.myString = "Hello!";
            o.myInt = c.myInt;
            o.myLong = c.myLong;
            while (iter.hasNext()) {
                CollectionDataSets.CustomType next = iter.next();
                o.myLong += next.myLong;
            }
            out.collect((Object)o);
        }
    }

    public static class Tuple5SortedGroupReduce
    implements GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
            int i = 0;
            long l = 0L;
            long l2 = 0L;
            StringBuilder concat = new StringBuilder();
            for (Tuple5<Integer, Long, Integer, String, Long> t : values) {
                i = (Integer)t.f0;
                l += ((Long)t.f1).longValue();
                concat.append((String)t.f3).append("-");
                l2 = (Long)t.f4;
            }
            if (concat.length() > 0) {
                concat.setLength(concat.length() - 1);
            }
            out.collect((Object)new Tuple5((Object)i, (Object)l, (Object)0, (Object)concat.toString(), (Object)l2));
        }
    }

    public static class Tuple5GroupReduce
    implements GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) {
            int i = 0;
            long l = 0L;
            long l2 = 0L;
            for (Tuple5<Integer, Long, Integer, String, Long> t : values) {
                i = (Integer)t.f0;
                l += ((Long)t.f1).longValue();
                l2 = (Long)t.f4;
            }
            out.collect((Object)new Tuple5((Object)i, (Object)l, (Object)0, (Object)"P-)", (Object)l2));
        }
    }

    public static class Tuple3SortedGroupReduce
    implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            int sum = 0;
            long key = 0L;
            StringBuilder concat = new StringBuilder();
            for (Tuple3<Integer, Long, String> next : values) {
                sum += ((Integer)next.f0).intValue();
                key = (Long)next.f1;
                concat.append((String)next.f2).append("-");
            }
            if (concat.length() > 0) {
                concat.setLength(concat.length() - 1);
            }
            out.collect((Object)new Tuple3((Object)sum, (Object)key, (Object)concat.toString()));
        }
    }

    public static class Tuple3GroupReduce
    implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) {
            int i = 0;
            long l = 0L;
            for (Tuple3<Integer, Long, String> t : values) {
                i += ((Integer)t.f0).intValue();
                l = (Long)t.f1;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)l));
        }
    }

    public static class NestedTupleReducer
    implements GroupReduceFunction<Tuple2<Tuple2<Integer, Integer>, String>, String> {
        public void reduce(Iterable<Tuple2<Tuple2<Integer, Integer>, String>> values, Collector<String> out) {
            boolean once = false;
            StringBuilder concat = new StringBuilder();
            for (Tuple2<Tuple2<Integer, Integer>, String> value : values) {
                if (!once) {
                    concat.append((String)value.f1).append("--");
                    once = true;
                }
                concat.append(value.f0);
                concat.append("-");
            }
            out.collect((Object)concat.toString());
        }
    }

    public static class GroupReducer8
    implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
        public void reduce(Iterable<CollectionDataSets.PojoWithCollection> values, Collector<String> out) {
            StringBuilder concat = new StringBuilder();
            concat.append("call");
            for (CollectionDataSets.PojoWithCollection value : values) {
                concat.append("\nFor key ").append(value.bigInt).append(" we got:\n").append(value);
            }
            out.collect((Object)concat.toString());
        }
    }

    public static class GroupReducer7
    implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
        public void reduce(Iterable<CollectionDataSets.PojoWithCollection> values, Collector<String> out) {
            StringBuilder concat = new StringBuilder();
            concat.append("call");
            for (CollectionDataSets.PojoWithCollection value : values) {
                concat.append("For key ").append(value.key).append(" we got: ");
                for (CollectionDataSets.Pojo1 p : value.pojos) {
                    concat.append("pojo.a=").append(p.a);
                }
            }
            out.collect((Object)concat.toString());
        }
    }

    public static class GroupReducer6
    implements GroupReduceFunction<CollectionDataSets.PojoWithMultiplePojos, String> {
        public void reduce(Iterable<CollectionDataSets.PojoWithMultiplePojos> values, Collector<String> out) throws Exception {
            StringBuilder concat = new StringBuilder();
            for (CollectionDataSets.PojoWithMultiplePojos value : values) {
                concat.append(value.p2.a2);
            }
            out.collect((Object)concat.toString());
        }
    }

    public static class GroupReducer5
    implements GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, String> {
        public void reduce(Iterable<CollectionDataSets.PojoContainingTupleAndWritable> values, Collector<String> out) throws Exception {
            boolean once = false;
            StringBuilder concat = new StringBuilder();
            for (CollectionDataSets.PojoContainingTupleAndWritable value : values) {
                if (!once) {
                    concat.append(value.hadoopFan.get());
                    concat.append("---");
                    once = true;
                }
                concat.append(value.theTuple);
                concat.append("-");
            }
            out.collect((Object)concat.toString());
        }
    }

    public static class FiveToTwoTupleExtractor
    implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Long, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, Integer, String, Long> in) {
            return new Tuple2(in.f4, in.f2);
        }
    }

    public static class StringFieldExtractor<T extends Tuple>
    implements KeySelector<T, String> {
        private static final long serialVersionUID = 1L;
        private int field;

        public StringFieldExtractor() {
        }

        public StringFieldExtractor(int field) {
            this.field = field;
        }

        public String getKey(T t) throws Exception {
            return (String)t.getField(this.field);
        }
    }

    public static class IntFieldExtractor<T extends Tuple>
    implements KeySelector<T, Integer> {
        private static final long serialVersionUID = 1L;
        private int field;

        public IntFieldExtractor() {
        }

        public IntFieldExtractor(int field) {
            this.field = field;
        }

        public Integer getKey(T t) throws Exception {
            return (Integer)t.getField(this.field);
        }
    }

    public static class LongFieldExtractor<T extends Tuple>
    implements KeySelector<T, Long> {
        private static final long serialVersionUID = 1L;
        private int field;

        public LongFieldExtractor() {
        }

        public LongFieldExtractor(int field) {
            this.field = field;
        }

        public Long getKey(T t) throws Exception {
            return (Long)t.getField(this.field);
        }
    }

    public static class StringPojoExtractor
    implements KeySelector<CollectionDataSets.CustomType, String> {
        private static final long serialVersionUID = 1L;

        public String getKey(CollectionDataSets.CustomType value) throws Exception {
            return value.myString;
        }
    }

    public static class TwoTuplePojoExtractor
    implements KeySelector<CollectionDataSets.CustomType, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Integer, Integer> getKey(CollectionDataSets.CustomType value) throws Exception {
            return new Tuple2((Object)value.myInt, (Object)value.myInt);
        }
    }

    public static class GroupReducer4
    implements GroupReduceFunction<Tuple3<Integer, CollectionDataSets.CrazyNested, CollectionDataSets.POJO>, Integer> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Tuple3<Integer, CollectionDataSets.CrazyNested, CollectionDataSets.POJO>> values, Collector<Integer> out) {
            out.collect((Object)GroupReduceITCase.countElements(values));
        }
    }

    public static class GroupReducer3
    implements GroupReduceFunction<CollectionDataSets.PojoContainingTupleAndWritable, Integer> {
        public void reduce(Iterable<CollectionDataSets.PojoContainingTupleAndWritable> values, Collector<Integer> out) {
            out.collect((Object)GroupReduceITCase.countElements(values));
        }
    }

    public static class GroupReducer2
    implements GroupReduceFunction<CollectionDataSets.FromTupleWithCTor, Integer> {
        public void reduce(Iterable<CollectionDataSets.FromTupleWithCTor> values, Collector<Integer> out) {
            out.collect((Object)GroupReduceITCase.countElements(values));
        }
    }

    public static class GroupReducer1
    implements GroupReduceFunction<CollectionDataSets.CrazyNested, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<CollectionDataSets.CrazyNested> values, Collector<Tuple2<String, Integer>> out) throws Exception {
            int c = 0;
            String n = null;
            for (CollectionDataSets.CrazyNested v : values) {
                ++c;
                n = v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal;
            }
            out.collect((Object)new Tuple2(n, (Object)c));
        }
    }

    public static class KeySelector4
    implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
            return new Tuple2(t.f0, t.f4);
        }
    }

    public static class KeySelector3
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }

    public static class KeySelector2
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }

    public static class KeySelector1
    implements KeySelector<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1L;

        public Long getKey(Tuple3<Integer, Long, String> in) {
            return (Long)in.f1;
        }
    }

    public static class ByteArrayGroupReduce
    implements GroupReduceFunction<Tuple2<byte[], Integer>, Integer> {
        public void reduce(Iterable<Tuple2<byte[], Integer>> values, Collector<Integer> out) throws Exception {
            int sum = 0;
            for (Tuple2<byte[], Integer> value : values) {
                sum += ((Integer)value.f1).intValue();
            }
            out.collect((Object)sum);
        }
    }
}

