package org.apache.flink.optimizer;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/PipelineBreakerTest.class */
/**
 * Optimizer tests that compile small batch programs and assert that the temp mode of a
 * specific input channel in the resulting plan reports {@code breaksPipeline()}.
 *
 * <p>Every test builds its program on a fresh {@link ExecutionEnvironment} with a
 * parallelism of {@value #PARALLELISM}, terminates it with a {@code print()} sink, compiles
 * it via {@code compileNoStats(...)} and then inspects the plan starting at the first sink.
 *
 * <p>Exceptions are propagated to JUnit rather than being caught and converted into
 * {@code Assert.fail(e.getMessage())}, so a failing test keeps its full stack trace.
 *
 * <p>NOTE(review): the plan navigation chains call {@code getInput()},
 * {@code getInput1()}, {@code getInput2()} and {@code getRootOfStepFunction()} directly on
 * the result of {@code getSource()}. This looks like decompiler output that may have
 * dropped casts to concrete plan-node types - confirm against the real plan API.
 */
public class PipelineBreakerTest extends CompilerTestBase {

    /** Parallelism configured on every environment used by these tests. */
    private static final int PARALLELISM = 64;

    /** Parameter key under which the cross tests pass the local strategy to the operator. */
    private static final String LOCAL_STRATEGY_KEY = "LOCAL_STRATEGY";

    /**
     * A mapper consumes a data set both as its regular input (through two further mappers)
     * and as the broadcast set "bc"; the mapper's input must break the pipeline.
     *
     * @throws Exception if building or compiling the program fails
     */
    @Test
    public void testPipelineBreakerWithBroadcastVariable() throws Exception {
        ExecutionEnvironment env = newEnvironment();

        MapOperator<Long, Long> source =
                env.generateSequence(1L, 10L).map(new IdentityMapper<Long>());
        source.map(new IdentityMapper<Long>())
                .map(new IdentityMapper<Long>())
                .withBroadcastSet(source, "bc")
                .print();

        Assert.assertTrue(
                "input of the mapper with the broadcast set should break the pipeline",
                compileToFirstSink(env)
                        .getInput().getSource()
                        .getInput().getTempMode().breaksPipeline());
    }

    /**
     * A mapper receives two broadcast sets: "bc1", an all-reduce derived from the same
     * data set that feeds the mapper, and "bc2", an independent sequence. The mapper's
     * input must break the pipeline.
     *
     * @throws Exception if building or compiling the program fails
     */
    @Test
    public void testPipelineBreakerBroadcastedAllReduce() throws Exception {
        ExecutionEnvironment env = newEnvironment();

        MapOperator<Long, Long> source =
                env.generateSequence(1L, 10L).map(new IdentityMapper<Long>());
        ReduceOperator<Long> reduced =
                source.map(new IdentityMapper<Long>()).reduce(new SelectOneReducer<Long>());
        source.map(new IdentityMapper<Long>())
                .withBroadcastSet(reduced, "bc1")
                .withBroadcastSet(env.generateSequence(1L, 10L), "bc2")
                .print();

        Assert.assertTrue(
                "input of the mapper with the broadcast sets should break the pipeline",
                compileToFirstSink(env)
                        .getInput().getSource()
                        .getInput().getTempMode().breaksPipeline());
    }

    /**
     * Inside a bulk iteration (100 iterations) the step function is a mapper that receives
     * the iteration's data set as broadcast set "bc2" and an all-reduce as broadcast set
     * "bc1". The input of the root of the step function must break the pipeline.
     *
     * @throws Exception if building or compiling the program fails
     */
    @Test
    public void testPipelineBreakerBroadcastedPartialSolution() throws Exception {
        ExecutionEnvironment env = newEnvironment();

        // The iteration is created before the second source, as in the original program.
        IterativeDataSet<Long> iteration = env.generateSequence(1L, 10L).iterate(100);
        MapOperator<Long, Long> source =
                env.generateSequence(1L, 10L).map(new IdentityMapper<Long>());

        // Evaluation order matches the original single expression: the step mapper and
        // its "bc2" broadcast are set up first, then the reduce for "bc1" is created.
        MapOperator<Long, Long> stepWithIterationBroadcast =
                source.map(new IdentityMapper<Long>()).withBroadcastSet(iteration, "bc2");
        ReduceOperator<Long> reduced =
                source.map(new IdentityMapper<Long>()).reduce(new SelectOneReducer<Long>());
        iteration.closeWith(stepWithIterationBroadcast.withBroadcastSet(reduced, "bc1")).print();

        Assert.assertTrue(
                "input of the step function root should break the pipeline",
                compileToFirstSink(env)
                        .getInput().getSource()
                        .getRootOfStepFunction()
                        .getInput().getTempMode().breaksPipeline());
    }

    /**
     * A data set is crossed with a mapped version of itself under each of the four
     * nested-loop local strategies. For the OUTER_FIRST variants the first input of the
     * cross is checked, for the OUTER_SECOND variants the second input.
     *
     * @throws Exception if building or compiling any of the programs fails
     */
    @Test
    public void testPilelineBreakerWithCross() throws Exception {
        Assert.assertTrue(
                "first cross input should break the pipeline (blocked, outer first)",
                compileCross("LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST")
                        .getInput().getSource()
                        .getInput1().getTempMode().breaksPipeline());

        Assert.assertTrue(
                "second cross input should break the pipeline (blocked, outer second)",
                compileCross("LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND")
                        .getInput().getSource()
                        .getInput2().getTempMode().breaksPipeline());

        Assert.assertTrue(
                "first cross input should break the pipeline (streamed, outer first)",
                compileCross("LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST")
                        .getInput().getSource()
                        .getInput1().getTempMode().breaksPipeline());

        Assert.assertTrue(
                "second cross input should break the pipeline (streamed, outer second)",
                compileCross("LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND")
                        .getInput().getSource()
                        .getInput2().getTempMode().breaksPipeline());
    }

    /** Creates an execution environment with the parallelism shared by all tests. */
    private static ExecutionEnvironment newEnvironment() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        return env;
    }

    /** Compiles the program defined on {@code env} and returns the first sink of the plan. */
    private SinkPlanNode compileToFirstSink(ExecutionEnvironment env) {
        return (SinkPlanNode) compileNoStats(env.createProgramPlan())
                .getDataSinks().iterator().next();
    }

    /**
     * Builds "map(source) cross source" with the given local strategy passed as an
     * operator parameter, compiles it and returns the first sink of the plan.
     */
    private SinkPlanNode compileCross(String localStrategy) {
        ExecutionEnvironment env = newEnvironment();
        DataSource<Long> source = env.generateSequence(1L, 10L);

        Configuration parameters = new Configuration();
        parameters.setString(LOCAL_STRATEGY_KEY, localStrategy);

        source.map(new IdentityMapper<Long>())
                .cross(source)
                .withParameters(parameters)
                .print();

        return compileToFirstSink(env);
    }
}
