package org.apache.flink.test.iterative;

import java.io.BufferedReader;
import java.util.Iterator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testdata.ConnectedComponentsData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase.class */
public class CoGroupConnectedComponentsITCase extends JavaProgramTestBase {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    private static final int MAX_ITERATIONS = 100;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    @FunctionAnnotation.ForwardedFieldsFirst({"f1->f1"})
    @FunctionAnnotation.ForwardedFieldsSecond({"f0->f0"})
    /* loaded from: input_file:org/apache/flink/test/iterative/CoGroupConnectedComponentsITCase$MinIdAndUpdate.class */
    private static final class MinIdAndUpdate implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1;

        private MinIdAndUpdate() {
        }

        public void coGroup(Iterable<Tuple2<Long, Long>> iterable, Iterable<Tuple2<Long, Long>> iterable2, Collector<Tuple2<Long, Long>> collector) throws Exception {
            Iterator<Tuple2<Long, Long>> it = iterable2.iterator();
            if (!it.hasNext()) {
                throw new Exception("Error: Id not encountered before.");
            }
            Tuple2<Long, Long> next = it.next();
            long longValue = ((Long) next.f1).longValue();
            long j = Long.MAX_VALUE;
            Iterator<Tuple2<Long, Long>> it2 = iterable.iterator();
            while (it2.hasNext()) {
                long longValue2 = ((Long) it2.next().f1).longValue();
                if (longValue2 < j) {
                    j = longValue2;
                }
            }
            if (j < longValue) {
                collector.collect(new Tuple2(next.f0, Long.valueOf(j)));
            }
        }
    }

    protected void preSubmit() throws Exception {
        this.verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
        this.edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
        this.resultPath = getTempFilePath("results");
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader bufferedReader : getResultReader(this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult(bufferedReader);
        }
    }

    protected void testProgram() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        Operator name = executionEnvironment.readCsvFile(this.verticesPath).fieldDelimiter(" ").types(Long.class).name("Vertices");
        Operator name2 = executionEnvironment.readCsvFile(this.edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).name("Edges");
        Operator name3 = name.map(new MapFunction<Tuple1<Long>, Tuple2<Long, Long>>() { // from class: org.apache.flink.test.iterative.CoGroupConnectedComponentsITCase.1
            public Tuple2<Long, Long> map(Tuple1<Long> tuple1) throws Exception {
                return new Tuple2<>(tuple1.f0, tuple1.f0);
            }
        }).name("Assign Vertex Ids");
        DeltaIteration iterateDelta = name3.iterateDelta(name3, MAX_ITERATIONS, new int[]{0});
        CoGroupOperator name4 = iterateDelta.getWorkset().join(name2).where(new int[]{0}).equalTo(new int[]{0}).with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { // from class: org.apache.flink.test.iterative.CoGroupConnectedComponentsITCase.2
            public Tuple2<Long, Long> join(Tuple2<Long, Long> tuple2, Tuple2<Long, Long> tuple22) throws Exception {
                return new Tuple2<>(tuple22.f1, tuple2.f1);
            }
        }).name("Join Candidate Id With Neighbor").coGroup(iterateDelta.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).with(new MinIdAndUpdate()).name("min Id and Update");
        iterateDelta.closeWith(name4, name4).writeAsCsv(this.resultPath, "\n", " ").name("Result");
        executionEnvironment.execute("Workset Connected Components");
    }
}
