/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;
import org.junit.Assert;

public class IterationWithAllReducerITCase
extends RecordAPITestBase {
    private static final String INPUT = "1\n1\n1\n1\n1\n1\n1\n1\n";
    private static final String EXPECTED = "1\n";
    protected String dataPath;
    protected String resultPath;

    public IterationWithAllReducerITCase() {
        this.setTaskManagerNumSlots(4);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", INPUT);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        IterationWithAllReducerITCase.compareResultsByLinesInMemory((String)EXPECTED, (String)this.resultPath);
    }

    protected Plan getTestJob() {
        Plan plan = IterationWithAllReducerITCase.getTestPlanPlan(4, this.dataPath, this.resultPath);
        return plan;
    }

    private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
        FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
        BulkIteration iteration = new BulkIteration("Loop");
        iteration.setInput((Operator)initialInput);
        iteration.setMaximumNumberOfIterations(5);
        Assert.assertTrue((iteration.getMaximumNumberOfIterations() > 1 ? 1 : 0) != 0);
        ReduceOperator sumReduce = ReduceOperator.builder((ReduceFunction)new PickOneReducer()).input(iteration.getPartialSolution()).name("Compute sum (Reduce)").build();
        iteration.setNextPartialSolution((Operator)sumReduce);
        FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, (Operator)iteration, "Output");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)finalResult).recordDelimiter('\n')).fieldDelimiter(' ')).field(StringValue.class, 0);
        Plan plan = new Plan((GenericDataSinkBase)finalResult, "Iteration with AllReducer (keyless Reducer)");
        plan.setDefaultParallelism(numSubTasks);
        Assert.assertTrue((plan.getDefaultParallelism() > 1 ? 1 : 0) != 0);
        return plan;
    }

    public static final class PickOneReducer
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Record> it, Collector<Record> out) {
            out.collect((Object)it.next());
        }
    }
}

