package org.apache.flink.optimizer;

import java.util.HashSet;
import java.util.Iterator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/optimizer/BranchingPlansCompilerTest.class */
public class BranchingPlansCompilerTest extends CompilerTestBase {

    /* loaded from: input_file:org/apache/flink/optimizer/BranchingPlansCompilerTest$Duplicator.class */
    private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
        private Duplicator() {
        }

        public Tuple2<T, T> map(T t) {
            return new Tuple2<>(t, t);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* renamed from: map, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m0map(Object obj) throws Exception {
            return map((Duplicator<T>) obj);
        }
    }

    /**
     * Attaches five discarding sinks to each of two identity mappers that read the same
     * sequence source, then optimizes the plan and translates it into a job graph.
     * Any exception raised along the way is reported as a test failure.
     */
    @Test
    public void testCostComputationWithMultipleDataSinks() {
        final int sinksPerMapper = 5;
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            DataSource numbers = env.generateSequence(1L, 10000L);
            MapOperator firstMapper = numbers.map(new IdentityMapper());
            MapOperator secondMapper = numbers.map(new IdentityMapper());

            // Sinks are registered alternately so both mappers end up with the same count.
            for (int remaining = sinksPerMapper; remaining > 0; remaining--) {
                firstMapper.output(new DiscardingOutputFormat());
                secondMapper.output(new DiscardingOutputFormat());
            }

            JobGraphGenerator generator = new JobGraphGenerator();
            generator.compileJobGraph(compileNoStats(env.createProgramPlan("Plans With Multiple Data Sinks")));
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Branches one mapper into two follow-up mappers with three sinks in total and
     * verifies that the optimized plan contains exactly the sinks of the program plan,
     * each of them once, before generating the job graph.
     */
    @Test
    public void testBranchingWithMultipleDataSinks2() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            MapOperator shared = env.generateSequence(1L, 10000L).map(new IdentityMapper());
            MapOperator leftBranch = shared.map(new IdentityMapper());
            MapOperator rightBranch = shared.map(new IdentityMapper());

            leftBranch.output(new DiscardingOutputFormat());
            rightBranch.output(new DiscardingOutputFormat());
            rightBranch.output(new DiscardingOutputFormat());

            JavaPlan plan = env.createProgramPlan();
            // Sinks of the program plan that have not yet been seen in the optimized plan.
            HashSet<Object> unmatchedSinks = new HashSet<Object>(plan.getDataSinks());
            OptimizedPlan optimized = compileNoStats(plan);

            Assert.assertEquals("Wrong number of data sinks.", 3L, optimized.getDataSinks().size());
            for (SinkPlanNode sinkNode : optimized.getDataSinks()) {
                Assert.assertTrue(unmatchedSinks.remove(sinkNode.getProgramOperator()));
            }
            Assert.assertTrue(unmatchedSinks.isEmpty());

            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Re-uses one duplicated source (and a mapper on top of it) as input to two chains of
     * five consecutive self-joins, co-groups both chains and checks that the resulting
     * plan can be optimized and turned into a job graph.
     */
    @Test
    public void testBranchingSourceMultipleTimes() {
        final int joinDepth = 5;
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            MapOperator pairs = env.generateSequence(1L, 10000000L).map(new Duplicator());

            // Each round joins the source with the result of the previous round.
            DataSet sourceChain = pairs;
            for (int level = 0; level < joinDepth; level++) {
                sourceChain = pairs.join(sourceChain).where(0).equalTo(0).with(new DummyFlatJoinFunction());
            }

            MapOperator mapped = pairs.map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                    // Never executed: the plan is only compiled.
                    return null;
                }
            });

            // Same join pattern, this time rooted at the mapper.
            DataSet mappedChain = mapped;
            for (int level = 0; level < joinDepth; level++) {
                mappedChain = mapped.join(mappedChain).where(0).equalTo(0).with(new DummyFlatJoinFunction());
            }

            sourceChain.coGroup(mappedChain)
                    .where(1)
                    .equalTo(1)
                    .with(new DummyCoGroupFunction())
                    .output(new DiscardingOutputFormat());

            JobGraphGenerator generator = new JobGraphGenerator();
            generator.compileJobGraph(compileNoStats(env.createProgramPlan()));
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Builds a plan from three duplicated sources in which a co-group result and a join
     * result are both consumed several times (cross, join and three sinks), then checks
     * that the plan can be optimized and translated into a job graph.
     */
    @Test
    public void testBranchingWithMultipleDataSinks() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            MapOperator sourceA = env.generateSequence(1L, 10000000L).map(new Duplicator());
            MapOperator sourceB = env.generateSequence(1L, 10000000L).map(new Duplicator());
            MapOperator sourceC = env.generateSequence(1L, 10000000L).map(new Duplicator());

            DataSet coGrouped = sourceA.coGroup(sourceB).where(0).equalTo(1).with(
                    new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) {
                            // Intentionally empty: the function is never executed.
                        }
                    });
            MapOperator mappedCoGroup = coGrouped.map(new IdentityMapper());

            JoinOperator.EquiJoin innerJoin = sourceB.join(sourceC).where(0).equalTo(1).with(new DummyFlatJoinFunction());
            JoinOperator.EquiJoin outerJoin = mappedCoGroup.join(innerJoin).where(1).equalTo(1).with(new DummyFlatJoinFunction());

            GroupReduceOperator topOfGroups = mappedCoGroup.groupBy(1).reduceGroup(new Top1GroupReducer());
            topOfGroups.cross(outerJoin).output(new DiscardingOutputFormat());

            // The join result additionally feeds two sinks of its own.
            outerJoin.output(new DiscardingOutputFormat());
            outerJoin.output(new DiscardingOutputFormat());

            JobGraphGenerator generator = new JobGraphGenerator();
            generator.compileJobGraph(compileNoStats(env.createProgramPlan()));
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Builds a plan in which map, reduce, join, co-group and cross results as well as a
     * source are each consumed more than once, and checks that the plan can be optimized
     * and translated into a job graph.
     */
    @Test
    public void testBranchEachContractType() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            DataSource sourceA = env.generateSequence(0L, 1L);
            DataSource sourceB = env.generateSequence(0L, 1L);
            DataSource sourceC = env.generateSequence(0L, 1L);

            Operator map1 = sourceA.map(new IdentityMapper()).name("Map 1");
            Operator reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 1");

            DataSet unionedSources = sourceB.union(sourceB).union(sourceC);
            Operator join1 = unionedSources.join(sourceC).where("*").equalTo("*").with(new IdentityJoiner()).name("Join 1");

            Operator coGroup1 = sourceA.coGroup(sourceB).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 1");
            Operator cross1 = reduce1.cross(coGroup1).with(new IdentityCrosser()).name("Cross 1");
            Operator coGroup2 = cross1.coGroup(cross1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 2");
            Operator coGroup3 = map1.coGroup(join1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 3");

            Operator map2 = coGroup3.map(new IdentityMapper()).name("Map 2");
            Operator coGroup4 = map2.coGroup(join1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 4");

            Operator coGroup5 = coGroup2.coGroup(coGroup1).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 5");
            Operator coGroup6 = reduce1.coGroup(coGroup4).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 6");
            Operator coGroup7 = coGroup5.coGroup(coGroup6).where("*").equalTo("*").with(new IdentityCoGrouper()).name("CoGroup 7");

            // The single sink consumes the union of the last co-group with earlier results.
            coGroup7.union(sourceA)
                    .union(coGroup3)
                    .union(coGroup4)
                    .union(coGroup1)
                    .output(new DiscardingOutputFormat());

            JobGraphGenerator generator = new JobGraphGenerator();
            generator.compileJobGraph(compileNoStats(env.createProgramPlan()));
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Unions four results that all derive from one join and joins that union with one of
     * its own inputs, then checks that the plan can be optimized and translated into a
     * job graph.
     */
    @Test
    public void testBranchingUnion() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            DataSource leftSource = env.generateSequence(0L, 1L);
            DataSource rightSource = env.generateSequence(0L, 1L);
            Operator join1 = leftSource.join(rightSource).where("*").equalTo("*").with(new IdentityJoiner()).name("Join 1");

            Operator map1 = join1.map(new IdentityMapper()).name("Map 1");
            Operator reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 1");
            Operator reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer()).name("Reduce 2");
            Operator map2 = join1.map(new IdentityMapper()).name("Map 2");

            DataSet firstThree = reduce1.union(reduce2).union(map2);
            Operator map3 = map2.map(new IdentityMapper()).name("Map 3");
            DataSet allBranches = firstThree.union(map3);

            allBranches.join(map2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
                    .where("*")
                    .equalTo("*")
                    .with(new IdentityJoiner())
                    .name("Join 2")
                    .output(new DiscardingOutputFormat());

            JobGraphGenerator generator = new JobGraphGenerator();
            generator.compileJobGraph(compileNoStats(env.createProgramPlan()));
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Writes one source to two text files and verifies that the optimized plan has two
     * sinks whose output paths are exactly those two files, before generating the job
     * graph.
     */
    @Test
    public void testBranchingWithMultipleDataSinksSmall() {
        try {
            final String firstPath = "/tmp/out1";
            final String secondPath = "/tmp/out2";

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);

            DataSource numbers = env.generateSequence(0L, 1L);
            numbers.writeAsText(firstPath);
            numbers.writeAsText(secondPath);

            OptimizedPlan optimized = compileNoStats(env.createProgramPlan());
            Assert.assertEquals("Wrong number of data sinks.", 2L, optimized.getDataSinks().size());

            // Every path may be claimed by one sink only; a second removal would fail.
            HashSet<String> expectedPaths = new HashSet<>();
            expectedPaths.add(firstPath);
            expectedPaths.add(secondPath);

            for (SinkPlanNode sinkNode : optimized.getDataSinks()) {
                Object format = sinkNode.getSinkNode().getOperator().getFormatWrapper().getUserCodeObject();
                String writtenPath = ((TextOutputFormat<?>) format).getOutputFilePath().toString();
                Assert.assertTrue("Invalid data sink.", expectedPaths.remove(writtenPath));
            }

            new JobGraphGenerator().compileJobGraph(optimized);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Compiles a plan made of two unconnected sources that are each written to two text
     * sinks. There are no explicit assertions; an exception thrown while compiling
     * propagates out of the test method.
     */
    @Test
    public void testBranchingDisjointPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSource firstSource = env.generateSequence(0L, 1L);
        DataSource secondSource = env.generateSequence(0L, 1L);

        // Sinks are registered alternately on the two sources.
        firstSource.writeAsText("file:///test/1");
        secondSource.writeAsText("file:///test/2");
        firstSource.writeAsText("file:///test/3");
        secondSource.writeAsText("file:///test/4");

        compileNoStats(env.createProgramPlan());
    }

    /**
     * Consumes the result of a bulk iteration twice (directly by a sink and through an
     * additional mapper) and checks that the plan compiles without an exception.
     */
    @Test
    public void testBranchAfterIteration() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSource input = env.generateSequence(0L, 1L);
        IterativeDataSet loopHead = input.iterate(10);
        Operator loopBody = loopHead.map(new IdentityMapper()).name("Mapper");
        DataSet loopResult = loopHead.closeWith(loopBody);

        // Two consumers of the iteration result form the branch under test.
        loopResult.output(new DiscardingOutputFormat());
        loopResult.map(new IdentityMapper()).output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Uses one source both inside a bulk iteration (as input of the in-loop mapper) and
     * after it (as input of the post-loop mapper, which receives the iteration result as
     * a broadcast set), and checks that the plan compiles without an exception.
     */
    @Test
    public void testBranchBeforeIteration() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        DataSource branchedSource = env.generateSequence(0L, 1L);
        IterativeDataSet loopHead = env.generateSequence(0L, 1L).iterate(10).name("Loop");

        MapOperator postLoopInput = branchedSource.map(new IdentityMapper());
        Operator inLoopMapper = branchedSource.map(new IdentityMapper())
                .withBroadcastSet(loopHead, "BC")
                .name("In-Loop Mapper");
        DataSet loopResult = loopHead.closeWith(inLoopMapper);

        postLoopInput.withBroadcastSet(loopResult, "BC")
                .name("Post-Loop Mapper")
                .output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which both the input of a bulk iteration and a data
     * set used inside the iteration (as the second input of the cross) are additionally
     * written to their own sinks.
     */
    @Test
    public void testClosure() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        // Parameterized declarations are required here: on a raw receiver the self-typed
        // name(...) call only yields the erased Operator type, which cannot be assigned
        // to IterativeDataSet.
        DataSet<Long> sourceA = env.generateSequence(0L, 1L);
        DataSet<Long> sourceB = env.generateSequence(0L, 1L);

        sourceA.output(new DiscardingOutputFormat<Long>());
        sourceB.output(new DiscardingOutputFormat<Long>());

        IterativeDataSet<Long> loopHead = sourceA.iterate(10).name("Loop");
        DataSet<Long> loopTail = loopHead.cross(sourceB).with(new IdentityCrosser<Long>());
        DataSet<Long> loopResult = loopHead.closeWith(loopTail);
        loopResult.output(new DiscardingOutputFormat<Long>());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which the initial solution set of a delta iteration
     * and an unrelated third data set are written to sinks, while the initial workset is
     * also crossed with the workset inside the iteration.
     */
    @Test
    public void testClosureDeltaIteration() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        MapOperator solutionInput = env.generateSequence(0L, 1L).map(new Duplicator());
        MapOperator worksetInput = env.generateSequence(0L, 1L).map(new Duplicator());
        MapOperator unrelatedInput = env.generateSequence(0L, 1L).map(new Duplicator());

        solutionInput.output(new DiscardingOutputFormat());
        unrelatedInput.output(new DiscardingOutputFormat());

        DeltaIteration iteration = solutionInput.iterateDelta(worksetInput, 10, 0);

        Operator nextWorkset = iteration.getWorkset()
                .cross(worksetInput)
                .with(new IdentityCrosser())
                .name("Next work set");
        Operator solutionSetDelta = nextWorkset.join(iteration.getSolutionSet())
                .where(0)
                .equalTo(0)
                .with(new IdentityJoiner())
                .name("Solution set delta");

        DataSet iterationResult = iteration.closeWith(solutionSetDelta, nextWorkset);
        iterationResult.output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a delta iteration compiles when its initial solution set, its initial
     * workset and a data set joined inside the loop all derive from the same mapper.
     */
    @Test
    public void testDeltaIterationWithStaticInput() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        MapOperator pairs = env.generateSequence(0L, 1L).map(new Duplicator());
        MapOperator initialWorkset = pairs.map(new IdentityMapper());
        GroupReduceOperator staticInput = pairs.reduceGroup(new IdentityGroupReducer());

        DeltaIteration iteration = pairs.iterateDelta(initialWorkset, 10, 0);

        Operator nextWorkset = iteration.getWorkset()
                .join(staticInput)
                .where(0)
                .equalTo(0)
                .with(new IdentityJoiner())
                .name("Next work set");
        Operator solutionSetDelta = iteration.getSolutionSet()
                .join(nextWorkset)
                .where(0)
                .equalTo(0)
                .with(new IdentityJoiner())
                .name("Solution set delta");

        iteration.closeWith(solutionSetDelta, nextWorkset).output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a bulk iteration compiles when the data set joined inside the loop is
     * derived from the same source as the iteration input.
     */
    @Test
    public void testIterationWithStaticInput() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(100);

            DataSource numbers = env.generateSequence(1L, 1000000L);
            MapOperator iterationInput = numbers.map(new IdentityMapper());
            ReduceOperator staticInput = numbers.groupBy(new IdentityKeyExtractor()).reduce(new SelectOneReducer());

            IterativeDataSet loopHead = iterationInput.iterate(10);
            JoinOperator.EquiJoin loopBody = loopHead.join(staticInput)
                    .where(new IdentityKeyExtractor())
                    .equalTo(new IdentityKeyExtractor())
                    .with(new DummyFlatJoinFunction());
            loopHead.closeWith(loopBody).output(new DiscardingOutputFormat());

            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which one source is broadcast to three operators
     * and a join receives a source, one of its own regular inputs and another source's
     * data as broadcast sets.
     */
    @Test
    public void testBranchingBroadcastVariable() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);

        Operator source1 = env.readTextFile(IN_FILE).name("source1");
        Operator source2 = env.readTextFile(IN_FILE).name("source2");
        Operator source3 = env.readTextFile(IN_FILE).name("source3");

        SingleInputUdfOperator reduced1 = source1.map(new IdentityMapper())
                .reduceGroup(new Top1GroupReducer())
                .withBroadcastSet(source3, "bc");
        SingleInputUdfOperator reduced2 = source2.map(new IdentityMapper())
                .reduceGroup(new Top1GroupReducer())
                .withBroadcastSet(source3, "bc");

        JoinOperator.EquiJoin joined = reduced1.join(reduced2)
                .where(new IdentityKeyExtractor())
                .equalTo(new IdentityKeyExtractor())
                .with(new RichJoinFunction<String, String, String>() {
                    @Override
                    public String join(String first, String second) {
                        // Never executed: the plan is only compiled.
                        return null;
                    }
                });

        joined.withBroadcastSet(source3, "bc1")
                .withBroadcastSet(source1, "bc2")
                .withBroadcastSet(reduced1, "bc3")
                .output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which one reduced data set is broadcast both to the
     * mapper that produces the iteration input and to the mapper inside the iteration.
     * The environment parallelism is left at its default.
     */
    @Test
    public void testBCVariableClosure() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Operator input = env.readTextFile(IN_FILE).name("source1");
        GroupReduceOperator reduced = input.map(new IdentityMapper()).reduceGroup(new Top1GroupReducer());

        SingleInputUdfOperator iterationInput = input.map(new IdentityMapper()).withBroadcastSet(reduced, "bc");
        IterativeDataSet loopHead = iterationInput.iterate(100);
        SingleInputUdfOperator loopBody = loopHead.map(new IdentityMapper()).withBroadcastSet(reduced, "red");

        loopHead.closeWith(loopBody).output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which three bulk iterations start from the same
     * source and each loop body receives the same reduced data set as a broadcast set.
     */
    @Test
    public void testMultipleIterations() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);

        Operator input = env.readTextFile(IN_FILE).name("source1");
        GroupReduceOperator reduced = input.map(new IdentityMapper()).reduceGroup(new Top1GroupReducer());

        IterativeDataSet firstLoop = input.iterate(100);
        IterativeDataSet secondLoop = input.iterate(20);
        IterativeDataSet thirdLoop = input.iterate(17);

        SingleInputUdfOperator firstBody = firstLoop.map(new IdentityMapper()).withBroadcastSet(reduced, "bc1");
        firstLoop.closeWith(firstBody).output(new DiscardingOutputFormat());

        SingleInputUdfOperator secondBody = secondLoop.reduceGroup(new Top1GroupReducer()).withBroadcastSet(reduced, "bc2");
        secondLoop.closeWith(secondBody).output(new DiscardingOutputFormat());

        SingleInputUdfOperator thirdBody = thirdLoop.reduceGroup(new IdentityGroupReducer()).withBroadcastSet(reduced, "bc3");
        thirdLoop.closeWith(thirdBody).output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which three bulk iterations with different loop
     * bodies (map, top-1 group reduce, identity group reduce) start from the same source.
     */
    @Test
    public void testMultipleIterationsWithClosueBCVars() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(100);

        Operator input = env.readTextFile(IN_FILE).name("source1");

        IterativeDataSet firstLoop = input.iterate(100);
        IterativeDataSet secondLoop = input.iterate(20);
        IterativeDataSet thirdLoop = input.iterate(17);

        MapOperator firstBody = firstLoop.map(new IdentityMapper());
        firstLoop.closeWith(firstBody).output(new DiscardingOutputFormat());

        GroupReduceOperator secondBody = secondLoop.reduceGroup(new Top1GroupReducer());
        secondLoop.closeWith(secondBody).output(new DiscardingOutputFormat());

        GroupReduceOperator thirdBody = thirdLoop.reduceGroup(new IdentityGroupReducer());
        thirdLoop.closeWith(thirdBody).output(new DiscardingOutputFormat());

        try {
            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which the only branching happens through a data set
     * that is broadcast to two consecutive mappers.
     */
    @Test
    public void testBranchesOnlyInBCVariables1() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(100);

            DataSource mainInput = env.generateSequence(1L, 10L);
            DataSource broadcastInput = env.generateSequence(1L, 10L);

            SingleInputUdfOperator firstMapper = mainInput.map(new IdentityMapper())
                    .withBroadcastSet(broadcastInput, "name1");
            SingleInputUdfOperator secondMapper = firstMapper.map(new IdentityMapper())
                    .withBroadcastSet(broadcastInput, "name2");
            secondMapper.output(new DiscardingOutputFormat());

            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }

    /**
     * Checks that a plan compiles in which three mappers over the same input receive
     * overlapping combinations of two broadcast inputs; two of the mappers are joined
     * and the join result is unioned with the third.
     */
    @Test
    public void testBranchesOnlyInBCVariables2() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(100);

            Operator mainInput = env.generateSequence(1L, 10L).map(new Duplicator()).name("proper input");
            Operator bcInput1 = env.generateSequence(1L, 10L).name("BC input 1");
            // NOTE(review): this second broadcast input carries the same label as the
            // first one; possibly meant to be "BC input 2" - confirm before changing.
            Operator bcInput2 = env.generateSequence(1L, 10L).name("BC input 1");

            SingleInputUdfOperator unionInput = mainInput.map(new IdentityMapper())
                    .withBroadcastSet(bcInput1, "bc1");

            SingleInputUdfOperator joinInput1 = mainInput.map(new IdentityMapper())
                    .withBroadcastSet(bcInput1.map(new IdentityMapper()), "bc1")
                    .withBroadcastSet(bcInput2, "bc2");
            SingleInputUdfOperator joinInput2 = mainInput.map(new IdentityMapper())
                    .withBroadcastSet(bcInput1, "bc1")
                    .withBroadcastSet(bcInput2, "bc2");

            JoinOperator.EquiJoin joinResult = joinInput1
                    .join(joinInput2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                    .where(0)
                    .equalTo(1)
                    .with(new DummyFlatJoinFunction());

            unionInput.union(joinResult).output(new DiscardingOutputFormat());

            compileNoStats(env.createProgramPlan());
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail(failure.getMessage());
        }
    }
}
