package org.apache.flink.optimizer;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.class */
/**
 * Optimizer tests that build small DataSet programs annotated with forwarded-field
 * information, compile them with {@code compileWithStats}, and then walk the resulting plan
 * to check the ship strategies and data properties chosen for selected operators.
 */
public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {

    /**
     * Program: CSV source, map (forwards "*"), groupBy(0), reduce (forwards "f0-&gt;f1"),
     * map (forwards "*"), groupBy(1), reduce (forwards "*"), print.
     *
     * <p>Every visited reduce or map node must receive its inputs via FORWARD and must be
     * hash partitioned and grouped on field 1.
     */
    @Test
    public void forwardFieldsTestMapReduce() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.readCsvFile(IN_FILE)
                .types(Integer.class, Integer.class, Integer.class)
                .map(new MockMapper()).withForwardedFields("*")
                .groupBy(0)
                .reduce(new MockReducer()).withForwardedFields("f0->f1")
                .map(new MockMapper()).withForwardedFields("*")
                .groupBy(1)
                .reduce(new MockReducer()).withForwardedFields("*")
                .print();

        compileWithStats(env.createProgramPlan()).accept(new Visitor<PlanNode>() {
            public boolean preVisit(PlanNode node) {
                boolean singleInput = node instanceof SingleInputPlanNode;

                if (singleInput && node.getProgramOperator() instanceof ReduceOperatorBase) {
                    assertForwardedAndPartitionedOnFieldOne(
                            node,
                            "Reduce should just forward the input if it is already partitioned",
                            "Reducer");
                }

                if (singleInput && node.getProgramOperator() instanceof MapOperatorBase) {
                    assertForwardedAndPartitionedOnFieldOne(
                            node,
                            "Map should just forward the input if it is already partitioned",
                            "Mapper");
                    // NOTE(review): returning false presumably stops the traversal below this
                    // map node, so operators further upstream are not checked - confirm
                    // against the Visitor contract.
                    return false;
                }

                return true;
            }

            public void postVisit(PlanNode node) {
                // nothing to verify after the inputs of a node have been visited
            }
        });
    }

    /**
     * Program: two branches of the form CSV source, map (forwards "*"), groupBy, reduce. The
     * first branch groups on field 0 and forwards "f0-&gt;f1"; the second groups on field 1 and
     * forwards "f1-&gt;f2". The branches are joined on field 1 of the first and field 2 of the
     * second.
     *
     * <p>Both inputs of the join node must use the FORWARD ship strategy.
     */
    @Test
    public void forwardFieldsTestJoin() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.readCsvFile(IN_FILE)
                .types(Integer.class, Integer.class, Integer.class)
                .map(new MockMapper()).withForwardedFields("*")
                .groupBy(0)
                .reduce(new MockReducer()).withForwardedFields("f0->f1")
                .join(
                        env.readCsvFile(IN_FILE)
                                .types(Integer.class, Integer.class, Integer.class)
                                .map(new MockMapper()).withForwardedFields("*")
                                .groupBy(1)
                                .reduce(new MockReducer()).withForwardedFields("f1->f2"))
                .where(1)
                .equalTo(2)
                .with(new MockJoin())
                .print();

        compileWithStats(env.createProgramPlan()).accept(new Visitor<PlanNode>() {
            public boolean preVisit(PlanNode node) {
                boolean isJoin = node instanceof DualInputPlanNode
                        && node.getProgramOperator() instanceof JoinOperatorBase;

                if (isJoin) {
                    DualInputPlanNode joinNode = (DualInputPlanNode) node;
                    Channel firstInput = joinNode.getInput1();
                    Channel secondInput = joinNode.getInput2();

                    Assert.assertTrue(
                            "Join should just forward the input if it is already partitioned",
                            firstInput.getShipStrategy() == ShipStrategyType.FORWARD);
                    Assert.assertTrue(
                            "Join should just forward the input if it is already partitioned",
                            secondInput.getShipStrategy() == ShipStrategyType.FORWARD);
                    return false;
                }

                return true;
            }

            public void postVisit(PlanNode node) {
                // nothing to verify after the inputs of a node have been visited
            }
        });
    }

    /**
     * For each input channel of {@code node}, asserts that the channel uses the FORWARD ship
     * strategy and that the node's properties report hash partitioning on field 1 and
     * grouping that includes field 1.
     *
     * @param node plan node whose inputs and properties are checked
     * @param forwardMessage failure message for the ship-strategy assertion
     * @param operatorLabel operator name appended to the property failure messages
     */
    private static void assertForwardedAndPartitionedOnFieldOne(
            PlanNode node, String forwardMessage, String operatorLabel) {
        for (Channel input : node.getInputs()) {
            GlobalProperties global = node.getGlobalProperties();
            LocalProperties local = node.getLocalProperties();

            Assert.assertTrue(forwardMessage, input.getShipStrategy() == ShipStrategyType.FORWARD);
            Assert.assertTrue(
                    "Wrong GlobalProperties on " + operatorLabel,
                    global.isPartitionedOnFields(new FieldSet(1)));
            Assert.assertTrue(
                    "Wrong GlobalProperties on " + operatorLabel,
                    global.getPartitioning() == PartitioningProperty.HASH_PARTITIONED);
            Assert.assertTrue(
                    "Wrong LocalProperties on " + operatorLabel,
                    local.getGroupedFields().contains(1));
        }
    }

    /** Placeholder join function used to build the test programs; always returns {@code null}. */
    public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
            return null;
        }
    }

    /** Placeholder map function used to build the test programs; always returns {@code null}. */
    public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
            return null;
        }
    }

    /** Placeholder reduce function used to build the test programs; always returns {@code null}. */
    public static class MockReducer implements ReduceFunction<Tuple3<Integer, Integer, Integer>> {
        public Tuple3<Integer, Integer, Integer> reduce(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
            return null;
        }
    }
}
