package org.apache.flink.test.javaApiOperators;

import java.util.HashSet;
import java.util.Iterator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase.class */
public class PartitionITCase extends MultipleProgramsTestBase {
    private String resultPath;
    private String expected;

    @Rule
    public TemporaryFolder tempFolder;

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$Filter1.class */
    public static class Filter1 implements FilterFunction<Long> {
        private static final long serialVersionUID = 1;

        public boolean filter(Long l) throws Exception {
            return l.longValue() > 780;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$KeySelector1.class */
    public static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1;

        public Long getKey(Tuple3<Integer, Long, String> tuple3) throws Exception {
            return (Long) tuple3.f1;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$Mapper1.class */
    public static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> tuple2) throws Exception {
            tuple2.f1 = Integer.valueOf(((Integer) tuple2.f1).intValue() / 10);
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$PartitionIndexMapper.class */
    public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        public Tuple2<Integer, Integer> map(Long l) throws Exception {
            return new Tuple2<>(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), 1);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$Reducer1.class */
    public static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1;

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22) {
            return new Tuple2<>(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$UniqueLongMapper.class */
    public static class UniqueLongMapper implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1;

        public void mapPartition(Iterable<Tuple3<Integer, Long, String>> iterable, Collector<Long> collector) throws Exception {
            HashSet hashSet = new HashSet();
            Iterator<Tuple3<Integer, Long, String>> it = iterable.iterator();
            while (it.hasNext()) {
                hashSet.add(it.next().f1);
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                collector.collect((Long) it2.next());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/PartitionITCase$UniqueNestedPojoLongMapper.class */
    public static class UniqueNestedPojoLongMapper implements MapPartitionFunction<CollectionDataSets.POJO, Long> {
        private static final long serialVersionUID = 1;

        public void mapPartition(Iterable<CollectionDataSets.POJO> iterable, Collector<Long> collector) throws Exception {
            HashSet hashSet = new HashSet();
            Iterator<CollectionDataSets.POJO> it = iterable.iterator();
            while (it.hasNext()) {
                hashSet.add(Long.valueOf(it.next().nestedPojo.longNumber));
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                collector.collect((Long) it2.next());
            }
        }
    }

    public PartitionITCase(MultipleProgramsTestBase.TestExecutionMode testExecutionMode) {
        super(testExecutionMode);
        this.tempFolder = new TemporaryFolder();
    }

    @Before
    public void before() throws Exception {
        this.resultPath = this.tempFolder.newFile().toURI().toString();
    }

    @After
    public void after() throws Exception {
        compareResultsByLinesInMemory(this.expected, this.resultPath);
    }

    @Test
    public void testHashPartitionByKeyField() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        CollectionDataSets.get3TupleDataSet(executionEnvironment).partitionByHash(new int[]{1}).mapPartition(new UniqueLongMapper()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "1\n2\n3\n4\n5\n6\n";
    }

    @Test
    public void testHashPartitionByKeySelector() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        CollectionDataSets.get3TupleDataSet(executionEnvironment).partitionByHash(new KeySelector1()).mapPartition(new UniqueLongMapper()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "1\n2\n3\n4\n5\n6\n";
    }

    @Test
    public void testForcedRebalancing() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.generateSequence(1L, 3000L).filter(new Filter1()).rebalance().map(new PartitionIndexMapper()).groupBy(new int[]{0}).reduce(new Reducer1()).map(new Mapper1()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        StringBuilder sb = new StringBuilder();
        int parallelism = (2220 / executionEnvironment.getParallelism()) / 10;
        for (int i = 0; i < executionEnvironment.getParallelism(); i++) {
            sb.append('(').append(i).append(',').append(parallelism).append(")\n");
        }
        this.expected = sb.toString();
    }

    @Test
    public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);
        CollectionDataSets.get3TupleDataSet(executionEnvironment).partitionByHash(new int[]{1}).setParallelism(4).mapPartition(new UniqueLongMapper()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "1\n2\n3\n4\n5\n6\n";
    }

    @Test
    public void testHashPartitionWithKeyExpression() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);
        CollectionDataSets.getDuplicatePojoDataSet(executionEnvironment).partitionByHash(new String[]{"nestedPojo.longNumber"}).setParallelism(4).mapPartition(new UniqueNestedPojoLongMapper()).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expected = "10000\n20000\n30000\n";
    }
}
