package org.apache.flink.ml.clustering.kmeans;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.ml.api.Model;
import org.apache.flink.ml.common.datastream.TableUtils;
import org.apache.flink.ml.common.distance.DistanceMeasure;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.VectorWithNorm;
import org.apache.flink.ml.param.Param;
import org.apache.flink.ml.util.ParamUtils;
import org.apache.flink.ml.util.ReadWriteUtils;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/ml/clustering/kmeans/OnlineKMeansModel.class */
public class OnlineKMeansModel implements Model<OnlineKMeansModel>, KMeansModelParams<OnlineKMeansModel> {
    public static final String MODEL_DATA_VERSION_GAUGE_KEY = "modelDataVersion";
    private final Map<Param<?>, Object> paramMap = new HashMap();
    private Table modelDataTable;

    /* loaded from: input_file:org/apache/flink/ml/clustering/kmeans/OnlineKMeansModel$PredictLabelOperator.class */
    private static class PredictLabelOperator extends AbstractStreamOperator<Row> implements TwoInputStreamOperator<Row, KMeansModelData, Row> {
        private final RowTypeInfo inputTypeInfo;
        private final String featuresCol;
        private final DistanceMeasure distanceMeasure;
        private final int k;
        private VectorWithNorm[] centroids;
        private ListState<Row> bufferedPointsState;
        private int modelDataVersion = 0;

        public PredictLabelOperator(RowTypeInfo rowTypeInfo, String str, DistanceMeasure distanceMeasure, int i) {
            this.inputTypeInfo = rowTypeInfo;
            this.featuresCol = str;
            this.distanceMeasure = distanceMeasure;
            this.k = i;
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.bufferedPointsState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("bufferedPoints", this.inputTypeInfo));
        }

        public void open() throws Exception {
            super.open();
            getRuntimeContext().getMetricGroup().gauge("modelDataVersion", () -> {
                return Integer.toString(this.modelDataVersion);
            });
        }

        public void processElement1(StreamRecord<Row> streamRecord) throws Exception {
            Row row = (Row) streamRecord.getValue();
            if (this.centroids == null) {
                this.bufferedPointsState.add(row);
            } else {
                this.output.collect(new StreamRecord(Row.join(row, new Row[]{Row.of(new Object[]{Integer.valueOf(this.distanceMeasure.findClosest(this.centroids, new VectorWithNorm(((Vector) row.getField(this.featuresCol)).toDense())))})})));
            }
        }

        public void processElement2(StreamRecord<KMeansModelData> streamRecord) throws Exception {
            KMeansModelData kMeansModelData = (KMeansModelData) streamRecord.getValue();
            Preconditions.checkArgument(kMeansModelData.centroids.length <= this.k);
            this.centroids = new VectorWithNorm[kMeansModelData.centroids.length];
            for (int i = 0; i < this.centroids.length; i++) {
                this.centroids[i] = new VectorWithNorm(kMeansModelData.centroids[i]);
            }
            this.modelDataVersion++;
            Iterator it = ((Iterable) this.bufferedPointsState.get()).iterator();
            while (it.hasNext()) {
                processElement1(new StreamRecord<>((Row) it.next()));
            }
            this.bufferedPointsState.clear();
        }
    }

    public OnlineKMeansModel() {
        ParamUtils.initializeMapWithDefaultValues(this.paramMap, this);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.ml.api.Model
    public OnlineKMeansModel setModelData(Table... tableArr) {
        Preconditions.checkArgument(tableArr.length == 1);
        this.modelDataTable = tableArr[0];
        return this;
    }

    @Override // org.apache.flink.ml.api.Model
    public Table[] getModelData() {
        return new Table[]{this.modelDataTable};
    }

    @Override // org.apache.flink.ml.api.AlgoOperator
    public Table[] transform(Table... tableArr) {
        Preconditions.checkArgument(tableArr.length == 1);
        StreamTableEnvironment tableEnvironment = ((TableImpl) tableArr[0]).getTableEnvironment();
        RowTypeInfo rowTypeInfo = TableUtils.getRowTypeInfo(tableArr[0].getResolvedSchema());
        return new Table[]{tableEnvironment.fromDataStream(tableEnvironment.toDataStream(tableArr[0]).connect(KMeansModelData.getModelDataStream(this.modelDataTable).broadcast()).transform("PredictLabelOperator", new RowTypeInfo((TypeInformation[]) ArrayUtils.addAll(rowTypeInfo.getFieldTypes(), new TypeInformation[]{Types.INT}), (String[]) ArrayUtils.addAll(rowTypeInfo.getFieldNames(), new String[]{getPredictionCol()})), new PredictLabelOperator(rowTypeInfo, getFeaturesCol(), DistanceMeasure.getInstance(getDistanceMeasure()), getK())))};
    }

    @Override // org.apache.flink.ml.param.WithParams
    public Map<Param<?>, Object> getParamMap() {
        return this.paramMap;
    }

    @Override // org.apache.flink.ml.api.Stage
    public void save(String str) throws IOException {
        ReadWriteUtils.saveMetadata(this, str);
    }

    public static OnlineKMeansModel load(StreamTableEnvironment streamTableEnvironment, String str) throws IOException {
        return (OnlineKMeansModel) ReadWriteUtils.loadStageParam(str);
    }
}
