/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.NamedChannel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.junit.Assert;
import org.junit.Test;

public class PipelineBreakerTest
extends CompilerTestBase {
    @Test
    public void testPipelineBreakerWithBroadcastVariable() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
            env.setParallelism(64);
            MapOperator source = env.generateSequence(1L, 10L).map(new IdentityMapper());
            SingleInputUdfOperator result = source.map(new IdentityMapper()).map(new IdentityMapper()).withBroadcastSet((DataSet)source, "bc");
            result.output((OutputFormat)new DiscardingOutputFormat());
            JavaPlan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats((Plan)p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode mapper = (SingleInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode mapperInput = (SingleInputPlanNode)mapper.getInput().getSource();
            Assert.assertEquals((Object)TempMode.NONE, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)TempMode.NONE, (Object)((NamedChannel)mapper.getBroadcastInputs().get(0)).getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapperInput.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)((NamedChannel)mapper.getBroadcastInputs().get(0)).getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPipelineBreakerBroadcastedAllReduce() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
            env.setParallelism(64);
            MapOperator sourceWithMapper = env.generateSequence(1L, 10L).map(new IdentityMapper());
            ReduceOperator bcInput1 = sourceWithMapper.map(new IdentityMapper()).reduce(new SelectOneReducer());
            DataSource bcInput2 = env.generateSequence(1L, 10L);
            SingleInputUdfOperator result = ((MapOperator)sourceWithMapper.map(new IdentityMapper()).withBroadcastSet((DataSet)bcInput1, "bc1")).withBroadcastSet((DataSet)bcInput2, "bc2");
            result.output((OutputFormat)new DiscardingOutputFormat());
            JavaPlan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats((Plan)p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            SingleInputPlanNode mapper = (SingleInputPlanNode)sink.getInput().getSource();
            Assert.assertEquals((Object)TempMode.NONE, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapper.getInput().getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPipelineBreakerBroadcastedPartialSolution() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
            env.setParallelism(64);
            DataSource initialSource = env.generateSequence(1L, 10L);
            IterativeDataSet iteration = initialSource.iterate(100);
            MapOperator sourceWithMapper = env.generateSequence(1L, 10L).map(new IdentityMapper());
            ReduceOperator bcInput1 = sourceWithMapper.map(new IdentityMapper()).reduce(new SelectOneReducer());
            SingleInputUdfOperator result = ((MapOperator)sourceWithMapper.map(new IdentityMapper()).withBroadcastSet((DataSet)iteration, "bc2")).withBroadcastSet((DataSet)bcInput1, "bc1");
            iteration.closeWith((DataSet)result).output((OutputFormat)new DiscardingOutputFormat());
            JavaPlan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats((Plan)p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode)sink.getInput().getSource();
            SingleInputPlanNode mapper = (SingleInputPlanNode)iterationPlanNode.getRootOfStepFunction();
            Assert.assertEquals((Object)TempMode.CACHED, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapper.getInput().getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPipelineBreakerWithCross() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(64);
            DataSource initialSource = env.generateSequence(1L, 10L);
            Configuration conf = new Configuration();
            conf.setString("LOCAL_STRATEGY", "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST");
            ((CrossOperator)initialSource.map(new IdentityMapper()).cross((DataSet)initialSource).withParameters(conf)).output((OutputFormat)new DiscardingOutputFormat());
            JavaPlan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats((Plan)p);
            SinkPlanNode sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            DualInputPlanNode cross = (DualInputPlanNode)sink.getInput().getSource();
            SingleInputPlanNode mapper = (SingleInputPlanNode)cross.getInput1().getSource();
            Assert.assertEquals((Object)TempMode.NONE, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)TempMode.NONE, (Object)cross.getInput2().getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapper.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)cross.getInput2().getDataExchangeMode());
            env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(64);
            initialSource = env.generateSequence(1L, 10L);
            conf = new Configuration();
            conf.setString("LOCAL_STRATEGY", "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND");
            ((CrossOperator)initialSource.map(new IdentityMapper()).cross((DataSet)initialSource).withParameters(conf)).output((OutputFormat)new DiscardingOutputFormat());
            p = env.createProgramPlan();
            op = this.compileNoStats((Plan)p);
            sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            cross = (DualInputPlanNode)sink.getInput().getSource();
            mapper = (SingleInputPlanNode)cross.getInput1().getSource();
            Assert.assertEquals((Object)TempMode.NONE, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)TempMode.NONE, (Object)cross.getInput2().getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapper.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)cross.getInput2().getDataExchangeMode());
            env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(64);
            initialSource = env.generateSequence(1L, 10L);
            conf = new Configuration();
            conf.setString("LOCAL_STRATEGY", "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST");
            ((CrossOperator)initialSource.map(new IdentityMapper()).cross((DataSet)initialSource).withParameters(conf)).output((OutputFormat)new DiscardingOutputFormat());
            p = env.createProgramPlan();
            op = this.compileNoStats((Plan)p);
            sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            cross = (DualInputPlanNode)sink.getInput().getSource();
            mapper = (SingleInputPlanNode)cross.getInput1().getSource();
            Assert.assertEquals((Object)TempMode.NONE, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)TempMode.NONE, (Object)cross.getInput2().getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapper.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)cross.getInput2().getDataExchangeMode());
            env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(64);
            initialSource = env.generateSequence(1L, 10L);
            conf = new Configuration();
            conf.setString("LOCAL_STRATEGY", "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND");
            ((CrossOperator)initialSource.map(new IdentityMapper()).cross((DataSet)initialSource).withParameters(conf)).output((OutputFormat)new DiscardingOutputFormat());
            p = env.createProgramPlan();
            op = this.compileNoStats((Plan)p);
            sink = (SinkPlanNode)op.getDataSinks().iterator().next();
            cross = (DualInputPlanNode)sink.getInput().getSource();
            mapper = (SingleInputPlanNode)cross.getInput1().getSource();
            Assert.assertEquals((Object)TempMode.NONE, (Object)mapper.getInput().getTempMode());
            Assert.assertEquals((Object)TempMode.NONE, (Object)cross.getInput2().getTempMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)mapper.getInput().getDataExchangeMode());
            Assert.assertEquals((Object)DataExchangeMode.BATCH, (Object)cross.getInput2().getDataExchangeMode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

