package org.apache.flink.ml.feature.sqltransformer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.ml.api.Transformer;
import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
import org.apache.flink.ml.param.Param;
import org.apache.flink.ml.util.ParamUtils;
import org.apache.flink.ml.util.ReadWriteUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/ml/feature/sqltransformer/SQLTransformer.class */
public class SQLTransformer implements Transformer<SQLTransformer>, SQLTransformerParams<SQLTransformer> {
    static final String TABLE_IDENTIFIER = "__THIS__";
    private static final String INSERT_ONLY_EXCEPTION_PATTERN = "^.* doesn't support consuming .* changes which is produced by node .*$";
    private final Map<Param<?>, Object> paramMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.ml.feature.sqltransformer.SQLTransformer$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/ml/feature/sqltransformer/SQLTransformer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/ml/feature/sqltransformer/SQLTransformer$ChangeLogStreamToDataStreamFunction.class */
    private static class ChangeLogStreamToDataStreamFunction implements AggregateFunction<Row, List<Row>, List<Row>> {
        private ChangeLogStreamToDataStreamFunction() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public List<Row> m165createAccumulator() {
            return new ArrayList();
        }

        public List<Row> add(Row row, List<Row> list) {
            switch (AnonymousClass1.$SwitchMap$org$apache$flink$types$RowKind[row.getKind().ordinal()]) {
                case 1:
                    list.add(row);
                    break;
                case 2:
                    row.setKind(RowKind.INSERT);
                    list.add(row);
                    break;
                case 3:
                case 4:
                    row.setKind(RowKind.INSERT);
                    list.remove(row);
                    break;
                default:
                    throw new UnsupportedOperationException();
            }
            return list;
        }

        public List<Row> getResult(List<Row> list) {
            return list;
        }

        public List<Row> merge(List<Row> list, List<Row> list2) {
            list.addAll(list2);
            return list;
        }

        /* synthetic */ ChangeLogStreamToDataStreamFunction(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/flink/ml/feature/sqltransformer/SQLTransformer$FlattenListFunction.class */
    private static class FlattenListFunction<T> implements FlatMapFunction<List<T>, T> {
        private FlattenListFunction() {
        }

        public void flatMap(List<T> list, Collector<T> collector) throws Exception {
            Iterator<T> it = list.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }

        /* synthetic */ FlattenListFunction(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    public SQLTransformer() {
        ParamUtils.initializeMapWithDefaultValues(this.paramMap, this);
    }

    @Override // org.apache.flink.ml.api.AlgoOperator
    public Table[] transform(Table... tableArr) {
        Preconditions.checkArgument(tableArr.length == 1);
        StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) ((TableImpl) tableArr[0]).getTableEnvironment();
        Table sqlQuery = streamTableEnvironment.sqlQuery(getStatement().replace(TABLE_IDENTIFIER, tableArr[0].toString()));
        if (!isInsertOnlyTable(streamTableEnvironment, sqlQuery)) {
            Schema build = Schema.newBuilder().fromResolvedSchema(sqlQuery.getResolvedSchema()).build();
            DataStream changelogStream = streamTableEnvironment.toChangelogStream(sqlQuery, build);
            sqlQuery = streamTableEnvironment.fromDataStream(changelogStream.windowAll(EndOfStreamWindows.get()).aggregate(new ChangeLogStreamToDataStreamFunction(null), Types.LIST(changelogStream.getType()), Types.LIST(changelogStream.getType())).flatMap(new FlattenListFunction(null), changelogStream.getType()), build);
        }
        return new Table[]{sqlQuery};
    }

    @Override // org.apache.flink.ml.api.Stage
    public void save(String str) throws IOException {
        ReadWriteUtils.saveMetadata(this, str);
    }

    public static SQLTransformer load(StreamTableEnvironment streamTableEnvironment, String str) throws IOException {
        return (SQLTransformer) ReadWriteUtils.loadStageParam(str);
    }

    @Override // org.apache.flink.ml.param.WithParams
    public Map<Param<?>, Object> getParamMap() {
        return this.paramMap;
    }

    private boolean isInsertOnlyTable(StreamTableEnvironment streamTableEnvironment, Table table) {
        try {
            streamTableEnvironment.toDataStream(table);
            return true;
        } catch (Exception e) {
            if ((e instanceof TableException) && e.getMessage() != null && e.getMessage().matches(INSERT_ONLY_EXCEPTION_PATTERN)) {
                return false;
            }
            throw e;
        }
    }
}
