/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.junit.Assert;

public class IterationTerminationWithTwoTails
extends RecordAPITestBase {
    private static final String INPUT = "1\n2\n3\n4\n5\n";
    private static final String EXPECTED = "22\n";
    protected String dataPath;
    protected String resultPath;

    public IterationTerminationWithTwoTails() {
        this.setTaskManagerNumSlots(4);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", INPUT);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        IterationTerminationWithTwoTails.compareResultsByLinesInMemory((String)EXPECTED, (String)this.resultPath);
    }

    protected Plan getTestJob() {
        return IterationTerminationWithTwoTails.getTestPlanPlan(4, this.dataPath, this.resultPath);
    }

    private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
        FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
        BulkIteration iteration = new BulkIteration("Loop");
        iteration.setInput((Operator)initialInput);
        iteration.setMaximumNumberOfIterations(5);
        Assert.assertTrue((iteration.getMaximumNumberOfIterations() > 1 ? 1 : 0) != 0);
        ReduceOperator sumReduce = ReduceOperator.builder((ReduceFunction)new SumReducer()).input(iteration.getPartialSolution()).name("Compute sum (Reduce)").build();
        iteration.setNextPartialSolution((Operator)sumReduce);
        MapOperator terminationMapper = MapOperator.builder((MapFunction)new TerminationMapper()).input(iteration.getPartialSolution()).name("Compute termination criterion (Map)").build();
        iteration.setTerminationCriterion((Operator)terminationMapper);
        FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, (Operator)iteration, "Output");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)finalResult).recordDelimiter('\n')).fieldDelimiter(' ')).field(StringValue.class, 0);
        Plan plan = new Plan((GenericDataSinkBase)finalResult, "Iteration with AllReducer (keyless Reducer)");
        plan.setDefaultParallelism(4);
        Assert.assertTrue((plan.getDefaultParallelism() > 1 ? 1 : 0) != 0);
        return plan;
    }

    public static class TerminationMapper
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void map(Record record, Collector<Record> collector) {
            int currentSum = Integer.parseInt(((StringValue)record.getField(0, StringValue.class)).getValue());
            if (currentSum < 21) {
                collector.collect((Object)record);
            }
        }
    }

    static final class SumReducer
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        SumReducer() {
        }

        public void reduce(Iterator<Record> it, Collector<Record> out) {
            int sum = 0;
            while (it.hasNext()) {
                sum += Integer.parseInt(((StringValue)it.next().getField(0, StringValue.class)).getValue()) + 1;
            }
            out.collect((Object)new Record((Value)new StringValue((CharSequence)Integer.toString(sum))));
        }
    }
}

