/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import java.util.HashSet;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class BranchingPlansCompilerTest
extends CompilerTestBase {
    /**
     * Compiles a plan in which one sequence source feeds two identity mappers and each
     * mapper writes to several discarding sinks, then generates a job graph from the
     * optimized plan. Any exception raised along the way fails the test.
     */
    @Test
    public void testCostComputationWithMultipleDataSinks() {
        // Number of sinks attached to EACH of the two mappers (so twice as many in total).
        final int sinksPerMapper = 5;
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            DataSource source = env.generateSequence(1L, 10000L);
            MapOperator mappedA = source.map(new IdentityMapper());
            MapOperator mappedC = source.map(new IdentityMapper());
            for (int sink = 0; sink < sinksPerMapper; ++sink) {
                mappedA.output(new DiscardingOutputFormat());
                mappedC.output(new DiscardingOutputFormat());
            }
            JavaPlan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
            OptimizedPlan oPlan = compileNoStats(plan);
            new JobGraphGenerator().compileJobGraph(oPlan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Puts one mapper behind a sequence source, branches it into two further mappers and
     * attaches three discarding sinks (one on the first branch, two on the second).
     * After compiling, verifies that the optimized plan has exactly three sinks and that
     * their program operators are exactly the sinks of the program plan, then generates
     * the job graph.
     */
    @Test
    public void testBranchingWithMultipleDataSinks2() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            DataSource input = env.generateSequence(1L, 10000L);
            MapOperator trunk = input.map(new IdentityMapper());
            MapOperator branchB = trunk.map(new IdentityMapper());
            MapOperator branchC = trunk.map(new IdentityMapper());
            branchB.output(new DiscardingOutputFormat());
            branchC.output(new DiscardingOutputFormat());
            branchC.output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            // Snapshot of the program's sinks; every optimized sink must tick off one entry.
            HashSet expectedSinks = new HashSet(plan.getDataSinks());
            OptimizedPlan optimized = compileNoStats(plan);

            Assert.assertEquals("Wrong number of data sinks.", 3L, (long) optimized.getDataSinks().size());
            for (SinkPlanNode sinkNode : optimized.getDataSinks()) {
                boolean wasExpected = expectedSinks.remove(sinkNode.getProgramOperator());
                Assert.assertTrue(wasExpected);
            }
            Assert.assertTrue(expectedSinks.isEmpty());

            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds two chains of five joins each: the first repeatedly joins the duplicated
     * source with the previous join result (starting with the source itself), the second
     * does the same for a mapper applied to that source. Both chain ends are co-grouped on
     * field 1 and written to a discarding sink; the plan is then compiled and turned into
     * a job graph.
     */
    @Test
    public void testBranchingSourceMultipleTimes() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            MapOperator source = env.generateSequence(1L, 10000000L).map(new Duplicator());

            // Chain 1: source JOIN source, then source JOIN (previous result), five joins in total.
            DataSet sourceChain = source;
            for (int step = 0; step < 5; ++step) {
                sourceChain = source.join(sourceChain).where(0).equalTo(0).with(new DummyFlatJoinFunction());
            }

            MapOperator mapped = source.map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {

                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                    // Never executed here; the test only compiles the plan.
                    return null;
                }
            });

            // Chain 2: the same five-join pattern, rooted at the mapper instead of the source.
            DataSet mappedChain = mapped;
            for (int step = 0; step < 5; ++step) {
                mappedChain = mapped.join(mappedChain).where(0).equalTo(0).with(new DummyFlatJoinFunction());
            }

            sourceChain.coGroup(mappedChain).where(1).equalTo(1).with(new DummyCoGroupFunction()).output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            OptimizedPlan optimized = compileNoStats(plan);
            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a plan from three duplicated sequence sources in which several intermediate
     * results are consumed more than once: the mapped co-group of A and B feeds both a
     * join and a group-reduce, and the second join feeds a cross plus two extra sinks.
     * The plan is compiled and a job graph is generated from it.
     */
    @Test
    public void testBranchingWithMultipleDataSinks() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            MapOperator sourceA = env.generateSequence(1L, 10000000L).map(new Duplicator());
            MapOperator sourceB = env.generateSequence(1L, 10000000L).map(new Duplicator());
            MapOperator sourceC = env.generateSequence(1L, 10000000L).map(new Duplicator());

            CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> emitNothing =
                    new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {

                        @Override
                        public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) {
                            // Empty on purpose: the function body is irrelevant for plan compilation.
                        }
                    };
            MapOperator mapped = sourceA.coGroup(sourceB).where(0).equalTo(1).with(emitNothing).map(new IdentityMapper());

            JoinOperator.EquiJoin joinBC = sourceB.join(sourceC).where(0).equalTo(1).with(new DummyFlatJoinFunction());
            JoinOperator.EquiJoin joinMapped = mapped.join(joinBC).where(1).equalTo(1).with(new DummyFlatJoinFunction());
            GroupReduceOperator reduced = mapped.groupBy(1).reduceGroup(new Top1GroupReducer());

            // Three sinks: one behind the cross, two directly on the second join.
            reduced.cross(joinMapped).output(new DiscardingOutputFormat());
            joinMapped.output(new DiscardingOutputFormat());
            joinMapped.output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            OptimizedPlan optimized = compileNoStats(plan);
            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a plan over three tiny sequence sources in which map, group-reduce, join,
     * co-group, cross and union results are each reused by more than one downstream
     * operator. All keyed operations use the wildcard key expression {@code "*"}.
     * The plan is compiled and a job graph is generated from it.
     */
    @Test
    public void testBranchEachContractType() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            DataSource sourceA = env.generateSequence(0L, 1L);
            DataSource sourceB = env.generateSequence(0L, 1L);
            DataSource sourceC = env.generateSequence(0L, 1L);

            Operator map1 = sourceA.map(new IdentityMapper()).name("Map 1");
            Operator reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 1");
            Operator join1 = sourceB.union(sourceB).union(sourceC).join(sourceC).where("*").equalTo("*").with(new IdentityJoiner()).name("Join 1");
            Operator coGroup1 = sourceA.coGroup(sourceB).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 1");
            Operator cross1 = reduce1.cross(coGroup1).with(new IdentityCrosser()).name("Cross 1");
            // Both inputs of this co-group are the very same cross result.
            Operator coGroup2 = cross1.coGroup(cross1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 2");
            Operator coGroup3 = map1.coGroup(join1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 3");
            Operator map2 = coGroup3.map(new IdentityMapper()).name("Map 2");
            Operator coGroup4 = map2.coGroup(join1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 4");
            Operator coGroup5 = coGroup2.coGroup(coGroup1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 5");
            Operator coGroup6 = reduce1.coGroup(coGroup4).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 6");
            Operator coGroup7 = coGroup5.coGroup(coGroup6).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 7");

            // Single sink behind a union that pulls several earlier results back in.
            coGroup7.union(sourceA).union(coGroup3).union(coGroup4).union(coGroup1).output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            OptimizedPlan optimized = compileNoStats(plan);
            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Joins two tiny sequence sources, fans the join result out into two reducers and a
     * two-step mapper chain, unions those four results and joins the union with one of the
     * mappers again using the {@code REPARTITION_SORT_MERGE} hint. The plan is compiled
     * and a job graph is generated from it.
     */
    @Test
    public void testBranchingUnion() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            DataSource source1 = env.generateSequence(0L, 1L);
            DataSource source2 = env.generateSequence(0L, 1L);

            Operator join1 = source1.join(source2).where("*").equalTo("*").with(new IdentityJoiner()).name("Join 1");
            Operator map1 = join1.map(new IdentityMapper()).name("Map 1");
            Operator reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 1");
            Operator reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 2");
            Operator map2 = join1.map(new IdentityMapper()).name("Map 2");
            Operator map3 = map2.map(new IdentityMapper()).name("Map 3");

            // "Map 2" is part of the union and is also the second input of the final join.
            DataSet unioned = reduce1.union(reduce2).union(map2).union(map3);
            Operator join2 = unioned.join(map2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE).where("*").equalTo("*").with(new IdentityJoiner()).name("Join 2");
            join2.output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            OptimizedPlan optimized = compileNoStats(plan);
            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Writes a single sequence source to two text sinks, compiles the plan and checks that
     * the optimized plan contains exactly two sinks whose output paths are the two paths
     * that were configured (each matched once). Finally generates the job graph.
     */
    @Test
    public void testBranchingWithMultipleDataSinksSmall() {
        try {
            String outPath1 = "/tmp/out1";
            String outPath2 = "/tmp/out2";

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);
            DataSource source1 = env.generateSequence(0L, 1L);
            source1.writeAsText(outPath1);
            source1.writeAsText(outPath2);

            JavaPlan plan = env.createProgramPlan();
            OptimizedPlan optimized = compileNoStats(plan);
            Assert.assertEquals("Wrong number of data sinks.", 2L, (long) optimized.getDataSinks().size());

            // Paths still waiting to be matched by a sink of the optimized plan.
            HashSet<String> remainingPaths = new HashSet<String>();
            remainingPaths.add(outPath1);
            remainingPaths.add(outPath2);
            for (SinkPlanNode sinkNode : optimized.getDataSinks()) {
                TextOutputFormat format = (TextOutputFormat) sinkNode.getSinkNode().getOperator().getFormatWrapper().getUserCodeObject();
                String path = format.getOutputFilePath().toString();
                Assert.assertTrue("Invalid data sink.", remainingPaths.remove(path));
            }

            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Compiles a plan made of two unconnected sub-plans: two independent sequence sources,
     * each written to two text sinks. The test has no assertions and no catch block, so it
     * passes as long as {@code compileNoStats} returns without throwing.
     */
    @Test
    public void testBranchingDisjointPlan() {
        final String out1Path = "file:///test/1";
        final String out2Path = "file:///test/2";
        final String out3Path = "file:///test/3";
        final String out4Path = "file:///test/4";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource sourceA = env.generateSequence(0L, 1L);
        DataSource sourceB = env.generateSequence(0L, 1L);

        // Sinks alternate between the two sources: A -> 1 and 3, B -> 2 and 4.
        sourceA.writeAsText(out1Path);
        sourceB.writeAsText(out2Path);
        sourceA.writeAsText(out3Path);
        sourceB.writeAsText(out4Path);

        JavaPlan plan = env.createProgramPlan();
        compileNoStats(plan);
    }

    /**
     * Runs a ten-step bulk iteration whose step function is a single identity mapper and
     * then consumes the iteration result twice: once by a sink directly and once through
     * an additional mapper with its own sink. The test fails if compiling the plan throws.
     */
    @Test
    public void testBranchAfterIteration() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource sourceA = env.generateSequence(0L, 1L);

        IterativeDataSet loopHead = sourceA.iterate(10);
        Operator loopTail = loopHead.map(new IdentityMapper()).name("Mapper");
        DataSet loopResult = loopHead.closeWith(loopTail);

        // The branch: the closed iteration has two consumers.
        loopResult.output(new DiscardingOutputFormat());
        loopResult.map(new IdentityMapper()).output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Uses one source both inside and after an iteration: a mapper over {@code source1}
     * forms the iteration's step result (with the iteration head as broadcast set "BC"),
     * and a second mapper over {@code source1} receives the closed iteration as broadcast
     * set "BC" and feeds the only sink. The test fails if compiling the plan throws.
     */
    @Test
    public void testBranchBeforeIteration() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource source1 = env.generateSequence(0L, 1L);
        DataSource source2 = env.generateSequence(0L, 1L);

        IterativeDataSet loopHead = (IterativeDataSet) source2.iterate(10).name("Loop");

        MapOperator inLoopMapper = (MapOperator) source1.map(new IdentityMapper()).withBroadcastSet(loopHead, "BC");
        Operator loopTail = inLoopMapper.name("In-Loop Mapper");
        DataSet loopResult = loopHead.closeWith(loopTail);

        MapOperator postLoopMapper = (MapOperator) source1.map(new IdentityMapper()).withBroadcastSet(loopResult, "BC");
        Operator map = postLoopMapper.name("Post-Loop Mapper");
        map.output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Gives two sequence sources a sink each and additionally uses them in a ten-step bulk
     * iteration: source A is the iteration input and source B is crossed with the iteration
     * head inside the loop. The iteration result gets a third sink. The test fails if
     * compiling the plan throws.
     */
    @Test
    public void testClosure() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource sourceA = env.generateSequence(0L, 1L);
        DataSource sourceB = env.generateSequence(0L, 1L);

        // Both sources are consumed outside the loop as well as by it.
        sourceA.output(new DiscardingOutputFormat());
        sourceB.output(new DiscardingOutputFormat());

        IterativeDataSet loopHead = (IterativeDataSet) sourceA.iterate(10).name("Loop");
        CrossOperator loopTail = loopHead.cross(sourceB).with(new IdentityCrosser());
        DataSet loopResult = loopHead.closeWith(loopTail);
        loopResult.output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Delta-iteration variant of the closure test: source A (which also has its own sink)
     * is the initial solution set, source B is the initial workset and is crossed with the
     * workset inside the loop, and source C only feeds a sink. The next workset is joined
     * with the solution set on field 0 to produce the delta. The test fails if compiling
     * the plan throws.
     */
    @Test
    public void testClosureDeltaIteration() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        MapOperator sourceA = env.generateSequence(0L, 1L).map(new Duplicator());
        MapOperator sourceB = env.generateSequence(0L, 1L).map(new Duplicator());
        MapOperator sourceC = env.generateSequence(0L, 1L).map(new Duplicator());

        sourceA.output(new DiscardingOutputFormat());
        sourceC.output(new DiscardingOutputFormat());

        DeltaIteration loop = sourceA.iterateDelta(sourceB, 10, 0);
        Operator workset = loop.getWorkset().cross(sourceB).with(new IdentityCrosser()).name("Next work set");
        Operator delta = workset.join(loop.getSolutionSet()).where(0).equalTo(0).with(new IdentityJoiner()).name("Solution set delta");
        DataSet result = loop.closeWith(delta, workset);
        result.output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a ten-step delta iteration in which one duplicated source is used three ways:
     * directly as the initial solution set, through a mapper as the initial workset, and
     * through a group-reduce as an input that is joined with the workset inside the loop.
     * The test fails if compiling the plan throws.
     */
    @Test
    public void testDeltaIterationWithStaticInput() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        MapOperator source = env.generateSequence(0L, 1L).map(new Duplicator());
        MapOperator map = source.map(new IdentityMapper());
        GroupReduceOperator reduce = source.reduceGroup(new IdentityGroupReducer());

        DeltaIteration loop = source.iterateDelta(map, 10, 0);
        Operator workset = loop.getWorkset().join(reduce).where(0).equalTo(0).with(new IdentityJoiner()).name("Next work set");
        Operator delta = loop.getSolutionSet().join(workset).where(0).equalTo(0).with(new IdentityJoiner()).name("Solution set delta");
        DataSet result = loop.closeWith(delta, workset);
        result.output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a ten-step bulk iteration over a mapped sequence source whose step function
     * joins the iteration head with a reduced view of the same source (created outside the
     * loop). Keys are supplied through {@code IdentityKeyExtractor}. The test fails if
     * building or compiling the plan throws.
     */
    @Test
    public void testIterationWithStaticInput() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(100);
            DataSource source = env.generateSequence(1L, 1000000L);
            MapOperator mapped = source.map(new IdentityMapper());
            ReduceOperator reduced = source.groupBy(new IdentityKeyExtractor()).reduce(new SelectOneReducer());

            IterativeDataSet iteration = mapped.iterate(10);
            DataSet stepResult = iteration.join(reduced).where(new IdentityKeyExtractor()).equalTo(new IdentityKeyExtractor()).with(new DummyFlatJoinFunction());
            iteration.closeWith(stepResult).output(new DiscardingOutputFormat());

            compileNoStats(env.createProgramPlan());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a plan over three text sources in which {@code input3} is broadcast to two
     * group-reducers and to the final join, and the join additionally receives
     * {@code input1} and the first reducer's result as broadcast sets although both are
     * already upstream of it. The test fails if compiling the plan throws.
     */
    @Test
    public void testBranchingBroadcastVariable() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);
        Operator input1 = env.readTextFile(IN_FILE).name("source1");
        Operator input2 = env.readTextFile(IN_FILE).name("source2");
        Operator input3 = env.readTextFile(IN_FILE).name("source3");

        SingleInputUdfOperator result1 = input1.map(new IdentityMapper()).reduceGroup(new Top1GroupReducer()).withBroadcastSet(input3, "bc");
        SingleInputUdfOperator result2 = input2.map(new IdentityMapper()).reduceGroup(new Top1GroupReducer()).withBroadcastSet(input3, "bc");

        JoinFunction<String, String, String> nullJoiner = new RichJoinFunction<String, String, String>() {

            @Override
            public String join(String first, String second) {
                // Never executed here; the test only compiles the plan.
                return null;
            }
        };

        // Attach the three broadcast sets one at a time instead of in one nested expression.
        JoinOperator joined = (JoinOperator) result1.join(result2).where(new IdentityKeyExtractor()).equalTo(new IdentityKeyExtractor()).with(nullJoiner).withBroadcastSet(input3, "bc1");
        joined = (JoinOperator) joined.withBroadcastSet(input1, "bc2");
        joined = (JoinOperator) joined.withBroadcastSet(result1, "bc3");
        joined.output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Broadcasts one group-reduce result to two places: to the mapper that produces the
     * iteration's initial data set (as "bc") and to the mapper inside the 100-step
     * iteration (as "red"). Unlike most tests in this class, no parallelism is set on the
     * environment. The test fails if compiling the plan throws.
     */
    @Test
    public void testBCVariableClosure() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Operator input = env.readTextFile(IN_FILE).name("source1");
        GroupReduceOperator reduced = input.map(new IdentityMapper()).reduceGroup(new Top1GroupReducer());
        SingleInputUdfOperator initialSolution = input.map(new IdentityMapper()).withBroadcastSet(reduced, "bc");

        IterativeDataSet iteration = initialSolution.iterate(100);
        DataSet stepResult = iteration.map(new IdentityMapper()).withBroadcastSet(reduced, "red");
        iteration.closeWith(stepResult).output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Starts three bulk iterations (100, 20 and 17 steps) from the same text source. Each
     * step function receives the same group-reduce result as a broadcast set ("bc1",
     * "bc2", "bc3") and each iteration result gets its own discarding sink. The test fails
     * if compiling the plan throws.
     */
    @Test
    public void testMultipleIterations() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);
        Operator input = env.readTextFile(IN_FILE).name("source1");
        GroupReduceOperator reduced = input.map(new IdentityMapper()).reduceGroup(new Top1GroupReducer());

        IterativeDataSet iteration1 = input.iterate(100);
        IterativeDataSet iteration2 = input.iterate(20);
        IterativeDataSet iteration3 = input.iterate(17);

        DataSet step1 = iteration1.map(new IdentityMapper()).withBroadcastSet(reduced, "bc1");
        iteration1.closeWith(step1).output(new DiscardingOutputFormat());

        DataSet step2 = iteration2.reduceGroup(new Top1GroupReducer()).withBroadcastSet(reduced, "bc2");
        iteration2.closeWith(step2).output(new DiscardingOutputFormat());

        DataSet step3 = iteration3.reduceGroup(new IdentityGroupReducer()).withBroadcastSet(reduced, "bc3");
        iteration3.closeWith(step3).output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Starts three bulk iterations (100, 20 and 17 steps) from the same text source, with a
     * mapper, a {@code Top1GroupReducer} and an {@code IdentityGroupReducer} as the
     * respective step functions, and gives each result a discarding sink. Despite the
     * method name, this body attaches no broadcast sets. The test fails if compiling the
     * plan throws.
     */
    @Test
    public void testMultipleIterationsWithClosueBCVars() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);
        Operator input = env.readTextFile(IN_FILE).name("source1");

        IterativeDataSet iteration1 = input.iterate(100);
        IterativeDataSet iteration2 = input.iterate(20);
        IterativeDataSet iteration3 = input.iterate(17);

        DataSet step1 = iteration1.map(new IdentityMapper());
        iteration1.closeWith(step1).output(new DiscardingOutputFormat());

        DataSet step2 = iteration2.reduceGroup(new Top1GroupReducer());
        iteration2.closeWith(step2).output(new DiscardingOutputFormat());

        DataSet step3 = iteration3.reduceGroup(new IdentityGroupReducer());
        iteration3.closeWith(step3).output(new DiscardingOutputFormat());

        JavaPlan plan = env.createProgramPlan();
        try {
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a linear chain of two mappers in which the only data set consumed twice is a
     * broadcast input: it is attached to the first mapper as "name1" and to the second as
     * "name2". The test fails if building or compiling the plan throws.
     */
    @Test
    public void testBranchesOnlyInBCVariables1() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(100);
            DataSource input = env.generateSequence(1L, 10L);
            DataSource broadcastInput = env.generateSequence(1L, 10L);

            MapOperator firstMapper = (MapOperator) input.map(new IdentityMapper()).withBroadcastSet(broadcastInput, "name1");
            MapOperator secondMapper = (MapOperator) firstMapper.map(new IdentityMapper()).withBroadcastSet(broadcastInput, "name2");
            secondMapper.output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Builds a plan in which two broadcast inputs are shared by several mappers over the
     * same regular input: two mappers are joined (hint {@code REPARTITION_HASH_FIRST}) and
     * a third mapper is unioned with the join result. The first broadcast input is used
     * once through an extra mapper and twice directly. The test fails if building or
     * compiling the plan throws.
     */
    @Test
    public void testBranchesOnlyInBCVariables2() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(100);
            Operator input = env.generateSequence(1L, 10L).map(new Duplicator()).name("proper input");
            Operator bcInput1 = env.generateSequence(1L, 10L).name("BC input 1");
            // Was also named "BC input 1" (copy-paste slip); give the second source its own name.
            Operator bcInput2 = env.generateSequence(1L, 10L).name("BC input 2");

            SingleInputUdfOperator joinInput1 = ((MapOperator) input.map(new IdentityMapper()).withBroadcastSet(bcInput1.map(new IdentityMapper()), "bc1")).withBroadcastSet(bcInput2, "bc2");
            SingleInputUdfOperator joinInput2 = ((MapOperator) input.map(new IdentityMapper()).withBroadcastSet(bcInput1, "bc1")).withBroadcastSet(bcInput2, "bc2");
            JoinOperator.EquiJoin joinResult = joinInput1.join(joinInput2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1).with(new DummyFlatJoinFunction());

            MapOperator thirdMapper = (MapOperator) input.map(new IdentityMapper()).withBroadcastSet(bcInput1, "bc1");
            thirdMapper.union(joinResult).output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            compileNoStats(plan);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private static final class Duplicator<T>
    implements MapFunction<T, Tuple2<T, T>> {
        private Duplicator() {
        }

        public Tuple2<T, T> map(T value) {
            return new Tuple2(value, value);
        }
    }
}

