/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recordJobs.kmeans.udfs;

import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.api.java.record.functions.FunctionAnnotation;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.test.recordJobs.kmeans.udfs.CoordVector;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

/**
 * Reduce function that folds a group of records into a single record holding
 * the averaged coordinate vector of the group.
 *
 * <p>Each incoming record is read as follows: field 1 is a {@link CoordVector}
 * and field 2 is an {@link IntValue} count. Field 0 is never read or written by
 * this class (it is declared constant via the class annotation).
 *
 * <p>{@link #combine} emits a partial result (coordinate sum in field 1, summed
 * count in field 2); {@link #reduce} emits the final result (coordinate sum
 * divided by the summed count in field 1, field 2 set to null). In both cases
 * the record that is emitted is the last record read from the iterator, with
 * its fields overwritten.
 */
@ReduceOperator.Combinable
@FunctionAnnotation.ConstantFields(value={0})
public class RecomputeClusterCenter
extends ReduceFunction
implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Reused holder for the summed count that {@link #combine} writes into field 2. */
    private final IntValue count = new IntValue();

    /**
     * Sums the coordinate vectors and counts of all records in the group and
     * emits one record whose field 1 is the component-wise sum divided by the
     * total count and whose field 2 is null.
     *
     * <p>If the iterator yields no records, nothing is emitted. If the total
     * count is zero, the division is a floating-point division by zero, so the
     * emitted components are NaN or infinite.
     *
     * @param dataPoints records of one group
     * @param out collector receiving the single result record
     * @throws IllegalArgumentException if the coordinate vectors of the group
     *         do not all have the same length
     */
    public void reduce(Iterator<Record> dataPoints, Collector<Record> out) {
        GroupSum group = this.sumGroup(dataPoints);
        if (group == null) {
            // Empty group: there is no record to reuse and nothing to average.
            return;
        }
        double[] center = group.coordinateSum;
        double total = (double) group.count;
        for (int i = 0; i < center.length; ++i) {
            center[i] /= total;
        }
        group.coordinates.setCoordinates(center);
        group.last.setField(1, group.coordinates);
        group.last.setNull(2);
        out.collect(group.last);
    }

    /**
     * Pre-aggregates a group: emits one record whose field 1 is the
     * component-wise coordinate sum and whose field 2 is the summed count, so
     * that {@link #reduce} can later finish the average.
     *
     * <p>If the iterator yields no records, nothing is emitted.
     *
     * @param dataPoints records of one (partial) group
     * @param out collector receiving the single partial-result record
     * @throws IllegalArgumentException if the coordinate vectors of the group
     *         do not all have the same length
     */
    public void combine(Iterator<Record> dataPoints, Collector<Record> out) {
        GroupSum group = this.sumGroup(dataPoints);
        if (group == null) {
            return;
        }
        group.coordinates.setCoordinates(group.coordinateSum);
        this.count.setValue(group.count);
        group.last.setField(1, group.coordinates);
        group.last.setField(2, this.count);
        out.collect(group.last);
    }

    /**
     * Consumes the iterator, accumulating field 1 (coordinates) component-wise
     * and field 2 (count) arithmetically.
     *
     * @return the accumulated state, or {@code null} if the iterator was empty
     */
    private GroupSum sumGroup(Iterator<Record> dataPoints) {
        if (!dataPoints.hasNext()) {
            return null;
        }
        CoordVector coordinates = new CoordVector();
        double[] coordinateSum = null;
        int total = 0;
        Record last = null;
        while (dataPoints.hasNext()) {
            last = dataPoints.next();
            double[] thisCoords = next(last);
            int thisCount = last.getField(2, IntValue.class).getValue();
            if (coordinateSum == null) {
                // Reuse the backing array of the fresh vector if it has one;
                // otherwise size the accumulator after the first data point.
                double[] existing = coordinates.getCoordinates();
                coordinateSum = existing != null ? existing : new double[thisCoords.length];
            }
            this.addToCoordVector(coordinateSum, thisCoords);
            total += thisCount;
        }
        return new GroupSum(last, coordinates, coordinateSum, total);
    }

    /** Reads the coordinate array stored in field 1 of the given record. */
    private static double[] next(Record record) {
        return record.getField(1, CoordVector.class).getCoordinates();
    }

    /**
     * Adds {@code cvToBeAdded} onto {@code cvToAddTo} component-wise, in place.
     *
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    private void addToCoordVector(double[] cvToAddTo, double[] cvToBeAdded) {
        if (cvToAddTo.length != cvToBeAdded.length) {
            throw new IllegalArgumentException("The given coordinate vectors are not of equal length.");
        }
        for (int i = 0; i < cvToAddTo.length; ++i) {
            cvToAddTo[i] += cvToBeAdded[i];
        }
    }

    /** Accumulated state of one group; only used as a local return value. */
    private static final class GroupSum {
        /** Last record read from the iterator; reused as the output record. */
        final Record last;
        /** Vector object that will be written into field 1 of the output. */
        final CoordVector coordinates;
        /** Component-wise sum of all field-1 coordinate arrays. */
        final double[] coordinateSum;
        /** Sum of all field-2 counts. */
        final int count;

        GroupSum(Record last, CoordVector coordinates, double[] coordinateSum, int count) {
            this.last = last;
            this.coordinates = coordinates;
            this.coordinateSum = coordinateSum;
            this.count = count;
        }
    }
}

