/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.Serializable;
import java.util.Date;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ReduceITCase
extends MultipleProgramsTestBase {
    /**
     * Creates the test case for one execution mode and hands that mode to the
     * {@link MultipleProgramsTestBase} constructor.
     *
     * @param mode the execution mode forwarded to the base class
     */
    public ReduceITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    /**
     * Groups the 3-tuple data set on tuple position 1, reduces every group with
     * {@link Tuple3Reduce} (using "B-)" as the replacement for the third field) and
     * compares the collected tuples with the expected result.
     */
    @Test
    public void testReduceOnTuplesWithKeyFieldSelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        // Fully parameterized references (instead of raw ReduceOperator/List plus casts)
        // let the compiler check the element type through the whole pipeline.
        DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.groupBy(1).reduce(new Tuple3Reduce("B-)"));
        List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,B-)\n15,3,B-)\n34,4,B-)\n65,5,B-)\n111,6,B-)\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Groups the 5-tuple data set on tuple positions 4 and 0 and reduces every group with
     * a lambda that keeps f0 and f4 of the first input, sums f1, and sets f2 to 0 and f3
     * to "P-)". The collected tuples are compared with the expected result.
     */
    @Test
    public void testReduceOnTupleWithMultipleKeyFieldSelectors() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        // The lambda is typed by its target, reduce(ReduceFunction<Tuple5<...>>). A raw
        // ReduceFunction cast would leave in1/in2 as Object, so field access would not compile.
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.groupBy(4, 0).reduce((in1, in2) -> {
            Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<>();
            out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
            return out;
        });
        List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
        String expected = "1,1,0,Hallo,1\n2,3,2,Hallo Welt wie,1\n2,2,1,Hallo Welt,2\n3,9,0,P-),2\n3,6,5,BCD,3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,10,GHI,1\n5,29,0,P-),2\n5,25,0,P-),3\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Groups the 3-tuple data set with {@link KeySelector1} (which returns field f1),
     * reduces every group with {@link Tuple3Reduce} using "B-)" as the replacement
     * string, and compares the collected tuples with the expected result.
     */
    @Test
    public void testReduceOnTuplesWithKeyExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw
        // KeySelector/ReduceFunction casts.
        DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.groupBy(new KeySelector1()).reduce(new Tuple3Reduce("B-)"));
        List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,B-)\n15,3,B-)\n34,4,B-)\n65,5,B-)\n111,6,B-)\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Groups the custom-type data set with {@link KeySelector2} (which returns
     * {@code myInt}), reduces every group with {@link CustomTypeReduce}, and compares the
     * textual form of the collected records with the expected result.
     */
    @Test
    public void testReduceOnCustomTypeWithKeyExtractor() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw casts.
        DataSet<CollectionDataSets.CustomType> reduceDs = ds.groupBy(new KeySelector2()).reduce(new CustomTypeReduce());
        List<CollectionDataSets.CustomType> result = reduceDs.collect();
        String expected = "1,0,Hi\n2,3,Hello!\n3,12,Hello!\n4,30,Hello!\n5,60,Hello!\n6,105,Hello!\n";
        TestBaseUtils.compareResultAsText(result, expected);
    }

    /**
     * Reduces the whole (ungrouped) 3-tuple data set with {@link AllAddingTuple3Reduce}
     * and compares the collected tuples with the expected result.
     */
    @Test
    public void testAllReduceForTuple() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw casts.
        DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.reduce(new AllAddingTuple3Reduce());
        List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
        String expected = "231,91,Hello World\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Reduces the whole (ungrouped) custom-type data set with
     * {@link AllAddingCustomTypeReduce} and compares the textual form of the collected
     * records with the expected result.
     */
    @Test
    public void testAllReduceForCustomTypes() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw casts.
        DataSet<CollectionDataSets.CustomType> reduceDs = ds.reduce(new AllAddingCustomTypeReduce());
        List<CollectionDataSets.CustomType> result = reduceDs.collect();
        String expected = "91,210,Hello!";
        TestBaseUtils.compareResultAsText(result, expected);
    }

    /**
     * Groups the 3-tuple data set on tuple position 1 and reduces every group with
     * {@link BCTuple3Reduce}, which receives the integer data set as the broadcast set
     * named "ints". The collected tuples are compared with the expected result.
     */
    @Test
    public void testReduceWithBroadcastSet() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        // Parameterized types replace the raw SingleInputUdfOperator/List and the raw casts.
        DataSet<Tuple3<Integer, Long, String>> reduceDs = ds.groupBy(1)
                .reduce(new BCTuple3Reduce())
                .withBroadcastSet(intDs, "ints");
        List<Tuple3<Integer, Long, String>> result = reduceDs.collect();
        String expected = "1,1,Hi\n5,2,55\n15,3,55\n34,4,55\n65,5,55\n111,6,55\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Groups the 5-tuple data set with {@link KeySelector3} (which returns a
     * {@code Tuple2} of f0 and f4), reduces every group with {@link Tuple5Reduce}, and
     * compares the collected tuples with the expected result.
     */
    @Test
    public void testReduceATupleReturningKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw casts.
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.groupBy(new KeySelector3()).reduce(new Tuple5Reduce());
        List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
        String expected = "1,1,0,Hallo,1\n2,3,2,Hallo Welt wie,1\n2,2,1,Hallo Welt,2\n3,9,0,P-),2\n3,6,5,BCD,3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,10,GHI,1\n5,29,0,P-),2\n5,25,0,P-),3\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Groups the 5-tuple data set by the field expressions "f4" and "f0", reduces every
     * group with {@link Tuple5Reduce}, and compares the collected tuples with the
     * expected result.
     */
    @Test
    public void testReduceOnTupleWithMultipleKeyExpressions() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw casts.
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.groupBy("f4", "f0").reduce(new Tuple5Reduce());
        List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
        String expected = "1,1,0,Hallo,1\n2,3,2,Hallo Welt wie,1\n2,2,1,Hallo Welt,2\n3,9,0,P-),2\n3,6,5,BCD,3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,10,GHI,1\n5,29,0,P-),2\n5,25,0,P-),3\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Same pipeline as the multiple-key-expression reduce ("f4", "f0" with
     * {@link Tuple5Reduce}), but with the combine hint of the reduce operator set to
     * {@code CombineHint.HASH}. The collected tuples are compared with the expected result.
     */
    @Test
    public void testReduceOnTupleWithMultipleKeyExpressionsWithHashHint() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
        // Parameterized types replace the raw ReduceOperator/List and the raw casts.
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.groupBy("f4", "f0")
                .reduce(new Tuple5Reduce())
                .setCombineHint(ReduceOperatorBase.CombineHint.HASH);
        List<Tuple5<Integer, Long, Integer, String, Long>> result = reduceDs.collect();
        String expected = "1,1,0,Hallo,1\n2,3,2,Hallo Welt wie,1\n2,2,1,Hallo Welt,2\n3,9,0,P-),2\n3,6,5,BCD,3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,10,GHI,1\n5,29,0,P-),2\n5,25,0,P-),3\n";
        TestBaseUtils.compareResultAsTuples(result, expected);
    }

    /**
     * Maps the sequence 0..2 to POJOs carrying a {@code Date} and an enum via
     * {@link Mapper1}, unions them with the POJO data set from {@link CollectionDataSets},
     * groups by the "group" field and checks every group with {@link GroupReducer1},
     * which emits "ok" once per group.
     */
    @Test
    public void testSupportForDataAndEnumSerialization() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Declared as DataSet so the variable can hold both the map result and the
        // union result; a MapOperator-typed variable cannot be assigned the union.
        DataSet<CollectionDataSets.PojoWithDateAndEnum> ds = env.generateSequence(0L, 2L).map(new Mapper1());
        ds = ds.union(CollectionDataSets.getPojoWithDateAndEnum(env));
        DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReducer1());
        List<String> result = res.collect();
        String expected = "ok\nok";
        TestBaseUtils.compareResultAsText(result, expected);
    }

    private static class BCTuple3Reduce
    extends RichReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private final Tuple3<Integer, Long, String> out = new Tuple3();
        private String f2Replace = "";

        private BCTuple3Reduce() {
        }

        public void open(OpenContext openContext) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.f2Replace = sum + "";
        }

        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            this.out.setFields((Object)((Integer)in1.f0 + (Integer)in2.f0), in1.f1, (Object)this.f2Replace);
            return this.out;
        }
    }

    private static class AllAddingCustomTypeReduce
    implements ReduceFunction<CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;
        private final CollectionDataSets.CustomType out = new CollectionDataSets.CustomType();

        private AllAddingCustomTypeReduce() {
        }

        public CollectionDataSets.CustomType reduce(CollectionDataSets.CustomType in1, CollectionDataSets.CustomType in2) throws Exception {
            this.out.myInt = in1.myInt + in2.myInt;
            this.out.myLong = in1.myLong + in2.myLong;
            this.out.myString = "Hello!";
            return this.out;
        }
    }

    private static class AllAddingTuple3Reduce
    implements ReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private final Tuple3<Integer, Long, String> out = new Tuple3();

        private AllAddingTuple3Reduce() {
        }

        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            this.out.setFields((Object)((Integer)in1.f0 + (Integer)in2.f0), (Object)((Long)in1.f1 + (Long)in2.f1), (Object)"Hello World");
            return this.out;
        }
    }

    private static class CustomTypeReduce
    implements ReduceFunction<CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;
        private final CollectionDataSets.CustomType out = new CollectionDataSets.CustomType();

        private CustomTypeReduce() {
        }

        public CollectionDataSets.CustomType reduce(CollectionDataSets.CustomType in1, CollectionDataSets.CustomType in2) throws Exception {
            this.out.myInt = in1.myInt;
            this.out.myLong = in1.myLong + in2.myLong;
            this.out.myString = "Hello!";
            return this.out;
        }
    }

    private static class Tuple5Reduce
    implements ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;
        private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5();

        private Tuple5Reduce() {
        }

        public Tuple5<Integer, Long, Integer, String, Long> reduce(Tuple5<Integer, Long, Integer, String, Long> in1, Tuple5<Integer, Long, Integer, String, Long> in2) throws Exception {
            this.out.setFields(in1.f0, (Object)((Long)in1.f1 + (Long)in2.f1), (Object)0, (Object)"P-)", in1.f4);
            return this.out;
        }
    }

    private static class Tuple3Reduce
    implements ReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private final Tuple3<Integer, Long, String> out = new Tuple3();
        private final String f2Replace;

        public Tuple3Reduce() {
            this.f2Replace = null;
        }

        public Tuple3Reduce(String f2Replace) {
            this.f2Replace = f2Replace;
        }

        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            if (this.f2Replace == null) {
                this.out.setFields((Object)((Integer)in1.f0 + (Integer)in2.f0), in1.f1, in1.f2);
            } else {
                this.out.setFields((Object)((Integer)in1.f0 + (Integer)in2.f0), in1.f1, (Object)this.f2Replace);
            }
            return this.out;
        }
    }

    private static class GroupReducer1
    implements GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String> {
        private static final long serialVersionUID = 1L;

        private GroupReducer1() {
        }

        public void reduce(Iterable<CollectionDataSets.PojoWithDateAndEnum> values, Collector<String> out) throws Exception {
            for (CollectionDataSets.PojoWithDateAndEnum val : values) {
                if (val.cat == CollectionDataSets.Category.CAT_A) {
                    Assert.assertEquals((Object)"a", (Object)val.group);
                } else if (val.cat == CollectionDataSets.Category.CAT_B) {
                    Assert.assertEquals((Object)"b", (Object)val.group);
                } else {
                    Assert.fail((String)("error. Cat = " + (Object)((Object)val.cat)));
                }
                Assert.assertEquals((long)666L, (long)val.date.getTime());
            }
            out.collect((Object)"ok");
        }
    }

    private static class Mapper1
    implements MapFunction<Long, CollectionDataSets.PojoWithDateAndEnum> {
        private Mapper1() {
        }

        public CollectionDataSets.PojoWithDateAndEnum map(Long value) throws Exception {
            int l = value.intValue();
            switch (l) {
                case 0: {
                    CollectionDataSets.PojoWithDateAndEnum one = new CollectionDataSets.PojoWithDateAndEnum();
                    one.group = "a";
                    one.date = new Date(666L);
                    one.cat = CollectionDataSets.Category.CAT_A;
                    return one;
                }
                case 1: {
                    CollectionDataSets.PojoWithDateAndEnum two = new CollectionDataSets.PojoWithDateAndEnum();
                    two.group = "a";
                    two.date = new Date(666L);
                    two.cat = CollectionDataSets.Category.CAT_A;
                    return two;
                }
                case 2: {
                    CollectionDataSets.PojoWithDateAndEnum three = new CollectionDataSets.PojoWithDateAndEnum();
                    three.group = "b";
                    three.date = new Date(666L);
                    three.cat = CollectionDataSets.Category.CAT_B;
                    return three;
                }
            }
            throw new RuntimeException("Unexpected value for l=" + l);
        }
    }

    private static class KeySelector3
    implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        private KeySelector3() {
        }

        public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
            return new Tuple2(t.f0, t.f4);
        }
    }

    private static class KeySelector2
    implements KeySelector<CollectionDataSets.CustomType, Integer> {
        private static final long serialVersionUID = 1L;

        private KeySelector2() {
        }

        public Integer getKey(CollectionDataSets.CustomType in) {
            return in.myInt;
        }
    }

    private static class KeySelector1
    implements KeySelector<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1L;

        private KeySelector1() {
        }

        public Long getKey(Tuple3<Integer, Long, String> in) {
            return (Long)in.f1;
        }
    }
}

