package org.apache.flink.optimizer.custompartition;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.class */
public class JoinCustomPartitioningTest extends CompilerTestBase {

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest$Pojo2.class */
    public static class Pojo2 {
        public int a;
        public int b;
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest$Pojo2KeySelector.class */
    private static class Pojo2KeySelector implements KeySelector<Pojo2, Integer> {
        private Pojo2KeySelector() {
        }

        public Integer getKey(Pojo2 pojo2) {
            return Integer.valueOf(pojo2.a);
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest$Pojo3.class */
    public static class Pojo3 {
        public int a;
        public int b;
        public int c;
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest$Pojo3KeySelector.class */
    private static class Pojo3KeySelector implements KeySelector<Pojo3, Integer> {
        private Pojo3KeySelector() {
        }

        public Integer getKey(Pojo3 pojo3) {
            return Integer.valueOf(pojo3.b);
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest$TestPartitionerInt.class */
    private static class TestPartitionerInt implements Partitioner<Integer> {
        private TestPartitionerInt() {
        }

        public int partition(Integer num, int i) {
            return 0;
        }
    }

    /* loaded from: input_file:org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest$TestPartitionerLong.class */
    private static class TestPartitionerLong implements Partitioner<Long> {
        private TestPartitionerLong() {
        }

        public int partition(Long l, int i) {
            return 0;
        }
    }

    @Test
    public void testJoinWithTuples() {
        try {
            TestPartitionerLong testPartitionerLong = new TestPartitionerLong();
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)}).join(executionEnvironment.fromElements(new Tuple3[]{new Tuple3(0L, 0L, 0L)}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{1}).equalTo(new int[]{0}).withPartitioner(testPartitionerLong).print();
            DualInputPlanNode source = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource();
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput1().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput2().getShipStrategy());
            Assert.assertEquals(testPartitionerLong, source.getInput1().getPartitioner());
            Assert.assertEquals(testPartitionerLong, source.getInput2().getPartitioner());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testJoinWithTuplesWrongType() {
        try {
            TestPartitionerInt testPartitionerInt = new TestPartitionerInt();
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            try {
                executionEnvironment.fromElements(new Tuple2[]{new Tuple2(0L, 0L)}).join(executionEnvironment.fromElements(new Tuple3[]{new Tuple3(0L, 0L, 0L)}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{1}).equalTo(new int[]{0}).withPartitioner(testPartitionerInt);
                Assert.fail("should throw an exception");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testJoinWithPojos() {
        try {
            TestPartitionerInt testPartitionerInt = new TestPartitionerInt();
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromElements(new Pojo2[]{new Pojo2()}).join(executionEnvironment.fromElements(new Pojo3[]{new Pojo3()}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new String[]{"b"}).equalTo(new String[]{"a"}).withPartitioner(testPartitionerInt).print();
            DualInputPlanNode source = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource();
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput1().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput2().getShipStrategy());
            Assert.assertEquals(testPartitionerInt, source.getInput1().getPartitioner());
            Assert.assertEquals(testPartitionerInt, source.getInput2().getPartitioner());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testJoinWithPojosWrongType() {
        try {
            TestPartitionerLong testPartitionerLong = new TestPartitionerLong();
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            try {
                executionEnvironment.fromElements(new Pojo2[]{new Pojo2()}).join(executionEnvironment.fromElements(new Pojo3[]{new Pojo3()}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new String[]{"a"}).equalTo(new String[]{"b"}).withPartitioner(testPartitionerLong);
                Assert.fail("should throw an exception");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testJoinWithKeySelectors() {
        try {
            TestPartitionerInt testPartitionerInt = new TestPartitionerInt();
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.fromElements(new Pojo2[]{new Pojo2()}).join(executionEnvironment.fromElements(new Pojo3[]{new Pojo3()}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()).withPartitioner(testPartitionerInt).print();
            DualInputPlanNode source = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource();
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput1().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_CUSTOM, source.getInput2().getShipStrategy());
            Assert.assertEquals(testPartitionerInt, source.getInput1().getPartitioner());
            Assert.assertEquals(testPartitionerInt, source.getInput2().getPartitioner());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testJoinWithKeySelectorsWrongType() {
        try {
            TestPartitionerLong testPartitionerLong = new TestPartitionerLong();
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            try {
                executionEnvironment.fromElements(new Pojo2[]{new Pojo2()}).join(executionEnvironment.fromElements(new Pojo3[]{new Pojo3()}), JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()).withPartitioner(testPartitionerLong);
                Assert.fail("should throw an exception");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testIncompatibleHashAndCustomPartitioning() {
        try {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            SingleInputUdfOperator withForwardedFields = executionEnvironment.fromElements(new Tuple3[]{new Tuple3(0L, 0L, 0L)}).partitionCustom(new Partitioner<Long>() { // from class: org.apache.flink.optimizer.custompartition.JoinCustomPartitioningTest.1
                public int partition(Long l, int i) {
                    return 0;
                }
            }, 0).map(new IdentityMapper()).withForwardedFields(new String[]{"0", "1", "2"});
            withForwardedFields.distinct(new int[]{0, 1}).groupBy(new int[]{1}).sortGroup(0, Order.ASCENDING).reduceGroup(new IdentityGroupReducer()).withForwardedFields(new String[]{"0", "1"}).join(withForwardedFields, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(new int[]{0}).equalTo(new int[]{0}).with(new DummyFlatJoinFunction()).print();
            DualInputPlanNode source = ((SinkPlanNode) compileNoStats(executionEnvironment.createProgramPlan()).getDataSinks().iterator().next()).getInput().getSource();
            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, source.getInput1().getShipStrategy());
            Assert.assertTrue(source.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || source.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
