package org.apache.flink.ml.regression;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.ml.linalg.SparseVector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
import org.apache.flink.ml.regression.linearregression.LinearRegression;
import org.apache.flink.ml.regression.linearregression.LinearRegressionModel;
import org.apache.flink.ml.regression.linearregression.LinearRegressionModelData;
import org.apache.flink.ml.util.ReadWriteUtils;
import org.apache.flink.ml.util.TestUtils;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/ml/regression/LinearRegressionTest.class */
public class LinearRegressionTest {

    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();
    private StreamExecutionEnvironment env;
    private StreamTableEnvironment tEnv;
    private static final List<Row> trainData = Arrays.asList(Row.of(new Object[]{Vectors.dense(new double[]{2.0d, 1.0d}), Double.valueOf(4.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{3.0d, 2.0d}), Double.valueOf(7.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{4.0d, 3.0d}), Double.valueOf(10.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{2.0d, 4.0d}), Double.valueOf(10.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{2.0d, 2.0d}), Double.valueOf(6.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{4.0d, 3.0d}), Double.valueOf(10.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{1.0d, 2.0d}), Double.valueOf(5.0d), Double.valueOf(1.0d)}), Row.of(new Object[]{Vectors.dense(new double[]{5.0d, 3.0d}), Double.valueOf(11.0d), Double.valueOf(1.0d)}));
    private static final double[] expectedCoefficient = {1.141d, 1.829d};
    private static final double TOLERANCE = 1.0E-7d;
    private static final double PREDICTION_TOLERANCE = 0.1d;
    private static final double COEFFICIENT_TOLERANCE = 0.1d;
    private Table trainDataTable;

    @Before
    public void before() {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        this.env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        this.env.setParallelism(4);
        this.env.enableCheckpointing(100L);
        this.env.setRestartStrategy(RestartStrategies.noRestart());
        this.tEnv = StreamTableEnvironment.create(this.env);
        Collections.shuffle(trainData);
        this.trainDataTable = this.tEnv.fromDataStream(this.env.fromCollection(trainData, new RowTypeInfo(new TypeInformation[]{DenseVectorTypeInfo.INSTANCE, Types.DOUBLE, Types.DOUBLE}, new String[]{"features", "label", "weight"})));
    }

    private void verifyPredictionResult(Table table, String str, String str2) throws Exception {
        for (Row row : IteratorUtils.toList(this.tEnv.toDataStream(table).executeAndCollect())) {
            double doubleValue = ((Number) row.getField(str)).doubleValue();
            Assert.assertTrue(Math.abs(((Double) row.getField(str2)).doubleValue() - doubleValue) / doubleValue < 0.1d);
        }
    }

    @Test
    public void testParam() {
        LinearRegression linearRegression = new LinearRegression();
        Assert.assertEquals("features", linearRegression.getFeaturesCol());
        Assert.assertEquals("label", linearRegression.getLabelCol());
        Assert.assertNull(linearRegression.getWeightCol());
        Assert.assertEquals(20L, linearRegression.getMaxIter());
        Assert.assertEquals(1.0E-6d, linearRegression.getTol(), TOLERANCE);
        Assert.assertEquals(0.1d, linearRegression.getLearningRate(), TOLERANCE);
        Assert.assertEquals(32L, linearRegression.getGlobalBatchSize());
        Assert.assertEquals(0.0d, linearRegression.getReg(), TOLERANCE);
        Assert.assertEquals(0.0d, linearRegression.getElasticNet(), TOLERANCE);
        Assert.assertEquals("prediction", linearRegression.getPredictionCol());
        ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) linearRegression.setFeaturesCol("test_features")).setLabelCol("test_label")).setWeightCol("test_weight")).setMaxIter(1000)).setTol(Double.valueOf(0.001d))).setLearningRate(Double.valueOf(0.5d))).setGlobalBatchSize(1000)).setReg(Double.valueOf(0.1d))).setElasticNet(Double.valueOf(0.5d))).setPredictionCol("test_predictionCol");
        Assert.assertEquals("test_features", linearRegression.getFeaturesCol());
        Assert.assertEquals("test_label", linearRegression.getLabelCol());
        Assert.assertEquals("test_weight", linearRegression.getWeightCol());
        Assert.assertEquals(1000L, linearRegression.getMaxIter());
        Assert.assertEquals(0.001d, linearRegression.getTol(), TOLERANCE);
        Assert.assertEquals(0.5d, linearRegression.getLearningRate(), TOLERANCE);
        Assert.assertEquals(1000L, linearRegression.getGlobalBatchSize());
        Assert.assertEquals(0.1d, linearRegression.getReg(), TOLERANCE);
        Assert.assertEquals(0.5d, linearRegression.getElasticNet(), TOLERANCE);
        Assert.assertEquals("test_predictionCol", linearRegression.getPredictionCol());
    }

    @Test
    public void testOutputSchema() {
        Assert.assertEquals(Arrays.asList("test_features", "test_label", "test_weight", "test_predictionCol"), ((LinearRegression) ((LinearRegression) ((LinearRegression) ((LinearRegression) new LinearRegression().setFeaturesCol("test_features")).setLabelCol("test_label")).setWeightCol("test_weight")).setPredictionCol("test_predictionCol")).fit(new Table[]{this.trainDataTable}).transform(new Table[]{this.trainDataTable.as("test_features", new String[]{"test_label", "test_weight"})})[0].getResolvedSchema().getColumnNames());
    }

    @Test
    public void testFitAndPredict() throws Exception {
        LinearRegression linearRegression = (LinearRegression) new LinearRegression().setWeightCol("weight");
        verifyPredictionResult(linearRegression.fit(new Table[]{this.trainDataTable}).transform(new Table[]{this.trainDataTable})[0], linearRegression.getLabelCol(), linearRegression.getPredictionCol());
    }

    @Test
    public void testInputTypeConversion() throws Exception {
        this.trainDataTable = TestUtils.convertDataTypesToSparseInt(this.tEnv, this.trainDataTable);
        Assert.assertArrayEquals(new Class[]{SparseVector.class, Integer.class, Integer.class}, TestUtils.getColumnDataTypes(this.trainDataTable));
        LinearRegression linearRegression = (LinearRegression) new LinearRegression().setWeightCol("weight");
        verifyPredictionResult(linearRegression.fit(new Table[]{this.trainDataTable}).transform(new Table[]{this.trainDataTable})[0], linearRegression.getLabelCol(), linearRegression.getPredictionCol());
    }

    @Test
    public void testSaveLoadAndPredict() throws Exception {
        LinearRegression saveAndReload = TestUtils.saveAndReload(this.tEnv, (LinearRegression) new LinearRegression().setWeightCol("weight"), this.tempFolder.newFolder().getAbsolutePath());
        LinearRegressionModel saveAndReload2 = TestUtils.saveAndReload(this.tEnv, saveAndReload.fit(new Table[]{this.trainDataTable}), this.tempFolder.newFolder().getAbsolutePath());
        Assert.assertEquals(Collections.singletonList("coefficient"), saveAndReload2.getModelData()[0].getResolvedSchema().getColumnNames());
        verifyPredictionResult(saveAndReload2.transform(new Table[]{this.trainDataTable})[0], saveAndReload.getLabelCol(), saveAndReload.getPredictionCol());
    }

    @Test
    public void testGetModelData() throws Exception {
        List list = IteratorUtils.toList(LinearRegressionModelData.getModelDataStream(((LinearRegression) new LinearRegression().setWeightCol("weight")).fit(new Table[]{this.trainDataTable}).getModelData()[0]).executeAndCollect());
        Assert.assertNotNull(list);
        Assert.assertEquals(1L, list.size());
        Assert.assertArrayEquals(expectedCoefficient, ((LinearRegressionModelData) list.get(0)).coefficient.values, 0.1d);
    }

    @Test
    public void testSetModelData() throws Exception {
        LinearRegression linearRegression = (LinearRegression) new LinearRegression().setWeightCol("weight");
        LinearRegressionModel fit = linearRegression.fit(new Table[]{this.trainDataTable});
        LinearRegressionModel linearRegressionModel = new LinearRegressionModel();
        ReadWriteUtils.updateExistingParams(linearRegressionModel, fit.getParamMap());
        linearRegressionModel.setModelData(fit.getModelData());
        verifyPredictionResult(linearRegressionModel.transform(new Table[]{this.trainDataTable})[0], linearRegression.getLabelCol(), linearRegression.getPredictionCol());
    }

    @Test
    public void testMoreSubtaskThanData() throws Exception {
        this.env.setParallelism(12);
        LinearRegression linearRegression = (LinearRegression) ((LinearRegression) new LinearRegression().setWeightCol("weight")).setGlobalBatchSize(128);
        verifyPredictionResult(linearRegression.fit(new Table[]{this.trainDataTable}).transform(new Table[]{this.trainDataTable})[0], linearRegression.getLabelCol(), linearRegression.getPredictionCol());
    }

    @Test
    public void testRegularization() throws Exception {
        checkRegularization(0.0d, RandomUtils.nextDouble(0.0d, 1.0d), expectedCoefficient);
        checkRegularization(0.1d, 0.0d, new double[]{1.165d, 1.78d});
        checkRegularization(0.1d, 1.0d, new double[]{1.143d, 1.812d});
        checkRegularization(0.1d, 0.5d, new double[]{1.154d, 1.796d});
    }

    private void checkRegularization(double d, double d2, double[] dArr) throws Exception {
        Assert.assertArrayEquals(dArr, ((LinearRegressionModelData) IteratorUtils.toList(LinearRegressionModelData.getModelDataStream(((LinearRegression) ((LinearRegression) ((LinearRegression) new LinearRegression().setWeightCol("weight")).setReg(Double.valueOf(d))).setElasticNet(Double.valueOf(d2))).fit(new Table[]{this.trainDataTable}).getModelData()[0]).executeAndCollect()).get(0)).coefficient.values, 0.001d);
    }
}
