/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.dataexchange;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dag.SingleInputNode;
import org.apache.flink.optimizer.dag.SinkJoiner;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
import org.apache.flink.optimizer.traversals.BranchesVisitor;
import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

public class PipelineBreakingTest {
    @Test
    public void testSimpleForwardPlan() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource dataSet = env.readTextFile("/never/accessed");
            dataSet.map((MapFunction)new MapFunction<String, Integer>(){

                public Integer map(String value) {
                    return 0;
                }
            }).filter((FilterFunction)new FilterFunction<Integer>(){

                public boolean filter(Integer value) {
                    return false;
                }
            }).groupBy(new IdentityKeyExtractor()).reduceGroup(new Top1GroupReducer()).output((OutputFormat)new DiscardingOutputFormat());
            DataSinkNode sinkNode = PipelineBreakingTest.convertPlan((Plan)env.createProgramPlan()).get(0);
            SingleInputNode reduceNode = (SingleInputNode)sinkNode.getPredecessorNode();
            SingleInputNode keyExtractorNode = (SingleInputNode)reduceNode.getPredecessorNode();
            SingleInputNode filterNode = (SingleInputNode)keyExtractorNode.getPredecessorNode();
            SingleInputNode mapNode = (SingleInputNode)filterNode.getPredecessorNode();
            Assert.assertFalse((boolean)sinkNode.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)reduceNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)keyExtractorNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)filterNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)mapNode.getIncomingConnection().isBreakingPipeline());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testBranchingPlanNotReJoined() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            MapOperator data = env.readTextFile("/never/accessed").map((MapFunction)new MapFunction<String, Integer>(){

                public Integer map(String value) {
                    return 0;
                }
            });
            data.filter((FilterFunction)new FilterFunction<Integer>(){

                public boolean filter(Integer value) {
                    return false;
                }
            }).output((OutputFormat)new DiscardingOutputFormat());
            data.join((DataSet)env.fromElements((Object[])new Integer[]{1, 2, 3, 4})).where(new IdentityKeyExtractor()).equalTo(new IdentityKeyExtractor()).output((OutputFormat)new DiscardingOutputFormat());
            data.output((OutputFormat)new DiscardingOutputFormat());
            List<DataSinkNode> sinks = PipelineBreakingTest.convertPlan((Plan)env.createProgramPlan());
            DataSinkNode sinkAfterFilter = sinks.get(0);
            DataSinkNode sinkAfterJoin = sinks.get(1);
            DataSinkNode sinkDirect = sinks.get(2);
            SingleInputNode filterNode = (SingleInputNode)sinkAfterFilter.getPredecessorNode();
            SingleInputNode mapNode = (SingleInputNode)filterNode.getPredecessorNode();
            TwoInputNode joinNode = (TwoInputNode)sinkAfterJoin.getPredecessorNode();
            SingleInputNode joinInput = (SingleInputNode)joinNode.getSecondPredecessorNode();
            Assert.assertFalse((boolean)sinkAfterFilter.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)sinkAfterJoin.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)sinkDirect.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)filterNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)mapNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)joinNode.getFirstIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)joinNode.getSecondIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)joinInput.getIncomingConnection().isBreakingPipeline());
            Assert.assertEquals((Object)mapNode, (Object)((SingleInputNode)joinNode.getFirstPredecessorNode()).getPredecessorNode());
            Assert.assertEquals((Object)mapNode, (Object)sinkDirect.getPredecessorNode());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testReJoinedBranches() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            MapOperator data = env.fromElements((Object[])new Long[]{33L, 44L}).map((MapFunction)new MapFunction<Long, Tuple2<Long, Long>>(){

                public Tuple2<Long, Long> map(Long value) {
                    return new Tuple2((Object)value, (Object)value);
                }
            });
            ReduceOperator reduced = data.groupBy(new int[]{0}).reduce(new SelectOneReducer());
            reduced.output((OutputFormat)new DiscardingOutputFormat());
            FilterOperator filtered = data.filter((FilterFunction)new FilterFunction<Tuple2<Long, Long>>(){

                public boolean filter(Tuple2<Long, Long> value) throws Exception {
                    return false;
                }
            });
            JoinOperator.EquiJoin joined = reduced.join((DataSet)filtered).where(new int[]{1}).equalTo(new int[]{1}).with(new DummyFlatJoinFunction());
            joined.flatMap(new IdentityFlatMapper()).output((OutputFormat)new DiscardingOutputFormat());
            joined.coGroup((DataSet)filtered.groupBy(new int[]{1}).reduceGroup(new Top1GroupReducer())).where(new int[]{0}).equalTo(new int[]{0}).with(new DummyCoGroupFunction()).output((OutputFormat)new DiscardingOutputFormat());
            List<DataSinkNode> sinks = PipelineBreakingTest.convertPlan((Plan)env.createProgramPlan());
            DataSinkNode sinkAfterReduce = sinks.get(0);
            DataSinkNode sinkAfterFlatMap = sinks.get(1);
            DataSinkNode sinkAfterCoGroup = sinks.get(2);
            SingleInputNode reduceNode = (SingleInputNode)sinkAfterReduce.getPredecessorNode();
            SingleInputNode mapNode = (SingleInputNode)reduceNode.getPredecessorNode();
            SingleInputNode flatMapNode = (SingleInputNode)sinkAfterFlatMap.getPredecessorNode();
            TwoInputNode joinNode = (TwoInputNode)flatMapNode.getPredecessorNode();
            SingleInputNode filterNode = (SingleInputNode)joinNode.getSecondPredecessorNode();
            TwoInputNode coGroupNode = (TwoInputNode)sinkAfterCoGroup.getPredecessorNode();
            SingleInputNode otherReduceNode = (SingleInputNode)coGroupNode.getSecondPredecessorNode();
            Assert.assertEquals((Object)reduceNode, (Object)joinNode.getFirstPredecessorNode());
            Assert.assertEquals((Object)mapNode, (Object)filterNode.getPredecessorNode());
            Assert.assertEquals((Object)joinNode, (Object)coGroupNode.getFirstPredecessorNode());
            Assert.assertEquals((Object)filterNode, (Object)otherReduceNode.getPredecessorNode());
            Assert.assertFalse((boolean)sinkAfterReduce.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)sinkAfterFlatMap.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)sinkAfterCoGroup.getInputConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)mapNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)flatMapNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)joinNode.getFirstIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)coGroupNode.getFirstIncomingConnection().isBreakingPipeline());
            Assert.assertFalse((boolean)coGroupNode.getSecondIncomingConnection().isBreakingPipeline());
            Assert.assertTrue((boolean)reduceNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertTrue((boolean)filterNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertTrue((boolean)otherReduceNode.getIncomingConnection().isBreakingPipeline());
            Assert.assertTrue((boolean)joinNode.getSecondIncomingConnection().isBreakingPipeline());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static List<DataSinkNode> convertPlan(Plan p) {
        OptimizerNode rootNode;
        GraphCreatingVisitor dagCreator = new GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode());
        p.accept((Visitor)dagCreator);
        List sinks = dagCreator.getSinks();
        if (sinks.size() == 1) {
            rootNode = (OptimizerNode)sinks.get(0);
        } else {
            Iterator iter = sinks.iterator();
            rootNode = (OptimizerNode)iter.next();
            while (iter.hasNext()) {
                rootNode = new SinkJoiner(rootNode, (OptimizerNode)iter.next());
            }
        }
        rootNode.accept((Visitor)new IdAndEstimatesVisitor(null));
        rootNode.accept((Visitor)new BranchesVisitor());
        return sinks;
    }
}

