/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.javaApiOperators;

import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class PartitionITCase
extends MultipleProgramsTestBase {
    public PartitionITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testHashPartitionByKeyField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ds.partitionByHash(new int[]{1}).mapPartition((MapPartitionFunction)new UniqueLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        PartitionITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testHashPartitionByKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ds.partitionByHash((KeySelector)new KeySelector1()).mapPartition((MapPartitionFunction)new UniqueLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        PartitionITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testForcedRebalancing() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource ds = env.generateSequence(1L, 3000L);
        MapOperator uniqLongs = ds.filter((FilterFunction)new Filter1()).rebalance().map((MapFunction)new PartitionIndexMapper()).groupBy(new int[]{0}).reduce((ReduceFunction)new Reducer1()).map((MapFunction)new Mapper1());
        List result = uniqLongs.collect();
        StringBuilder expected = new StringBuilder();
        int numPerPartition = 2220 / env.getParallelism() / 10;
        for (int i = 0; i < env.getParallelism(); ++i) {
            expected.append('(').append(i).append(',').append(numPerPartition).append(")\n");
        }
        PartitionITCase.compareResultAsText((List)result, (String)expected.toString());
    }

    @Test
    public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ((PartitionOperator)ds.partitionByHash(new int[]{1}).setParallelism(4)).mapPartition((MapPartitionFunction)new UniqueLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        PartitionITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testHashPartitionWithKeyExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
        MapPartitionOperator uniqLongs = ((PartitionOperator)ds.partitionByHash(new String[]{"nestedPojo.longNumber"}).setParallelism(4)).mapPartition((MapPartitionFunction)new UniqueNestedPojoLongMapper());
        List result = uniqLongs.collect();
        String expected = "10000\n20000\n30000\n";
        PartitionITCase.compareResultAsText((List)result, (String)expected);
    }

    public static class PartitionIndexMapper
    extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Integer, Integer> map(Long value) throws Exception {
            return new Tuple2((Object)this.getRuntimeContext().getIndexOfThisSubtask(), (Object)1);
        }
    }

    public static class UniqueNestedPojoLongMapper
    implements MapPartitionFunction<CollectionDataSets.POJO, Long> {
        private static final long serialVersionUID = 1L;

        public void mapPartition(Iterable<CollectionDataSets.POJO> records, Collector<Long> out) throws Exception {
            HashSet<Long> uniq = new HashSet<Long>();
            for (CollectionDataSets.POJO t : records) {
                uniq.add(t.nestedPojo.longNumber);
            }
            for (Long l : uniq) {
                out.collect((Object)l);
            }
        }
    }

    public static class UniqueLongMapper
    implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1L;

        public void mapPartition(Iterable<Tuple3<Integer, Long, String>> records, Collector<Long> out) throws Exception {
            HashSet<Object> uniq = new HashSet<Object>();
            for (Tuple3<Integer, Long, String> tuple3 : records) {
                uniq.add(tuple3.f1);
            }
            for (Long l : uniq) {
                out.collect((Object)l);
            }
        }
    }

    public static class Mapper1
    implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
            value.f1 = (Integer)value.f1 / 10;
            return value;
        }
    }

    public static class Reducer1
    implements ReduceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
            return new Tuple2(v1.f0, (Object)((Integer)v1.f1 + (Integer)v2.f1));
        }
    }

    public static class Filter1
    implements FilterFunction<Long> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Long value) throws Exception {
            return value > 780L;
        }
    }

    public static class KeySelector1
    implements KeySelector<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1L;

        public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
            return (Long)value.f1;
        }
    }
}

