/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.compiler.examples;

import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.distributions.SimpleDistribution;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.test.recordJobs.wordcount.WordCount;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Key;
import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Test;

/**
 * Optimizer tests for the record-API WordCount job.
 *
 * <p>Each test compiles a WordCount plan twice (once after attaching source statistics, once
 * without) and asserts the ship strategies, local strategies and combiner configuration found
 * in the resulting {@link OptimizedPlan}.
 */
public class WordCountCompilerTest
extends CompilerTestBase {
    /** Size handed to {@code setSourceStatistics}: 2^40 (same value as {@code 0x10000000000L}). */
    private static final long SOURCE_SIZE = 1L << 40;

    /** Second numeric argument handed to {@code setSourceStatistics} together with the size. */
    private static final float SOURCE_RECORD_WIDTH = 24.0f;

    /**
     * Checks the optimized plan of the stock {@link WordCount} job, with and without source
     * statistics.
     */
    @Test
    public void testWordCount() {
        this.checkWordCount(true);
        this.checkWordCount(false);
    }

    /**
     * Compiles the plan produced by {@link WordCount#getPlan} and verifies that the reducer input
     * is hash-partitioned and combined via a sorted group combiner.
     *
     * @param estimates if {@code true}, statistics are set on the "Input Lines" source and the
     *     plan is compiled with {@code compileWithStats}; otherwise {@code compileNoStats} is used
     * @throws AssertionError if an expectation does not hold, or if any exception is thrown while
     *     building/compiling/inspecting the plan (the exception is attached as the cause)
     */
    private void checkWordCount(boolean estimates) {
        try {
            WordCount wordCount = new WordCount();
            ExecutionConfig executionConfig = new ExecutionConfig();
            Plan p = wordCount.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE);
            p.setExecutionConfig(executionConfig);

            OptimizedPlan plan;
            if (estimates) {
                FileDataSource source = (FileDataSource) getContractResolver(p).getNode("Input Lines");
                this.setSourceStatistics(source, SOURCE_SIZE, SOURCE_RECORD_WIDTH);
                plan = this.compileWithStats(p);
            } else {
                plan = this.compileNoStats(p);
            }

            // Look up the plan nodes by the operator names used in the job.
            CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
            SinkPlanNode sink = (SinkPlanNode) resolver.getNode("Word Counts");
            SingleInputPlanNode reducer = (SingleInputPlanNode) resolver.getNode("Count Words");
            SingleInputPlanNode mapper = (SingleInputPlanNode) resolver.getNode("Tokenize Lines");

            // Ship strategies along the pipeline.
            Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());

            // Reducer input: combining sort on field 0, sort order consistent with the reducer.
            Channel reducerInput = reducer.getInput();
            Assert.assertEquals(LocalStrategy.COMBININGSORT, reducerInput.getLocalStrategy());
            FieldList keyField = new FieldList(0);
            Assert.assertEquals(keyField, reducerInput.getShipStrategyKeys());
            Assert.assertEquals(keyField, reducerInput.getLocalStrategyKeys());
            Assert.assertTrue(Arrays.equals(reducerInput.getLocalStrategySortOrder(), reducer.getSortOrders(0)));

            // The reducer's predecessor is expected to be the combiner.
            SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
            Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
            Assert.assertEquals(keyField, combiner.getKeys(0));
            Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
        }
        catch (Exception e) {
            throw unexpectedException(e);
        }
    }

    /**
     * Checks the optimized plan of a WordCount job whose sink requests a global order, with and
     * without source statistics.
     */
    @Test
    public void testWordCountWithSortedSink() {
        this.checkWordCountWithSortedSink(true);
        this.checkWordCountWithSortedSink(false);
    }

    /**
     * Builds a WordCount plan by hand, requests a descending global order on field 0 at the sink,
     * and verifies that the reducer input is range-partitioned with descending sort orders.
     *
     * @param estimates if {@code true}, statistics are set on the source and the plan is compiled
     *     with {@code compileWithStats}; otherwise {@code compileNoStats} is used
     * @throws AssertionError if an expectation does not hold, or if any exception is thrown while
     *     building/compiling/inspecting the plan (the exception is attached as the cause)
     */
    private void checkWordCountWithSortedSink(boolean estimates) {
        try {
            // source -> tokenize (map) -> count (reduce on field 0) -> CSV sink
            FileDataSource sourceNode = new FileDataSource(new TextInputFormat(), IN_FILE, "Input Lines");
            MapOperator mapNode = MapOperator.builder(new WordCount.TokenizeLine())
                    .input(sourceNode)
                    .name("Tokenize Lines")
                    .build();
            ReduceOperator reduceNode = ReduceOperator.builder(new WordCount.CountWords(), StringValue.class, 0)
                    .input(mapNode)
                    .name("Count Words")
                    .build();
            FileDataSink out = new FileDataSink(new CsvOutputFormat(), OUT_FILE, reduceNode, "Word Counts");
            CsvOutputFormat.configureRecordFormat(out)
                    .recordDelimiter('\n')
                    .fieldDelimiter(' ')
                    .lenient(true)
                    .field(StringValue.class, 0)
                    .field(IntValue.class, 1);

            // Request a descending global order on field 0, with a single boundary key "N".
            Ordering ordering = new Ordering(0, StringValue.class, Order.DESCENDING);
            out.setGlobalOrder(ordering, new SimpleDistribution(new StringValue[]{new StringValue("N")}));

            ExecutionConfig executionConfig = new ExecutionConfig();
            Plan p = new Plan(out, "WordCount Example");
            p.setDefaultParallelism(8);
            p.setExecutionConfig(executionConfig);

            OptimizedPlan plan;
            if (estimates) {
                this.setSourceStatistics(sourceNode, SOURCE_SIZE, SOURCE_RECORD_WIDTH);
                plan = this.compileWithStats(p);
            } else {
                plan = this.compileNoStats(p);
            }

            // Look up the plan nodes by the operator names assigned above.
            CompilerTestBase.OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
            SinkPlanNode sink = (SinkPlanNode) resolver.getNode("Word Counts");
            SingleInputPlanNode reducer = (SingleInputPlanNode) resolver.getNode("Count Words");
            SingleInputPlanNode mapper = (SingleInputPlanNode) resolver.getNode("Tokenize Lines");

            // Ship strategies: the reducer input is expected to be range-partitioned here.
            Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.PARTITION_RANGE, reducer.getInput().getShipStrategy());
            Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());

            // Reducer input: combining sort on field 0; both sort-order flags must be false
            // (presumably false == descending, matching the requested Order.DESCENDING).
            Channel reducerInput = reducer.getInput();
            Assert.assertEquals(LocalStrategy.COMBININGSORT, reducerInput.getLocalStrategy());
            FieldList keyField = new FieldList(0);
            Assert.assertEquals(keyField, reducerInput.getShipStrategyKeys());
            Assert.assertEquals(keyField, reducerInput.getLocalStrategyKeys());
            Assert.assertFalse(reducerInput.getShipStrategySortOrder()[0]);
            Assert.assertFalse(reducerInput.getLocalStrategySortOrder()[0]);

            // The reducer's predecessor is expected to be the combiner.
            SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
            Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
            Assert.assertEquals(keyField, combiner.getKeys(0));
            // NOTE(review): checkWordCount only inspects key set 0 of the combiner; confirm that
            // checking index 1 as well is intentional for the sorted-sink variant.
            Assert.assertEquals(keyField, combiner.getKeys(1));
            Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
        }
        catch (Exception e) {
            throw unexpectedException(e);
        }
    }

    /**
     * Converts an unexpected exception into a test failure that keeps the original exception as
     * its cause, so the test report shows the real stack trace instead of only a (possibly null)
     * message.
     *
     * @param cause the exception raised while building, compiling or inspecting the plan
     * @return an {@link AssertionError} (the same type {@code Assert.fail} throws) to be thrown
     *     by the caller
     */
    private static AssertionError unexpectedException(Exception cause) {
        // String.valueOf(cause) yields "ClassName: message", which stays informative even when
        // cause.getMessage() is null.
        AssertionError failure = new AssertionError("Unexpected exception: " + String.valueOf(cause));
        // initCause instead of the (String, Throwable) constructor keeps this usable on older JDKs.
        failure.initCause(cause);
        return failure;
    }
}

