/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobTests;

import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class GroupOrderReduceITCase
extends RecordAPITestBase {
    private static final String INPUT = "1,3\n2,1\n5,1\n3,1\n1,8\n1,9\n1,2\n2,3\n7,1\n4,2\n2,7\n2,8\n1,1\n2,7\n5,4\n4,3\n3,6\n3,7\n1,3\n2,4\n7,1\n5,3\n4,5\n4,6\n1,4\n3,9\n8,5\n5,3\n5,4\n5,5\n1,7\n3,9\n9,3\n6,2\n6,3\n6,4\n1,8\n3,8\n8,7\n6,2\n7,2\n7,3\n1,1\n3,7\n9,2\n7,1\n8,1\n8,2\n1,2\n2,6\n8,7\n7,1\n9,1\n9,1\n1,1\n2,5\n9,5\n8,2\n10,2\n10,1\n1,1\n2,6\n2,7\n8,3\n11,3\n11,2\n1,2\n2,7\n4,2\n9,4\n12,8\n12,3\n1,2\n4,8\n1,7\n9,5\n13,9\n13,4\n1,3\n4,2\n3,2\n9,6\n14,7\n14,5\n";
    protected String textPath;
    protected String resultPath;

    public GroupOrderReduceITCase(Configuration config) {
        super(config);
        this.setTaskManagerNumSlots(4);
    }

    protected void preSubmit() throws Exception {
        this.textPath = this.createTempFile("pairs.csv", INPUT);
        this.resultPath = this.getTempDirPath("result");
    }

    protected Plan getTestJob() {
        int parallelism = this.config.getInteger("GroupOrderTest#NumSubtasks", 1);
        CsvInputFormat format = new CsvInputFormat(',', new Class[]{IntValue.class, IntValue.class});
        FileDataSource source = new FileDataSource((FileInputFormat)format, this.textPath, "Source");
        ReduceOperator reducer = ReduceOperator.builder(CheckingReducer.class).keyField(IntValue.class, 0).input((Operator)source).name("Ordered Reducer").build();
        reducer.setGroupOrder(new Ordering(1, IntValue.class, Order.ASCENDING));
        FileDataSink sink = new FileDataSink(CsvOutputFormat.class, this.resultPath, (Operator)reducer, "Sink");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)sink).recordDelimiter('\n')).fieldDelimiter(',')).field(IntValue.class, 0)).field(IntValue.class, 1);
        Plan p = new Plan((GenericDataSinkBase)sink);
        p.setDefaultParallelism(parallelism);
        return p;
    }

    protected void postSubmit() throws Exception {
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration config = new Configuration();
        config.setInteger("GroupOrderTest#NumSubtasks", 4);
        return GroupOrderReduceITCase.toParameterList((Configuration[])new Configuration[]{config});
    }

    public static final class CheckingReducer
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            int lastValue = ((IntValue)records.next().getField(1, IntValue.class)).getValue();
            while (records.hasNext()) {
                int nextValue = ((IntValue)records.next().getField(1, IntValue.class)).getValue();
                if (nextValue < lastValue) {
                    throw new Exception("Group Order is violated!");
                }
                lastValue = nextValue;
            }
        }
    }
}

