/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Visitor;
import org.junit.Assert;
import org.junit.Test;

public class UnionPropertyPropagationTest
extends CompilerTestBase {
    @Test
    public void testUnion1() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataSource sourceA = env.generateSequence(0L, 1L);
        DataSource sourceB = env.generateSequence(0L, 1L);
        GroupReduceOperator redA = sourceA.groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer());
        GroupReduceOperator redB = sourceB.groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer());
        redA.union((DataSet)redB).groupBy(new String[]{"*"}).reduceGroup(new IdentityGroupReducer()).output((OutputFormat)new DiscardingOutputFormat());
        JavaPlan plan = env.createProgramPlan();
        OptimizedPlan oPlan = this.compileNoStats((Plan)plan);
        JobGraphGenerator jobGen = new JobGraphGenerator();
        jobGen.compileJobGraph(oPlan);
        oPlan.accept((Visitor)new Visitor<PlanNode>(){

            public boolean preVisit(PlanNode visitable) {
                if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
                    for (Channel inConn : visitable.getInputs()) {
                        Assert.assertTrue((String)"Reduce should just forward the input if it is already partitioned", (inConn.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                    }
                    return false;
                }
                return true;
            }

            public void postVisit(PlanNode visitable) {
            }
        });
    }

    @Test
    public void testUnion2() {
        int NUM_INPUTS = 4;
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource source = env.readTextFile(IN_FILE);
        FlatMapOperator lastUnion = source.flatMap((FlatMapFunction)new DummyFlatMap());
        for (int i = 1; i < 4; ++i) {
            lastUnion = lastUnion.union((DataSet)source.flatMap((FlatMapFunction)new DummyFlatMap()));
        }
        AggregateOperator result = lastUnion.groupBy(new int[]{0}).aggregate(Aggregations.SUM, 1);
        result.writeAsText(OUT_FILE);
        JavaPlan plan = env.createProgramPlan("Test union on new java-api");
        OptimizedPlan oPlan = this.compileNoStats((Plan)plan);
        JobGraphGenerator jobGen = new JobGraphGenerator();
        jobGen.compileJobGraph(oPlan);
        oPlan.accept((Visitor)new Visitor<PlanNode>(){

            public boolean preVisit(PlanNode visitable) {
                if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
                    Channel inConn = ((SingleInputPlanNode)visitable).getInput();
                    Assert.assertTrue((String)"Union should just forward the Partitioning", (inConn.getShipStrategy() == ShipStrategyType.FORWARD ? 1 : 0) != 0);
                    Assert.assertTrue((String)"Union Node should be under Group operator", (boolean)(inConn.getSource() instanceof NAryUnionPlanNode));
                }
                if (visitable instanceof NAryUnionPlanNode) {
                    int numberInputs = 0;
                    for (Channel inConn : visitable.getInputs()) {
                        PlanNode inNode = inConn.getSource();
                        Assert.assertTrue((String)"Input of Union should be FlatMapOperators", (boolean)(inNode.getProgramOperator() instanceof FlatMapOperatorBase));
                        Assert.assertTrue((String)"Shipment strategy under union should partition the data", (inConn.getShipStrategy() == ShipStrategyType.PARTITION_HASH ? 1 : 0) != 0);
                        ++numberInputs;
                    }
                    Assert.assertTrue((String)"NAryUnion should have 4 inputs", (numberInputs == 4 ? 1 : 0) != 0);
                    return false;
                }
                return true;
            }

            public void postVisit(PlanNode visitable) {
            }
        });
    }

    public static final class DummyFlatMap
    extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            out.collect((Object)new Tuple2((Object)value, (Object)0));
        }
    }
}

