package org.apache.flink.optimizer.custompartition;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.class */
public class GroupingKeySelectorTranslationTest extends CompilerTestBase {

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest$TestBinaryKeySelector.class */
    private static class TestBinaryKeySelector<T extends Tuple> implements KeySelector<T, Tuple2<Integer, Integer>> {
        private TestBinaryKeySelector() {
        }

        public Tuple2<Integer, Integer> getKey(T t) {
            return new Tuple2<>(t.getField(0), t.getField(1));
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest$TestKeySelector.class */
    private static class TestKeySelector<T extends Tuple> implements KeySelector<T, Integer> {
        private TestKeySelector() {
        }

        public Integer getKey(T t) {
            return (Integer) t.getField(0);
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest$TestPartitionerInt.class */
    private static class TestPartitionerInt implements Partitioner<Integer> {
        private TestPartitionerInt() {
        }

        public int partition(Integer num, int i) {
            return 0;
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest$TestPartitionerLong.class */
    private static class TestPartitionerLong implements Partitioner<Long> {
        private TestPartitionerLong() {
        }

        public int partition(Long l, int i) {
            return 0;
        }
    }

    @Test
    public void testCustomPartitioningKeySelectorReduce() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0, 0)}).rebalance().setParallelism(4).groupBy(new TestKeySelector()).withPartitioner(new TestPartitionerInt()).reduce(new SelectOneReducer()).output(new DiscardingOutputFormat());
            SinkPlanNode sinkPlanNode = (SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next();
            SingleInputPlanNode source = sinkPlanNode.getInput().getSource();
            SingleInputPlanNode source2 = source.getInput().getSource();
            SingleInputPlanNode source3 = source2.getInput().getSource();
            Assert.assertEquals(ShipStrategyType.FORWARD, sinkPlanNode.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, source.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source2.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, source3.getInput().getShipStrategy());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCustomPartitioningKeySelectorGroupReduce() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0, 0)}).rebalance().setParallelism(4).groupBy(new TestKeySelector()).withPartitioner(new TestPartitionerInt()).reduceGroup(new IdentityGroupReducerCombinable()).output(new DiscardingOutputFormat());
            SinkPlanNode sinkPlanNode = (SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next();
            SingleInputPlanNode source = sinkPlanNode.getInput().getSource();
            SingleInputPlanNode source2 = source.getInput().getSource();
            Assert.assertEquals(ShipStrategyType.FORWARD, sinkPlanNode.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, source2.getInput().getShipStrategy());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCustomPartitioningKeySelectorGroupReduceSorted() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromElements(new Tuple3[]{new Tuple3(0, 0, 0)}).rebalance().setParallelism(4).groupBy(new TestKeySelector()).withPartitioner(new TestPartitionerInt()).sortGroup(new TestKeySelector(), Order.ASCENDING).reduceGroup(new IdentityGroupReducerCombinable()).output(new DiscardingOutputFormat());
            SinkPlanNode sinkPlanNode = (SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next();
            SingleInputPlanNode source = sinkPlanNode.getInput().getSource();
            SingleInputPlanNode source2 = source.getInput().getSource();
            Assert.assertEquals(ShipStrategyType.FORWARD, sinkPlanNode.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, source2.getInput().getShipStrategy());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCustomPartitioningKeySelectorInvalidType() {
        try {
            try {
                ExecutionEnvironment.getExecutionEnvironment().fromElements(new Tuple2[]{new Tuple2(0, 0)}).rebalance().setParallelism(4).groupBy(new TestKeySelector()).withPartitioner(new TestPartitionerLong());
                Assert.fail("Should throw an exception");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testCustomPartitioningKeySelectorInvalidTypeSorted() {
        try {
            try {
                ExecutionEnvironment.getExecutionEnvironment().fromElements(new Tuple3[]{new Tuple3(0, 0, 0)}).rebalance().setParallelism(4).groupBy(new TestKeySelector()).sortGroup(1, Order.ASCENDING).withPartitioner(new TestPartitionerLong());
                Assert.fail("Should throw an exception");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testCustomPartitioningTupleRejectCompositeKey() {
        try {
            try {
                ExecutionEnvironment.getExecutionEnvironment().fromElements(new Tuple3[]{new Tuple3(0, 0, 0)}).rebalance().setParallelism(4).groupBy(new TestBinaryKeySelector()).withPartitioner(new TestPartitionerInt());
                Assert.fail("Should throw an exception");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }
}
