package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyMatchStub;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.optimizer.util.IdentityMap;
import org.apache.flink.optimizer.util.IdentityReduce;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/ParallelismChangeTest.class */
/**
 * Optimizer tests that compile small plans whose operators run with different
 * parallelism and then check which shipping / local strategies were chosen on the
 * channels where the parallelism changes.
 *
 * <p>Four of the tests share the linear plan
 * {@code Source -> Map1 -> Reduce 1 -> Map2 -> Reduce 2 -> Sink} and only vary the
 * parallelism of the individual operators; see
 * {@link #compileLinearPlan(int, int, int)}. The last test uses a join with two
 * differently parallelized inputs.
 */
public class ParallelismChangeTest extends CompilerTestBase {

    /** Failure message shared by the ship-strategy assertions of the linear-plan tests. */
    private static final String INVALID_SHIP_STRATEGY = "Invalid ship strategy for an operator.";

    /**
     * Source, Map1 and Reduce 1 run with parallelism 8; Map2, Reduce 2 and the sink with 16.
     * Expects a hash partitioning into Map2 and a forward connection into Reduce 2.
     */
    @Test
    public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
        SingleInputPlanNode reduce2Node = compileLinearPlan(8, 16, 16);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) reduce2Node.getPredecessor();

        ShipStrategyType mapInputStrategy = map2Node.getInput().getShipStrategy();
        ShipStrategyType reduceInputStrategy = reduce2Node.getInput().getShipStrategy();

        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.PARTITION_HASH, mapInputStrategy);
        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.FORWARD, reduceInputStrategy);
    }

    /**
     * Source, Map1, Reduce 1 and Map2 run with parallelism 8; Reduce 2 and the sink with 16.
     * Expects a forward connection into Map2 and a hash partitioning into Reduce 2.
     */
    @Test
    public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
        SingleInputPlanNode reduce2Node = compileLinearPlan(8, 8, 16);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) reduce2Node.getPredecessor();

        ShipStrategyType mapInputStrategy = map2Node.getInput().getShipStrategy();
        ShipStrategyType reduceInputStrategy = reduce2Node.getInput().getShipStrategy();

        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.FORWARD, mapInputStrategy);
        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.PARTITION_HASH, reduceInputStrategy);
    }

    /**
     * Source, Map1 and Reduce 1 run with parallelism 16; Map2, Reduce 2 and the sink with 32.
     * Accepts either of two strategy combinations: random partitioning into Map2 followed by
     * hash partitioning into Reduce 2, or hash partitioning into Map2 followed by a forward
     * connection into Reduce 2.
     */
    @Test
    public void checkPropertyHandlingWithIncreasingLocalParallelism() {
        SingleInputPlanNode reduce2Node = compileLinearPlan(16, 32, 32);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) reduce2Node.getPredecessor();

        ShipStrategyType mapInputStrategy = map2Node.getInput().getShipStrategy();
        ShipStrategyType reduceInputStrategy = reduce2Node.getInput().getShipStrategy();

        boolean repartitionedAtReducer = ShipStrategyType.PARTITION_RANDOM == mapInputStrategy
                && ShipStrategyType.PARTITION_HASH == reduceInputStrategy;
        boolean repartitionedAtMapper = ShipStrategyType.PARTITION_HASH == mapInputStrategy
                && ShipStrategyType.FORWARD == reduceInputStrategy;

        Assert.assertTrue(INVALID_SHIP_STRATEGY, repartitionedAtReducer || repartitionedAtMapper);
    }

    /**
     * Source, Map1 and Reduce 1 run with parallelism 16; Map2, Reduce 2 and the sink with 8.
     * Expects that the input of Reduce 2 or the input of Map2 is sorted, and that one of the
     * two inputs is hash partitioned.
     */
    @Test
    public void checkPropertyHandlingWithDecreasingParallelism() {
        SingleInputPlanNode reduce2Node = compileLinearPlan(16, 8, 8);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) reduce2Node.getPredecessor();

        Assert.assertTrue("The no sorting local strategy.",
                LocalStrategy.SORT == reduce2Node.getInput().getLocalStrategy()
                        || LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
        Assert.assertTrue("The no partitioning ship strategy.",
                ShipStrategyType.PARTITION_HASH == reduce2Node.getInput().getShipStrategy()
                        || ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
    }

    /**
     * Joins two reduced inputs, the first running with parallelism 5 and the second with 7,
     * in a join that itself runs with parallelism 5. Also generates the job graph for the
     * optimized plan, then expects the first join input to be forwarded and the second one
     * to be hash partitioned.
     */
    @Test
    public void checkPropertyHandlingWithTwoInputs() {
        FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
        FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);

        ReduceOperator reduceA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).input(sourceA).build();
        ReduceOperator reduceB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).input(sourceB).build();

        JoinOperator join = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
                .input1(reduceA)
                .input2(reduceB)
                .build();

        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, join);

        sourceA.setParallelism(5);
        sourceB.setParallelism(7);
        reduceA.setParallelism(5);
        reduceB.setParallelism(7);
        join.setParallelism(5);
        sink.setParallelism(5);

        OptimizedPlan optimizedPlan = compileNoStats(new Plan(sink, "Partition on DoP Change"));

        // Job graph generation is run before the strategy checks, as in the original test.
        new JobGraphGenerator().compileJobGraph(optimizedPlan);

        optimizedPlan.accept(new Visitor<PlanNode>() {

            @Override
            public boolean preVisit(PlanNode planNode) {
                if (!(planNode instanceof DualInputPlanNode)) {
                    // Not the join yet: keep descending.
                    return true;
                }

                DualInputPlanNode joinNode = (DualInputPlanNode) planNode;
                Channel firstInput = joinNode.getInput1();
                Channel secondInput = joinNode.getInput2();

                Assert.assertEquals("Incompatible shipping strategy chosen for match",
                        ShipStrategyType.FORWARD, firstInput.getShipStrategy());
                Assert.assertEquals("Incompatible shipping strategy chosen for match",
                        ShipStrategyType.PARTITION_HASH, secondInput.getShipStrategy());

                // The join has been checked; do not descend below it.
                return false;
            }

            @Override
            public void postVisit(PlanNode planNode) {
                // Nothing to do after visiting a node.
            }
        });
    }

    /**
     * Builds and compiles the linear plan
     * {@code Source -> Map1 -> Reduce 1 -> Map2 -> Reduce 2 -> Sink} and returns the plan
     * node in front of the sink (Reduce 2).
     *
     * <p>The explicit casts are needed wherever {@code getPredecessor()} is declared with
     * the general {@link PlanNode} type (NOTE(review): confirm against the Flink version in
     * use); they are harmless otherwise.
     *
     * @param upstreamParallelism parallelism of Source, Map1 and Reduce 1
     * @param map2Parallelism parallelism of Map2
     * @param downstreamParallelism parallelism of Reduce 2 and the sink
     * @return the optimized plan node that feeds the sink
     */
    private SingleInputPlanNode compileLinearPlan(int upstreamParallelism, int map2Parallelism, int downstreamParallelism) {
        FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
        source.setParallelism(upstreamParallelism);

        MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
        map1.setParallelism(upstreamParallelism);
        map1.setInput(source);

        ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
        reduce1.setParallelism(upstreamParallelism);
        reduce1.setInput(map1);

        MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
        map2.setParallelism(map2Parallelism);
        map2.setInput(reduce1);

        ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
        reduce2.setParallelism(downstreamParallelism);
        reduce2.setInput(map2);

        FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
        sink.setParallelism(downstreamParallelism);
        sink.setInput(reduce2);

        // The plan name is the same for all linear-plan tests, including the decreasing one.
        OptimizedPlan optimizedPlan = compileNoStats(new Plan(sink, "Test Increasing parallelism"));

        SinkPlanNode sinkNode = (SinkPlanNode) optimizedPlan.getDataSinks().iterator().next();
        return (SingleInputPlanNode) sinkNode.getPredecessor();
    }
}
