package org.apache.flink.ml.util;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.ml.api.AlgoOperator;
import org.apache.flink.ml.api.Estimator;
import org.apache.flink.ml.api.Model;
import org.apache.flink.ml.api.Stage;
import org.apache.flink.ml.api.Transformer;
import org.apache.flink.ml.common.datastream.TableUtils;
import org.apache.flink.ml.linalg.DenseVector;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfo;
import org.apache.flink.ml.servable.api.DataFrame;
import org.apache.flink.ml.servable.api.TransformerServable;
import org.apache.flink.ml.servable.types.DataType;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;

/* loaded from: input_file:org/apache/flink/ml/util/TestUtils.class */
public class TestUtils {

    /* loaded from: input_file:org/apache/flink/ml/util/TestUtils$IntEncoder.class */
    public static class IntEncoder implements Encoder<Integer> {
        public void encode(Integer num, OutputStream outputStream) throws IOException {
            DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
            dataOutputStream.writeInt(num.intValue());
            dataOutputStream.flush();
        }
    }

    /* loaded from: input_file:org/apache/flink/ml/util/TestUtils$IntegerStreamFormat.class */
    public static class IntegerStreamFormat extends SimpleStreamFormat<Integer> {
        public StreamFormat.Reader<Integer> createReader(Configuration configuration, final FSDataInputStream fSDataInputStream) {
            return new StreamFormat.Reader<Integer>() { // from class: org.apache.flink.ml.util.TestUtils.IntegerStreamFormat.1
                private final DataInputStream dataStream;

                {
                    this.dataStream = new DataInputStream(fSDataInputStream);
                }

                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Integer m10read() throws IOException {
                    try {
                        return Integer.valueOf(this.dataStream.readInt());
                    } catch (EOFException e) {
                        return null;
                    }
                }

                public void close() throws IOException {
                    this.dataStream.close();
                }
            };
        }

        public TypeInformation<Integer> getProducedType() {
            return BasicTypeInfo.INT_TYPE_INFO;
        }
    }

    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        StreamExecutionEnvironment executionEnvironment = getExecutionEnvironment();
        executionEnvironment.configure(configuration);
        return executionEnvironment;
    }

    public static StreamExecutionEnvironment getExecutionEnvironment() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        executionEnvironment.getConfig().disableGenericTypes();
        executionEnvironment.setParallelism(4);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        return executionEnvironment;
    }

    public static void executeAndCheckOutput(StreamExecutionEnvironment streamExecutionEnvironment, Stage<?> stage, List<List<Integer>> list, List<Integer> list2, List<List<Integer>> list3, List<Integer> list4) throws Exception {
        Table table;
        StreamTableEnvironment create = StreamTableEnvironment.create(streamExecutionEnvironment);
        Table[] tableArr = new Table[list.size()];
        for (int i = 0; i < tableArr.length; i++) {
            tableArr[i] = create.fromDataStream(streamExecutionEnvironment.fromCollection(list.get(i)));
        }
        Table table2 = null;
        if (stage instanceof AlgoOperator) {
            if (list3 != null) {
                Table[] tableArr2 = new Table[list3.size()];
                for (int i2 = 0; i2 < tableArr2.length; i2++) {
                    tableArr2[i2] = create.fromDataStream(streamExecutionEnvironment.fromCollection(list3.get(i2)));
                }
                ((Model) stage).setModelData(tableArr2);
            }
            table = ((AlgoOperator) stage).transform(tableArr)[0];
            if (list4 != null) {
                table2 = ((Model) stage).getModelData()[0];
            }
        } else {
            Model fit = ((Estimator) stage).fit(tableArr);
            if (list3 != null) {
                Table[] tableArr3 = new Table[list3.size()];
                for (int i3 = 0; i3 < tableArr3.length; i3++) {
                    tableArr3[i3] = create.fromDataStream(streamExecutionEnvironment.fromCollection(list3.get(i3)));
                }
                fit.setModelData(tableArr3);
            }
            table = fit.transform(tableArr)[0];
            if (list4 != null) {
                table2 = fit.getModelData()[0];
            }
        }
        TestBaseUtils.compareResultCollections(list2, IteratorUtils.toList(create.toDataStream(table, Integer.class).executeAndCollect()), Comparator.naturalOrder());
        if (list4 != null) {
            TestBaseUtils.compareResultCollections(list4, IteratorUtils.toList(create.toDataStream(table2, Integer.class).executeAndCollect()), Comparator.naturalOrder());
        }
    }

    public static <T extends Stage<T>> T saveAndReload(StreamTableEnvironment streamTableEnvironment, T t, String str, BiFunctionWithException<StreamTableEnvironment, String, T, IOException> biFunctionWithException) throws Exception {
        StreamExecutionEnvironment executionEnvironment = TableUtils.getExecutionEnvironment(streamTableEnvironment);
        t.save(str);
        try {
            executionEnvironment.execute();
        } catch (RuntimeException e) {
            if (!e.getMessage().equals("No operators defined in streaming topology. Cannot execute.")) {
                throw e;
            }
        }
        return (T) biFunctionWithException.apply(streamTableEnvironment, str);
    }

    public static <T extends TransformerServable<T>> T saveAndLoadServable(StreamTableEnvironment streamTableEnvironment, Transformer<?> transformer, String str, FunctionWithException<String, T, IOException> functionWithException) throws Exception {
        StreamExecutionEnvironment executionEnvironment = TableUtils.getExecutionEnvironment(streamTableEnvironment);
        transformer.save(str);
        try {
            executionEnvironment.execute();
        } catch (RuntimeException e) {
            if (!e.getMessage().equals("No operators defined in streaming topology. Cannot execute.")) {
                throw e;
            }
        }
        return (T) functionWithException.apply(str);
    }

    public static Table convertDataTypesToSparseInt(StreamTableEnvironment streamTableEnvironment, Table table) {
        RowTypeInfo rowTypeInfo = TableUtils.getRowTypeInfo(table.getResolvedSchema());
        TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes();
        for (int i = 0; i < fieldTypes.length; i++) {
            if (fieldTypes[i].getTypeClass().equals(DenseVector.class)) {
                fieldTypes[i] = SparseVectorTypeInfo.INSTANCE;
            } else if (fieldTypes[i].getTypeClass().equals(Double.class)) {
                fieldTypes[i] = Types.INT;
            }
        }
        return streamTableEnvironment.fromDataStream(streamTableEnvironment.toDataStream(table).map(new MapFunction<Row, Row>() { // from class: org.apache.flink.ml.util.TestUtils.1
            public Row map(Row row) {
                int arity = row.getArity();
                for (int i2 = 0; i2 < arity; i2++) {
                    Object field = row.getField(i2);
                    if (field instanceof Vector) {
                        row.setField(i2, ((Vector) field).toSparse());
                    } else if (field instanceof Number) {
                        row.setField(i2, Integer.valueOf(((Number) field).intValue()));
                    }
                }
                return row;
            }
        }, new RowTypeInfo((TypeInformation[]) ArrayUtils.addAll(fieldTypes, new TypeInformation[0]), (String[]) ArrayUtils.addAll(rowTypeInfo.getFieldNames(), new String[0]))));
    }

    public static Class<?>[] getColumnDataTypes(Table table) {
        return (Class[]) table.getResolvedSchema().getColumnDataTypes().stream().map((v0) -> {
            return v0.getConversionClass();
        }).toArray(i -> {
            return new Class[i];
        });
    }

    public static int compare(Vector vector, Vector vector2) {
        if (vector.size() != vector2.size()) {
            return Integer.compare(vector.size(), vector2.size());
        }
        for (int i = 0; i < vector.size(); i++) {
            int compare = Double.compare(vector.get(i), vector2.get(i));
            if (compare != 0) {
                return compare;
            }
        }
        return 0;
    }

    public static DataFrame constructDataFrame(List<String> list, List<DataType> list2, List<Row> list3) {
        ArrayList arrayList = new ArrayList();
        for (Row row : list3) {
            ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < row.getArity(); i++) {
                arrayList2.add(row.getField(i));
            }
            arrayList.add(new org.apache.flink.ml.servable.api.Row(arrayList2));
        }
        return new DataFrame(list, list2, arrayList);
    }
}
