/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

/**
 * Optimizer tests that compile small DataSet programs whose operators run with differing
 * parallelism and then inspect the ship / local strategies chosen in the optimized plan.
 *
 * <p>All user functions are identity functions annotated with {@code withForwardedFields("*")}.
 */
public class ParallelismChangeTest extends CompilerTestBase {

    private static final String INVALID_SHIP_STRATEGY = "Invalid ship strategy for an operator.";

    /**
     * Source, Map1 and Reduce1 run with parallelism 8; Map2, Reduce2 and the sink run with 16.
     *
     * <p>Expects hash partitioning on the input of Map2 and a forward connection into Reduce2.
     */
    @Test
    public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
        final int p = 8;

        OptimizedPlan oPlan = compileMapReduceChain(p, p, p * 2, p * 2);
        SingleInputPlanNode red2Node = reduce2NodeOf(oPlan);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();

        ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
        ShipStrategyType redIn = red2Node.getInput().getShipStrategy();

        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.PARTITION_HASH, mapIn);
        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.FORWARD, redIn);
    }

    /**
     * Source, Map1, Reduce1 and Map2 run with parallelism 8; Reduce2 and the sink run with 16.
     *
     * <p>Expects a forward connection into Map2 and hash partitioning on the input of Reduce2.
     */
    @Test
    public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
        final int p = 8;

        OptimizedPlan oPlan = compileMapReduceChain(p, p, p, p * 2);
        SingleInputPlanNode red2Node = reduce2NodeOf(oPlan);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();

        ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
        ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();

        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.FORWARD, mapIn);
        Assert.assertEquals(INVALID_SHIP_STRATEGY, ShipStrategyType.PARTITION_HASH, reduceIn);
    }

    /**
     * Source, Map1 and Reduce1 run with parallelism 16; Map2, Reduce2 and the sink run with 32.
     *
     * <p>Accepts either of two plans: random partitioning into Map2 followed by hash
     * partitioning into Reduce2, or hash partitioning into Map2 followed by a forward connection
     * into Reduce2.
     */
    @Test
    public void checkPropertyHandlingWithIncreasingLocalParallelism() {
        final int p = 16;

        OptimizedPlan oPlan = compileMapReduceChain(p, p, p * 2, p * 2);
        SingleInputPlanNode red2Node = reduce2NodeOf(oPlan);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();

        ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
        ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();

        boolean randomThenHash =
                ShipStrategyType.PARTITION_RANDOM == mapIn
                        && ShipStrategyType.PARTITION_HASH == reduceIn;
        boolean hashThenForward =
                ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn;

        Assert.assertTrue(INVALID_SHIP_STRATEGY, randomThenHash || hashThenForward);
    }

    /**
     * Source, Map1 and Reduce1 run with parallelism 16; Map2, Reduce2 and the sink run with 8.
     *
     * <p>Expects that at least one of the inputs of Map2 / Reduce2 is sorted locally and that at
     * least one of them is hash partitioned.
     */
    @Test
    public void checkPropertyHandlingWithDecreasingParallelism() {
        final int p = 8;

        OptimizedPlan oPlan = compileMapReduceChain(p, p * 2, p, p);
        SingleInputPlanNode red2Node = reduce2NodeOf(oPlan);
        SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();

        Assert.assertTrue(
                "The no sorting local strategy.",
                LocalStrategy.SORT == red2Node.getInput().getLocalStrategy()
                        || LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
        Assert.assertTrue(
                "The no partitioning ship strategy.",
                ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy()
                        || ShipStrategyType.PARTITION_HASH
                                == map2Node.getInput().getShipStrategy());
    }

    /**
     * Joins two group-reduced inputs that run with parallelism 5 and 7; the join and the sink run
     * with parallelism 5.
     *
     * <p>Expects the first join input to be forwarded and the second one to be hash partitioned.
     * The plan is also handed to the {@link JobGraphGenerator} before it is inspected.
     */
    @Test
    public void checkPropertyHandlingWithTwoInputs() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSet<Long> set1 = env.generateSequence(0L, 1L).setParallelism(5);
        DataSet<Long> set2 = env.generateSequence(0L, 1L).setParallelism(7);

        DataSet<Long> reduce1 =
                set1.groupBy("*")
                        .reduceGroup(new IdentityGroupReducer<Long>())
                        .withForwardedFields("*")
                        .setParallelism(5);
        DataSet<Long> reduce2 =
                set2.groupBy("*")
                        .reduceGroup(new IdentityGroupReducer<Long>())
                        .withForwardedFields("*")
                        .setParallelism(7);

        reduce1.join(reduce2)
                .where("*")
                .equalTo("*")
                .with(new IdentityJoiner<Long>())
                .setParallelism(5)
                .output(new DiscardingOutputFormat<Long>())
                .setParallelism(5);

        Plan plan = env.createProgramPlan();
        OptimizedPlan oPlan = compileNoStats(plan);

        JobGraphGenerator jobGen = new JobGraphGenerator();
        jobGen.compileJobGraph(oPlan);

        // Records whether the visitor reached a dual-input node; without this the test would
        // pass silently if the assertions inside the visitor were never executed.
        final boolean[] joinChecked = {false};

        oPlan.accept(
                new Visitor<PlanNode>() {

                    @Override
                    public boolean preVisit(PlanNode visitable) {
                        if (!(visitable instanceof DualInputPlanNode)) {
                            return true;
                        }

                        DualInputPlanNode node = (DualInputPlanNode) visitable;
                        Channel c1 = node.getInput1();
                        Channel c2 = node.getInput2();

                        Assert.assertEquals(
                                "Incompatible shipping strategy chosen for match",
                                ShipStrategyType.FORWARD,
                                c1.getShipStrategy());
                        Assert.assertEquals(
                                "Incompatible shipping strategy chosen for match",
                                ShipStrategyType.PARTITION_HASH,
                                c2.getShipStrategy());

                        joinChecked[0] = true;
                        // Do not descend below the dual-input node.
                        return false;
                    }

                    @Override
                    public void postVisit(PlanNode visitable) {}
                });

        Assert.assertTrue(
                "No dual-input node was visited in the optimized plan.", joinChecked[0]);
    }

    /**
     * Builds the program Source -> Map1 -> Reduce1 -> Map2 -> Reduce2 -> Sink and compiles it
     * with {@code compileNoStats}.
     *
     * @param envParallelism parallelism set on the execution environment
     * @param stage1Parallelism parallelism of the source, Map1 and Reduce1
     * @param map2Parallelism parallelism of Map2
     * @param stage2Parallelism parallelism of Reduce2 and the sink
     * @return the optimized plan for the program
     */
    private OptimizedPlan compileMapReduceChain(
            int envParallelism,
            int stage1Parallelism,
            int map2Parallelism,
            int stage2Parallelism) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(envParallelism);

        DataSet<Long> source = env.generateSequence(0L, 1L).setParallelism(stage1Parallelism);

        source.map(new IdentityMapper<Long>())
                .withForwardedFields("*")
                .setParallelism(stage1Parallelism)
                .name("Map1")
                .groupBy("*")
                .reduceGroup(new IdentityGroupReducer<Long>())
                .withForwardedFields("*")
                .setParallelism(stage1Parallelism)
                .name("Reduce1")
                .map(new IdentityMapper<Long>())
                .withForwardedFields("*")
                .setParallelism(map2Parallelism)
                .name("Map2")
                .groupBy("*")
                .reduceGroup(new IdentityGroupReducer<Long>())
                .withForwardedFields("*")
                .setParallelism(stage2Parallelism)
                .name("Reduce2")
                .output(new DiscardingOutputFormat<Long>())
                .setParallelism(stage2Parallelism)
                .name("Sink");

        Plan plan = env.createProgramPlan();
        return compileNoStats(plan);
    }

    /**
     * Returns the predecessor of the first data sink of the given plan, which is the Reduce2 node
     * for plans built by {@link #compileMapReduceChain(int, int, int, int)}.
     */
    private static SingleInputPlanNode reduce2NodeOf(OptimizedPlan oPlan) {
        SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
        return (SingleInputPlanNode) sinkNode.getPredecessor();
    }
}

