/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.accumulators;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class AccumulatorIterativeITCase
extends RecordAPITestBase {
    private static final String INPUT = "1\n2\n3\n";
    private static final String EXPECTED = "6\n";
    private static final int NUM_ITERATIONS = 3;
    private static final int NUM_SUBTASKS = 1;
    protected String dataPath;
    protected String resultPath;

    public AccumulatorIterativeITCase(Configuration config) {
        super(config);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", INPUT);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        AccumulatorIterativeITCase.compareResultsByLinesInMemory((String)EXPECTED, (String)this.resultPath);
        Integer res = (Integer)this.getJobExecutionResult().getAccumulatorResult("test");
        Assert.assertEquals((Object)18, (Object)res);
    }

    protected Plan getTestJob() {
        Plan plan = AccumulatorIterativeITCase.getTestPlanPlan(this.config.getInteger("IterationAllReducer#NoSubtasks", 1), this.dataPath, this.resultPath);
        return plan;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration config1 = new Configuration();
        config1.setInteger("IterationAllReducer#NoSubtasks", 1);
        return AccumulatorIterativeITCase.toParameterList((Configuration[])new Configuration[]{config1});
    }

    static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
        FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
        BulkIteration iteration = new BulkIteration("Loop");
        iteration.setInput((Operator)initialInput);
        iteration.setMaximumNumberOfIterations(3);
        ReduceOperator sumReduce = ReduceOperator.builder((ReduceFunction)new SumReducer()).input(iteration.getPartialSolution()).name("Compute sum (Reduce)").build();
        iteration.setNextPartialSolution((Operator)sumReduce);
        FileDataSink finalResult = new FileDataSink((FileOutputFormat)new CsvOutputFormat("\n", " ", new Class[]{StringValue.class}), output, (Operator)iteration, "Output");
        Plan plan = new Plan((GenericDataSinkBase)finalResult, "Iteration with AllReducer (keyless Reducer)");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    static final class SumReducer
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private IntCounter testCounter = new IntCounter();

        SumReducer() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.getRuntimeContext().addAccumulator("test", (Accumulator)this.testCounter);
        }

        public void reduce(Iterator<Record> records, Collector<Record> out) {
            int sum = 0;
            while (records.hasNext()) {
                Record r = records.next();
                Integer value = Integer.parseInt(((StringValue)r.getField(0, StringValue.class)).getValue());
                sum += value.intValue();
                this.testCounter.add(value);
            }
            out.collect((Object)new Record((Value)new StringValue((CharSequence)Integer.toString(sum))));
        }
    }
}

