package org.apache.flink.graph.asm.degree.annotate.undirected;

import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.class */
public class EdgeSourceDegree<K, VV, EV> extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
    private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
    private int parallelism = -1;

    public EdgeSourceDegree<K, VV, EV> setReduceOnTargetId(boolean z) {
        this.reduceOnTargetId.set(z);
        return this;
    }

    public EdgeSourceDegree<K, VV, EV> setParallelism(int i) {
        Preconditions.checkArgument(i > 0 || i == -1, "The parallelism must be greater than zero.");
        this.parallelism = i;
        return this;
    }

    @Override // org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet
    protected String getAlgorithmName() {
        return EdgeSourceDegree.class.getName();
    }

    @Override // org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet
    protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet graphAlgorithmWrappingDataSet) {
        Preconditions.checkNotNull(graphAlgorithmWrappingDataSet);
        if (!EdgeSourceDegree.class.isAssignableFrom(graphAlgorithmWrappingDataSet.getClass())) {
            return false;
        }
        EdgeSourceDegree edgeSourceDegree = (EdgeSourceDegree) graphAlgorithmWrappingDataSet;
        this.reduceOnTargetId.mergeWith(edgeSourceDegree.reduceOnTargetId);
        this.parallelism = this.parallelism == -1 ? edgeSourceDegree.parallelism : edgeSourceDegree.parallelism == -1 ? this.parallelism : Math.min(this.parallelism, edgeSourceDegree.parallelism);
        return true;
    }

    @Override // org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet
    public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> graph) throws Exception {
        return graph.getEdges().join((DataSet) graph.run(new VertexDegree().setReduceOnTargetId(this.reduceOnTargetId.get()).setParallelism(this.parallelism)), JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND).where(new int[]{0}).equalTo(new int[]{0}).with(new DegreeAnnotationFunctions.JoinEdgeWithVertexDegree()).setParallelism(this.parallelism).name("Edge source degree");
    }
}
