package org.apache.paimon.flink.action;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.fs.Path;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.shade.org.antlr.v4.runtime.tree.xpath.XPath;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/MergeIntoAction.class */
public class MergeIntoAction extends TableActionBase {
    private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class);
    private final List<String> primaryKeys;
    private final List<DataStructureConverter<Object, Object>> converters;
    private final List<String> targetFieldNames;

    @Nullable
    private String targetAlias;
    private String sourceTable;

    @Nullable
    private String[] sourceSqls;
    private String mergeCondition;
    boolean matchedUpsert;
    boolean notMatchedUpsert;
    boolean matchedDelete;
    boolean notMatchedDelete;
    boolean insert;

    @Nullable
    String matchedUpsertCondition;

    @Nullable
    private String matchedUpsertSet;

    @Nullable
    String notMatchedBySourceUpsertCondition;

    @Nullable
    String notMatchedBySourceUpsertSet;

    @Nullable
    String matchedDeleteCondition;

    @Nullable
    String notMatchedBySourceDeleteCondition;

    @Nullable
    private String notMatchedInsertCondition;

    @Nullable
    private String notMatchedInsertValues;

    public MergeIntoAction(String str, String str2, String str3) {
        this(str, str2, str3, Collections.emptyMap());
    }

    public MergeIntoAction(String str, String str2, String str3, Map<String, String> map) {
        super(str, str2, str3, map);
        if (!(this.table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException(String.format("Only FileStoreTable supports merge-into action. The table type is '%s'.", this.table.getClass().getName()));
        }
        changeIgnoreMergeEngine();
        this.primaryKeys = ((FileStoreTable) this.table).schema().primaryKeys();
        if (this.primaryKeys.isEmpty()) {
            throw new UnsupportedOperationException("merge-into action doesn't support table with no primary keys defined.");
        }
        this.converters = (List) this.table.rowType().getFieldTypes().stream().map(LogicalTypeConversion::toLogicalType).map(TypeConversions::fromLogicalToDataType).map(DataStructureConverters::getConverter).collect(Collectors.toList());
        this.targetFieldNames = (List) this.table.rowType().getFields().stream().map((v0) -> {
            return v0.name();
        }).collect(Collectors.toList());
    }

    public MergeIntoAction withTargetAlias(String str) {
        this.targetAlias = str;
        return this;
    }

    public MergeIntoAction withSourceTable(String str) {
        this.sourceTable = str;
        return this;
    }

    public MergeIntoAction withSourceSqls(String... strArr) {
        this.sourceSqls = strArr;
        return this;
    }

    public MergeIntoAction withMergeCondition(String str) {
        this.mergeCondition = str;
        return this;
    }

    public MergeIntoAction withMatchedUpsert(@Nullable String str, String str2) {
        this.matchedUpsert = true;
        this.matchedUpsertCondition = str;
        this.matchedUpsertSet = str2;
        return this;
    }

    public MergeIntoAction withNotMatchedBySourceUpsert(@Nullable String str, String str2) {
        this.notMatchedUpsert = true;
        this.notMatchedBySourceUpsertCondition = str;
        this.notMatchedBySourceUpsertSet = str2;
        return this;
    }

    public MergeIntoAction withMatchedDelete(@Nullable String str) {
        this.matchedDelete = true;
        this.matchedDeleteCondition = str;
        return this;
    }

    public MergeIntoAction withNotMatchedBySourceDelete(@Nullable String str) {
        this.notMatchedDelete = true;
        this.notMatchedBySourceDeleteCondition = str;
        return this;
    }

    public MergeIntoAction withNotMatchedInsert(@Nullable String str, String str2) {
        this.insert = true;
        this.notMatchedInsertCondition = str;
        this.notMatchedInsertValues = str2;
        return this;
    }

    @Override // org.apache.paimon.flink.action.Action
    public void run() throws Exception {
        handleTargetAlias();
        handleSqls();
        List list = (List) Stream.of((Object[]) new Optional[]{getMatchedUpsertDataStream(), getNotMatchedUpsertDataStream(), getMatchedDeleteDataStream(), getNotMatchedDeleteDataStream(), getInsertDataStream()}).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.toList());
        batchSink(((DataStream) list.get(0)).union((DataStream[]) list.stream().skip(1L).toArray(i -> {
            return new DataStream[i];
        })));
    }

    private void handleTargetAlias() {
        if (this.targetAlias != null) {
            this.batchTEnv.createTemporaryView(escapedTargetName(), this.batchTEnv.from(this.identifier.getFullName()));
        }
    }

    private void handleSqls() {
        if (this.sourceSqls != null) {
            for (String str : this.sourceSqls) {
                try {
                    this.batchTEnv.executeSql(str).await();
                } catch (Throwable th) {
                    LOG.error(String.format("Error occurs when executing sql:\n%s", str), th);
                    throw new RuntimeException(String.format("Error occurs when executing sql:\n%s", str), th);
                }
            }
        }
    }

    private Optional<DataStream<RowData>> getMatchedUpsertDataStream() {
        List list;
        if (!this.matchedUpsert) {
            return Optional.empty();
        }
        if (this.matchedUpsertSet.equals(XPath.WILDCARD)) {
            String[] split = this.sourceTable.split("\\.");
            list = Collections.singletonList(split[split.length - 1] + ".*");
        } else {
            Map<String, String> parseCommaSeparatedKeyValues = ActionFactory.parseCommaSeparatedKeyValues(this.matchedUpsertSet);
            for (String str : parseCommaSeparatedKeyValues.keySet()) {
                if (!this.targetFieldNames.contains(str)) {
                    throw new RuntimeException(String.format("Invalid column reference '%s' of table '%s' at matched-upsert action.", str, this.identifier.getFullName()));
                }
            }
            list = (List) this.targetFieldNames.stream().map(str2 -> {
                return (String) parseCommaSeparatedKeyValues.getOrDefault(str2, targetTableName() + Path.CUR_DIR + str2);
            }).collect(Collectors.toList());
        }
        Object[] objArr = new Object[5];
        objArr[0] = String.join(FieldListaggAgg.DELIMITER, list);
        objArr[1] = escapedTargetName();
        objArr[2] = escapedSourceName();
        objArr[3] = this.mergeCondition;
        objArr[4] = this.matchedUpsertCondition == null ? "" : "WHERE " + this.matchedUpsertCondition;
        String format = String.format("SELECT %s FROM %s INNER JOIN %s ON %s %s", objArr);
        LOG.info("Query used for matched-update:\n{}", format);
        Table sqlQuery = this.batchTEnv.sqlQuery(format);
        checkSchema("matched-upsert", sqlQuery);
        return Optional.of(toDataStream(sqlQuery, RowKind.UPDATE_AFTER, this.converters));
    }

    private Optional<DataStream<RowData>> getNotMatchedUpsertDataStream() {
        if (!this.notMatchedUpsert) {
            return Optional.empty();
        }
        Map<String, String> parseCommaSeparatedKeyValues = ActionFactory.parseCommaSeparatedKeyValues(this.notMatchedBySourceUpsertSet);
        for (String str : parseCommaSeparatedKeyValues.keySet()) {
            if (!this.targetFieldNames.contains(str)) {
                throw new RuntimeException(String.format("Invalid column reference '%s' of table '%s' at not-matched-by-source-upsert action.\nRun <action> --help for help.", str, this.identifier.getFullName()));
            }
            if (this.primaryKeys.contains(str)) {
                throw new RuntimeException("Not allowed to change primary key in not-matched-by-source-upsert-set.\nRun <action> --help for help.");
            }
        }
        Object[] objArr = new Object[5];
        objArr[0] = String.join(FieldListaggAgg.DELIMITER, (List) this.targetFieldNames.stream().map(str2 -> {
            return (String) parseCommaSeparatedKeyValues.getOrDefault(str2, str2);
        }).collect(Collectors.toList()));
        objArr[1] = escapedTargetName();
        objArr[2] = escapedSourceName();
        objArr[3] = this.mergeCondition;
        objArr[4] = this.notMatchedBySourceUpsertCondition == null ? "" : String.format("AND (%s)", this.notMatchedBySourceUpsertCondition);
        String format = String.format("SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", objArr);
        LOG.info("Query used for not-matched-by-source-upsert:\n{}", format);
        Table sqlQuery = this.batchTEnv.sqlQuery(format);
        checkSchema("not-matched-by-source-upsert", sqlQuery);
        return Optional.of(toDataStream(sqlQuery, RowKind.UPDATE_AFTER, this.converters));
    }

    private Optional<DataStream<RowData>> getMatchedDeleteDataStream() {
        if (!this.matchedDelete) {
            return Optional.empty();
        }
        Object[] objArr = new Object[5];
        objArr[0] = String.join(FieldListaggAgg.DELIMITER, (List) this.targetFieldNames.stream().map(str -> {
            return targetTableName() + Path.CUR_DIR + str;
        }).collect(Collectors.toList()));
        objArr[1] = escapedTargetName();
        objArr[2] = escapedSourceName();
        objArr[3] = this.mergeCondition;
        objArr[4] = this.matchedDeleteCondition == null ? "" : "WHERE " + this.matchedDeleteCondition;
        String format = String.format("SELECT %s FROM %s INNER JOIN %s ON %s %s", objArr);
        LOG.info("Query used by matched-delete:\n{}", format);
        Table sqlQuery = this.batchTEnv.sqlQuery(format);
        checkSchema("matched-delete", sqlQuery);
        return Optional.of(toDataStream(sqlQuery, RowKind.DELETE, this.converters));
    }

    private Optional<DataStream<RowData>> getNotMatchedDeleteDataStream() {
        if (!this.notMatchedDelete) {
            return Optional.empty();
        }
        Object[] objArr = new Object[5];
        objArr[0] = String.join(FieldListaggAgg.DELIMITER, this.targetFieldNames);
        objArr[1] = escapedTargetName();
        objArr[2] = escapedSourceName();
        objArr[3] = this.mergeCondition;
        objArr[4] = this.notMatchedBySourceDeleteCondition == null ? "" : String.format("AND (%s)", this.notMatchedBySourceDeleteCondition);
        String format = String.format("SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", objArr);
        LOG.info("Query used by not-matched-by-source-delete:\n{}", format);
        Table sqlQuery = this.batchTEnv.sqlQuery(format);
        checkSchema("not-matched-by-source-delete", sqlQuery);
        return Optional.of(toDataStream(sqlQuery, RowKind.DELETE, this.converters));
    }

    private Optional<DataStream<RowData>> getInsertDataStream() {
        if (!this.insert) {
            return Optional.empty();
        }
        Object[] objArr = new Object[5];
        objArr[0] = this.notMatchedInsertValues;
        objArr[1] = escapedSourceName();
        objArr[2] = escapedTargetName();
        objArr[3] = this.mergeCondition;
        objArr[4] = this.notMatchedInsertCondition == null ? "" : String.format("AND (%s)", this.notMatchedInsertCondition);
        String format = String.format("SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", objArr);
        LOG.info("Query used by not-matched-insert:\n{}", format);
        Table sqlQuery = this.batchTEnv.sqlQuery(format);
        checkSchema("not-matched-insert", sqlQuery);
        return Optional.of(toDataStream(sqlQuery, RowKind.INSERT, this.converters));
    }

    private void checkSchema(String str, Table table) {
        List<DataType> paimonTypes = toPaimonTypes(table.getResolvedSchema().getColumnDataTypes());
        List<DataType> fieldTypes = this.table.rowType().getFieldTypes();
        if (!compatibleCheck(paimonTypes, fieldTypes)) {
            throw new IllegalStateException(String.format("The schema of result in action '%s' is invalid.\nResult schema:   [%s]\nExpected schema: [%s]", str, paimonTypes.stream().map((v0) -> {
                return v0.asSQLString();
            }).collect(Collectors.joining(", ")), fieldTypes.stream().map((v0) -> {
                return v0.asSQLString();
            }).collect(Collectors.joining(", "))));
        }
    }

    private DataStream<RowData> toDataStream(Table table, RowKind rowKind, List<DataStructureConverter<Object, Object>> list) {
        return this.batchTEnv.toChangelogStream(table).map(row -> {
            int arity = row.getArity();
            GenericRowData genericRowData = new GenericRowData(rowKind, arity);
            for (int i = 0; i < arity; i++) {
                genericRowData.setField(i, ((DataStructureConverter) list.get(i)).toInternalOrNull(row.getField(i)));
            }
            return genericRowData;
        });
    }

    private String targetTableName() {
        return this.targetAlias == null ? this.identifier.getObjectName() : this.targetAlias;
    }

    private String escapedTargetName() {
        return String.format("`%s`.`%s`.`%s`", this.catalogName, this.identifier.getDatabaseName(), targetTableName());
    }

    private String escapedSourceName() {
        return (String) Arrays.stream(this.sourceTable.split("\\.")).map(str -> {
            return String.format("`%s`", str);
        }).collect(Collectors.joining(Path.CUR_DIR));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -415241085:
                if (implMethodName.equals("lambda$toDataStream$eb2d18fd$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/action/MergeIntoAction") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/types/RowKind;Ljava/util/List;Lorg/apache/flink/types/Row;)Lorg/apache/flink/table/data/RowData;")) {
                    RowKind rowKind = (RowKind) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return row -> {
                        int arity = row.getArity();
                        GenericRowData genericRowData = new GenericRowData(rowKind, arity);
                        for (int i = 0; i < arity; i++) {
                            genericRowData.setField(i, ((DataStructureConverter) list.get(i)).toInternalOrNull(row.getField(i)));
                        }
                        return genericRowData;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
