package org.apache.flink.test.operators;

import java.io.IOException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/operators/CustomDistributionITCase.class */
public class CustomDistributionITCase extends TestLogger {

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(8).build());

    /* loaded from: input_file:org/apache/flink/test/operators/CustomDistributionITCase$TestDataDist1.class */
    public static class TestDataDist1 implements DataDistribution {
        public Integer[][] boundaries = {new Integer[]{4}, new Integer[]{9}, new Integer[]{13}, new Integer[]{18}};

        public int getParallelism() {
            return this.boundaries.length;
        }

        public Object[] getBucketBoundary(int i, int i2) {
            return this.boundaries[i];
        }

        public int getNumberOfFields() {
            return 1;
        }

        public TypeInformation[] getKeyTypes() {
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
        }

        public void write(DataOutputView dataOutputView) throws IOException {
        }

        public void read(DataInputView dataInputView) throws IOException {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/operators/CustomDistributionITCase$TestDataDist2.class */
    public static class TestDataDist2 implements DataDistribution {
        public Integer[][] boundaries = {new Integer[]{1, 6}, new Integer[]{2, 4}, new Integer[]{3, 9}, new Integer[]{4, 1}, new Integer[]{5, 2}};

        public int getParallelism() {
            return this.boundaries.length;
        }

        public Object[] getBucketBoundary(int i, int i2) {
            return this.boundaries[i];
        }

        public int getNumberOfFields() {
            return 2;
        }

        public TypeInformation[] getKeyTypes() {
            return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
        }

        public void write(DataOutputView dataOutputView) throws IOException {
        }

        public void read(DataInputView dataInputView) throws IOException {
        }
    }

    @Test
    public void testPartitionWithDistribution1() throws Exception {
        final TestDataDist1 testDataDist1 = new TestDataDist1();
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(testDataDist1.getParallelism());
        DataSetUtils.partitionByRange(CollectionDataSets.get3TupleDataSet(executionEnvironment), testDataDist1, new int[]{0}).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() { // from class: org.apache.flink.test.operators.CustomDistributionITCase.1
            public void mapPartition(Iterable<Tuple3<Integer, Long, String>> iterable, Collector<Boolean> collector) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                for (Tuple3<Integer, Long, String> tuple3 : iterable) {
                    boolean z = true;
                    if (indexOfThisSubtask == 0) {
                        if (((Integer) tuple3.f0).compareTo(testDataDist1.boundaries[0][0]) > 0) {
                            z = false;
                        }
                    } else if (indexOfThisSubtask <= 0 || indexOfThisSubtask >= testDataDist1.getParallelism() - 1) {
                        if (((Integer) tuple3.f0).compareTo(testDataDist1.boundaries[indexOfThisSubtask - 1][0]) <= 0) {
                            z = false;
                        }
                    } else {
                        Integer[] numArr = testDataDist1.boundaries[indexOfThisSubtask - 1];
                        if (((Integer) tuple3.f0).compareTo(testDataDist1.boundaries[indexOfThisSubtask][0]) > 0 || ((Integer) tuple3.f0).compareTo(numArr[0]) <= 0) {
                            z = false;
                        }
                    }
                    if (!z) {
                        Assert.fail("Record was not correctly partitioned: " + tuple3.toString());
                    }
                }
            }
        }).output(new DiscardingOutputFormat());
        executionEnvironment.execute();
    }

    @Test
    public void testRangeWithDistribution2() throws Exception {
        final TestDataDist2 testDataDist2 = new TestDataDist2();
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(testDataDist2.getParallelism());
        DataSetUtils.partitionByRange(executionEnvironment.fromElements(new Tuple3[]{new Tuple3(1, 5, "Hi"), new Tuple3(1, 6, "Hi"), new Tuple3(1, 7, "Hi"), new Tuple3(1, 11, "Hello"), new Tuple3(2, 3, "World"), new Tuple3(2, 4, "World"), new Tuple3(2, 5, "World"), new Tuple3(2, 13, "Hello World"), new Tuple3(3, 8, "Say"), new Tuple3(4, 0, "Why"), new Tuple3(4, 2, "Java"), new Tuple3(4, 11, "Say Hello"), new Tuple3(5, 1, "Hi Java!"), new Tuple3(5, 2, "Hi Java?"), new Tuple3(5, 3, "Hi Java again")}), testDataDist2, new int[]{0, 1}).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() { // from class: org.apache.flink.test.operators.CustomDistributionITCase.2
            public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> iterable, Collector<Boolean> collector) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                boolean z = true;
                for (Tuple3<Integer, Integer, String> tuple3 : iterable) {
                    if (indexOfThisSubtask == 0) {
                        Integer[] numArr = testDataDist2.boundaries[0];
                        if (((Integer) tuple3.f0).compareTo(numArr[0]) > 0 || (((Integer) tuple3.f0).compareTo(numArr[0]) == 0 && ((Integer) tuple3.f1).compareTo(numArr[1]) > 0)) {
                            z = false;
                        }
                    } else if (indexOfThisSubtask <= 0 || indexOfThisSubtask >= testDataDist2.getParallelism() - 1) {
                        Integer[] numArr2 = testDataDist2.boundaries[indexOfThisSubtask - 1];
                        if (((Integer) tuple3.f0).compareTo(numArr2[0]) < 0 || (((Integer) tuple3.f0).compareTo(numArr2[0]) == 0 && ((Integer) tuple3.f1).compareTo(numArr2[1]) <= 0)) {
                            z = false;
                        }
                    } else {
                        Integer[] numArr3 = testDataDist2.boundaries[indexOfThisSubtask - 1];
                        Integer[] numArr4 = testDataDist2.boundaries[indexOfThisSubtask];
                        if (((Integer) tuple3.f0).compareTo(numArr4[0]) > 0 || ((((Integer) tuple3.f0).compareTo(numArr4[0]) == 0 && ((Integer) tuple3.f1).compareTo(numArr4[1]) > 0) || ((Integer) tuple3.f0).compareTo(numArr3[0]) < 0 || (((Integer) tuple3.f0).compareTo(numArr3[0]) == 0 && ((Integer) tuple3.f1).compareTo(numArr3[1]) <= 0))) {
                            z = false;
                        }
                    }
                    if (!z) {
                        Assert.fail("Record was not correctly partitioned: " + tuple3.toString());
                    }
                }
            }
        }).output(new DiscardingOutputFormat());
        executionEnvironment.execute();
    }

    @Test
    public void testPartitionKeyLessDistribution() throws Exception {
        final TestDataDist2 testDataDist2 = new TestDataDist2();
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(testDataDist2.getParallelism());
        DataSetUtils.partitionByRange(CollectionDataSets.get3TupleDataSet(executionEnvironment), testDataDist2, new int[]{0}).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() { // from class: org.apache.flink.test.operators.CustomDistributionITCase.3
            public void mapPartition(Iterable<Tuple3<Integer, Long, String>> iterable, Collector<Boolean> collector) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                for (Tuple3<Integer, Long, String> tuple3 : iterable) {
                    boolean z = true;
                    if (indexOfThisSubtask == 0) {
                        if (((Integer) tuple3.f0).compareTo(testDataDist2.boundaries[0][0]) > 0) {
                            z = false;
                        }
                    } else if (indexOfThisSubtask <= 0 || indexOfThisSubtask >= testDataDist2.getParallelism() - 1) {
                        if (((Integer) tuple3.f0).compareTo(testDataDist2.boundaries[indexOfThisSubtask - 1][0]) <= 0) {
                            z = false;
                        }
                    } else {
                        Integer[] numArr = testDataDist2.boundaries[indexOfThisSubtask - 1];
                        if (((Integer) tuple3.f0).compareTo(testDataDist2.boundaries[indexOfThisSubtask][0]) > 0 || ((Integer) tuple3.f0).compareTo(numArr[0]) <= 0) {
                            z = false;
                        }
                    }
                    if (!z) {
                        Assert.fail("Record was not correctly partitioned: " + tuple3.toString());
                    }
                }
            }
        }).output(new DiscardingOutputFormat());
        executionEnvironment.execute();
    }

    @Test(expected = IllegalArgumentException.class)
    public void testPartitionMoreThanDistribution() throws Exception {
        DataSetUtils.partitionByRange(CollectionDataSets.get3TupleDataSet(ExecutionEnvironment.getExecutionEnvironment()), new TestDataDist2(), new int[]{0, 1, 2});
    }
}
