/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.api.java.record.io.CsvInputFormat;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.DeltaIteration;
import org.apache.flink.api.java.record.operators.FileDataSink;
import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

public class CoGroupConnectedComponentsITCase
extends RecordAPITestBase {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    public CoGroupConnectedComponentsITCase() {
        this.setTaskManagerNumSlots(4);
    }

    protected void preSubmit() throws Exception {
        this.verticesPath = this.createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices((int)1000));
        this.edgesPath = this.createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges((int)10000, (int)1000, (long)3287269182979823L));
        this.resultPath = this.getTempFilePath("results");
    }

    protected Plan getTestJob() {
        return CoGroupConnectedComponentsITCase.getPlan(4, this.verticesPath, this.edgesPath, this.resultPath, 100);
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader reader : CoGroupConnectedComponentsITCase.getResultReader((String)this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult((BufferedReader)reader);
        }
    }

    public static Plan getPlan(int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations) {
        FileDataSource initialVertices = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class}), verticesInput, "Vertices");
        MapOperator verticesWithId = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class).input((Operator)initialVertices).name("Assign Vertex Ids").build();
        DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
        iteration.setInitialSolutionSet((Operator)verticesWithId);
        iteration.setInitialWorkset((Operator)verticesWithId);
        iteration.setMaximumNumberOfIterations(maxIterations);
        FileDataSource edges = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), edgeInput, "Edges");
        JoinOperator joinWithNeighbors = JoinOperator.builder((JoinFunction)new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, (int)0, (int)0).input1(iteration.getWorkset()).input2((Operator)edges).name("Join Candidate Id With Neighbor").build();
        CoGroupOperator minAndUpdate = CoGroupOperator.builder((CoGroupFunction)new MinIdAndUpdate(), LongValue.class, (int)0, (int)0).input1((Operator)joinWithNeighbors).input2(iteration.getSolutionSet()).name("Min Id and Update").build();
        iteration.setNextWorkset((Operator)minAndUpdate);
        iteration.setSolutionSetDelta((Operator)minAndUpdate);
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)iteration, "Result");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)result).recordDelimiter('\n')).fieldDelimiter(' ')).field(LongValue.class, 0)).field(LongValue.class, 1);
        Plan plan = new Plan((GenericDataSinkBase)result, "Workset Connected Components");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    @FunctionAnnotation.ConstantFieldsFirst(value={0})
    @FunctionAnnotation.ConstantFieldsSecond(value={0})
    public static final class MinIdAndUpdate
    extends CoGroupFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final LongValue newComponentId = new LongValue();

        public void coGroup(Iterator<Record> candidates, Iterator<Record> current, Collector<Record> out) throws Exception {
            if (!current.hasNext()) {
                throw new Exception("Error: Id not encountered before.");
            }
            Record old = current.next();
            long oldId = ((LongValue)old.getField(1, LongValue.class)).getValue();
            long minimumComponentID = Long.MAX_VALUE;
            while (candidates.hasNext()) {
                Record candidate = candidates.next();
                long candidateComponentID = ((LongValue)candidate.getField(1, LongValue.class)).getValue();
                if (candidateComponentID >= minimumComponentID) continue;
                minimumComponentID = candidateComponentID;
            }
            if (minimumComponentID < oldId) {
                this.newComponentId.setValue(minimumComponentID);
                old.setField(1, (Value)this.newComponentId);
                out.collect((Object)old);
            }
        }
    }
}

