/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer;

import java.io.Serializable;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.junit.Assert;
import org.junit.Test;

/**
 * Verifies the optimized plans produced for {@code distinct()} on a two-field CSV source,
 * covering three ways of specifying the key: no explicit key, a {@link KeySelector}, and a
 * field position.
 *
 * <p>Every test builds the same skeleton: a source named {@code "source"}, a distinct
 * operator named {@code "reducer"}, and a discarding sink named {@code "sink"}. The source
 * runs with parallelism {@value #SOURCE_PARALLELISM}; everything else inherits the
 * environment parallelism of {@value #DEFAULT_PARALLELISM}.
 *
 * <p>NOTE(review): the class is presumably {@link Serializable} because the anonymous
 * {@link KeySelector} in {@link #testDistinctWithSelectorFunctionKey()} captures the
 * enclosing test instance - confirm before removing.
 */
public class DistinctCompilationTest
extends CompilerTestBase
implements Serializable {
    /** Path handed to the CSV source; the tests only compile the plan, they never execute it. */
    private static final String SOURCE_PATH = "file:///will/never/be/read";

    /** Parallelism set on the environment. */
    private static final int DEFAULT_PARALLELISM = 8;

    /** Parallelism set explicitly on the source. */
    private static final int SOURCE_PARALLELISM = 6;

    /** Distinct without an explicit key: both tuple fields (0 and 1) are expected as keys. */
    @Test
    public void testDistinctPlain() {
        try {
            ExecutionEnvironment env = DistinctCompilationTest.createEnvironment();
            Operator data = DistinctCompilationTest.createSource(env);
            ((DistinctOperator)data.distinct().name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");

            CompilerTestBase.OptimizerPlanNodeResolver resolver = this.compileAndResolve(env);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            // The combiner is not a named operator; it is reached as the reducer's input.
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();

            // Wiring: source -> combiner -> reducer -> sink.
            Assert.assertEquals(sourceNode, combineNode.getInput().getSource());
            Assert.assertEquals(reduceNode, sinkNode.getInput().getSource());

            DistinctCompilationTest.assertStrategiesAndKeys(reduceNode, combineNode, new FieldList(new int[]{0, 1}));

            Assert.assertEquals(SOURCE_PARALLELISM, sourceNode.getParallelism());
            Assert.assertEquals(SOURCE_PARALLELISM, combineNode.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, reduceNode.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, sinkNode.getParallelism());
        }
        catch (Exception e) {
            throw DistinctCompilationTest.asFailure(e);
        }
    }

    /** Distinct keyed by a selector function: field 0 is expected as the key. */
    @Test
    public void testDistinctWithSelectorFunctionKey() {
        try {
            ExecutionEnvironment env = DistinctCompilationTest.createEnvironment();
            Operator data = DistinctCompilationTest.createSource(env);
            ((DistinctOperator)data.distinct((KeySelector)new KeySelector<Tuple2<String, Double>, String>(){

                @Override
                public String getKey(Tuple2<String, Double> value) {
                    return value.f0;
                }
            }).name("reducer")).output((OutputFormat)new DiscardingOutputFormat()).name("sink");

            CompilerTestBase.OptimizerPlanNodeResolver resolver = this.compileAndResolve(env);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();
            // The nodes around the combiner/reducer pair are located by walking the plan edges.
            SingleInputPlanNode keyExtractor = (SingleInputPlanNode)combineNode.getInput().getSource();
            SingleInputPlanNode keyProjector = (SingleInputPlanNode)sinkNode.getInput().getSource();

            Assert.assertEquals(sourceNode, keyExtractor.getInput().getSource());
            // NOTE(review): keyProjector was read from this very expression, so this assertion
            // cannot fail. It is kept unchanged because the intended wiring check (e.g. against
            // reduceNode) cannot be confirmed from this file alone.
            Assert.assertEquals(keyProjector, sinkNode.getInput().getSource());

            DistinctCompilationTest.assertStrategiesAndKeys(reduceNode, combineNode, new FieldList(0));

            Assert.assertEquals(SOURCE_PARALLELISM, sourceNode.getParallelism());
            Assert.assertEquals(SOURCE_PARALLELISM, keyExtractor.getParallelism());
            Assert.assertEquals(SOURCE_PARALLELISM, combineNode.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, reduceNode.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, keyProjector.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, sinkNode.getParallelism());
        }
        catch (Exception e) {
            throw DistinctCompilationTest.asFailure(e);
        }
    }

    /** Distinct keyed by field position 1: field 1 is expected as the key. */
    @Test
    public void testDistinctWithFieldPositionKeyCombinable() {
        try {
            ExecutionEnvironment env = DistinctCompilationTest.createEnvironment();
            Operator data = DistinctCompilationTest.createSource(env);
            DistinctOperator reduced = (DistinctOperator)data.distinct(new int[]{1}).name("reducer");
            reduced.output((OutputFormat)new DiscardingOutputFormat()).name("sink");

            CompilerTestBase.OptimizerPlanNodeResolver resolver = this.compileAndResolve(env);
            SourcePlanNode sourceNode = (SourcePlanNode)resolver.getNode("source");
            SingleInputPlanNode reduceNode = (SingleInputPlanNode)resolver.getNode("reducer");
            SinkPlanNode sinkNode = (SinkPlanNode)resolver.getNode("sink");
            SingleInputPlanNode combineNode = (SingleInputPlanNode)reduceNode.getInput().getSource();

            // Wiring: source -> combiner -> reducer -> sink.
            Assert.assertEquals(sourceNode, combineNode.getInput().getSource());
            Assert.assertEquals(reduceNode, sinkNode.getInput().getSource());

            DistinctCompilationTest.assertStrategiesAndKeys(reduceNode, combineNode, new FieldList(1));

            Assert.assertEquals(SOURCE_PARALLELISM, sourceNode.getParallelism());
            Assert.assertEquals(SOURCE_PARALLELISM, combineNode.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, reduceNode.getParallelism());
            Assert.assertEquals(DEFAULT_PARALLELISM, sinkNode.getParallelism());
        }
        catch (Exception e) {
            throw DistinctCompilationTest.asFailure(e);
        }
    }

    /** Obtains an execution environment and sets its parallelism to {@link #DEFAULT_PARALLELISM}. */
    private static ExecutionEnvironment createEnvironment() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(DEFAULT_PARALLELISM);
        return env;
    }

    /**
     * Creates the (String, Double) CSV source named {@code "source"} with parallelism
     * {@link #SOURCE_PARALLELISM}.
     */
    private static Operator createSource(ExecutionEnvironment env) {
        return ((DataSource)env.readCsvFile(SOURCE_PATH).types(String.class, Double.class).name("source")).setParallelism(SOURCE_PARALLELISM);
    }

    /** Compiles the program defined on {@code env} without statistics and returns a by-name node resolver. */
    private CompilerTestBase.OptimizerPlanNodeResolver compileAndResolve(ExecutionEnvironment env) {
        JavaPlan plan = env.createProgramPlan();
        OptimizedPlan optimizedPlan = this.compileNoStats((Plan)plan);
        return DistinctCompilationTest.getOptimizerPlanNodeResolver(optimizedPlan);
    }

    /**
     * Asserts that the reducer/combiner pair uses the sorted group strategies and that
     * {@code expectedKeys} are the keys of both nodes as well as the local strategy keys on
     * the reducer's input.
     */
    private static void assertStrategiesAndKeys(SingleInputPlanNode reduceNode, SingleInputPlanNode combineNode, FieldList expectedKeys) {
        Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
        Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
        Assert.assertEquals(expectedKeys, reduceNode.getKeys(0));
        Assert.assertEquals(expectedKeys, combineNode.getKeys(0));
        Assert.assertEquals(expectedKeys, reduceNode.getInput().getLocalStrategyKeys());
    }

    /**
     * Converts an unexpected exception into a test failure that keeps the original exception
     * as its cause, so the full stack trace appears in the test report instead of only on
     * stderr. The message format is the one the tests have always reported.
     */
    private static AssertionError asFailure(Exception e) {
        AssertionError failure = new AssertionError((Object)(e.getClass().getSimpleName() + " in test: " + e.getMessage()));
        // initCause rather than the (String, Throwable) constructor, which needs Java 7+.
        failure.initCause(e);
        return failure;
    }
}

