package org.apache.paimon.flink.action.cdc.format;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for parsers that turn a {@link CdcSourceRecord} holding a JSON payload into
 * {@link RichCdcMultiplexRecord}s.
 *
 * <p>Subclasses describe one concrete message format: they name the JSON fields that carry the
 * primary keys and the row data, and implement {@link #extractRecords()} to walk the current
 * {@link #root} node.
 *
 * <p>The parser keeps the record currently being processed in the mutable {@link #root} field,
 * which is overwritten by every call to {@link #buildSchema} and {@link #flatMap}.
 */
public abstract class RecordParser implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(RecordParser.class);

    /** Target type used to convert a JSON row node into a generic map; shared to avoid re-allocation. */
    private static final TypeReference<Map<String, Object>> ROW_MAP_TYPE =
            new TypeReference<Map<String, Object>>() {};

    protected static final String FIELD_TABLE = "table";
    protected static final String FIELD_DATABASE = "database";

    private final boolean caseSensitive;
    protected final TypeMapping typeMapping;
    protected final List<ComputedColumn> computedColumns;

    /** JSON payload of the record currently being parsed; assigned by {@link #setRoot}. */
    protected JsonNode root;

    /**
     * @param caseSensitive passed to the {@link CdcActionCommonUtils} case-conversion helpers when
     *     records are created
     * @param typeMapping type mapping made available to subclasses
     * @param computedColumns computed columns evaluated for (and appended to) every extracted row
     */
    public RecordParser(boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        this.caseSensitive = caseSensitive;
        this.typeMapping = typeMapping;
        this.computedColumns = computedColumns;
    }

    /**
     * Derives a {@link Schema} from the first record extracted out of {@code cdcSourceRecord}.
     *
     * @param cdcSourceRecord the source record to inspect
     * @return the schema built from the first extracted record's fields and the primary keys of the
     *     payload, or {@code null} if the record is a DDL record or yields no records
     * @throws RuntimeException any failure raised while parsing; the source record is logged before
     *     the exception is rethrown
     */
    @Nullable
    public Schema buildSchema(CdcSourceRecord cdcSourceRecord) {
        try {
            setRoot(cdcSourceRecord);
            if (isDDL()) {
                return null;
            }

            Optional<RichCdcMultiplexRecord> firstRecord = extractRecords().stream().findFirst();
            if (!firstRecord.isPresent()) {
                return null;
            }

            Schema.Builder schemaBuilder = Schema.newBuilder();
            for (DataField field : firstRecord.get().fields()) {
                schemaBuilder.column(field.name(), field.type(), field.description());
            }
            schemaBuilder.primaryKey(extractPrimaryKeys());
            return schemaBuilder.build();
        } catch (Exception e) {
            logInvalidSourceRecord(cdcSourceRecord);
            throw e;
        }
    }

    /** Extracts all records contained in the current {@link #root} payload. */
    protected abstract List<RichCdcMultiplexRecord> extractRecords();

    /** Name of the JSON field under {@link #root} that holds the array of primary key names. */
    protected abstract String primaryField();

    /** Name of the JSON field that holds the row data; used by subclasses. */
    protected abstract String dataField();

    /**
     * Whether the current payload is a DDL record, for which no schema is built.
     *
     * @return {@code false} by default; subclasses may override
     */
    protected boolean isDDL() {
        return false;
    }

    /**
     * Registers every field of {@code record} in {@code rowTypeBuilder} with type
     * {@code STRING}.
     *
     * @param record JSON object whose field names are registered
     * @param rowTypeBuilder builder receiving one field per JSON field name
     */
    public void fillDefaultTypes(JsonNode record, RowType.Builder rowTypeBuilder) {
        record.fieldNames()
                .forEachRemaining(fieldName -> rowTypeBuilder.field(fieldName, DataTypes.STRING()));
    }

    /**
     * Parses {@code cdcSourceRecord} and emits every extracted record to {@code collector}.
     *
     * @param cdcSourceRecord the source record to parse
     * @param collector receives the extracted records
     * @throws RuntimeException any failure raised while parsing; the source record is logged before
     *     the exception is rethrown
     */
    @Override
    public void flatMap(CdcSourceRecord cdcSourceRecord, Collector<RichCdcMultiplexRecord> collector) {
        try {
            setRoot(cdcSourceRecord);
            extractRecords().forEach(collector::collect);
        } catch (Exception e) {
            logInvalidSourceRecord(cdcSourceRecord);
            throw e;
        }
    }

    /**
     * Converts a JSON row into a map of column name to string value and registers the row's
     * columns (including computed columns) in {@code rowTypeBuilder}.
     *
     * @param record JSON object holding one row
     * @param rowTypeBuilder builder that receives the row's field definitions
     * @return the row data, with computed column values added
     */
    public Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
        fillDefaultTypes(record, rowTypeBuilder);

        Map<String, Object> recordMap = JsonSerdeUtil.convertValue(record, ROW_MAP_TYPE);
        Map<String, String> rowData =
                recordMap.entrySet().stream()
                        .filter(entry -> Objects.nonNull(entry.getKey()))
                        .collect(
                                Collectors.toMap(
                                        Map.Entry::getKey,
                                        entry -> stringifyValue(entry.getValue())));

        evalComputedColumns(rowData, rowTypeBuilder);
        return rowData;
    }

    /**
     * Renders a single column value as a string.
     *
     * <p>{@code null} and basic-typed values go through {@link Objects#toString(Object)} (so
     * {@code null} becomes the string {@code "null"}); any other value is written as JSON, falling
     * back to {@code Objects.toString} if JSON serialization fails.
     */
    private static String stringifyValue(Object value) {
        if (value == null || TypeUtils.isBasicType(value)) {
            return Objects.toString(value);
        }
        try {
            return JsonSerdeUtil.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            LOG.error("Failed to deserialize record.", e);
            return Objects.toString(value);
        }
    }

    /**
     * Evaluates every computed column against {@code rowData}, stores the result under the computed
     * column's name and registers the column in {@code rowTypeBuilder}.
     *
     * @param rowData row values; mutated in place
     * @param rowTypeBuilder builder receiving the computed column definitions
     */
    public void evalComputedColumns(Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
        for (ComputedColumn computedColumn : this.computedColumns) {
            String referencedValue = rowData.get(computedColumn.fieldReference());
            rowData.put(computedColumn.columnName(), computedColumn.eval(referencedValue));
            rowTypeBuilder.field(computedColumn.columnName(), computedColumn.columnType());
        }
    }

    /**
     * Reads the primary key names from the {@link #primaryField()} array of the current payload.
     *
     * @return the key names as text, or an empty list when the array node is absent
     */
    private List<String> extractPrimaryKeys() {
        ArrayNode primaryKeyNode = JsonSerdeUtil.getNodeAs(this.root, primaryField(), ArrayNode.class);
        if (primaryKeyNode == null) {
            return Collections.emptyList();
        }
        return StreamSupport.stream(primaryKeyNode.spliterator(), false)
                .map(JsonNode::asText)
                .collect(Collectors.toList());
    }

    /**
     * Builds one record of the given kind from {@code record} and appends it to {@code records}.
     *
     * @param record JSON object holding one row
     * @param rowKind row kind assigned to the created record
     * @param records output list the new record is added to
     */
    public void processRecord(JsonNode record, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
        RowType.Builder rowTypeBuilder = RowType.builder();
        Map<String, String> rowData = extractRowData(record, rowTypeBuilder);
        records.add(createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
    }

    /** Assembles a {@link RichCdcMultiplexRecord}, applying the configured case conversion. */
    private RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> rowData, List<DataField> fields) {
        String databaseName = getDatabaseName();
        String tableName = getTableName();
        // Conversions are performed in the same order as the constructor arguments.
        List<DataField> convertedFields =
                CdcActionCommonUtils.fieldNameCaseConvert(fields, this.caseSensitive, tableName);
        List<String> convertedPrimaryKeys =
                CdcActionCommonUtils.listCaseConvert(extractPrimaryKeys(), this.caseSensitive);
        CdcRecord cdcRecord =
                new CdcRecord(
                        rowKind,
                        CdcActionCommonUtils.mapKeyCaseConvert(
                                rowData,
                                this.caseSensitive,
                                CdcActionCommonUtils.recordKeyDuplicateErrMsg(rowData)));
        return new RichCdcMultiplexRecord(
                databaseName, tableName, convertedFields, convertedPrimaryKeys, cdcRecord);
    }

    /**
     * Stores the value of {@code cdcSourceRecord} as the current {@link #root}.
     *
     * @throws ClassCastException if the record's value is not a {@link JsonNode}
     */
    protected void setRoot(CdcSourceRecord cdcSourceRecord) {
        this.root = (JsonNode) cdcSourceRecord.getValue();
    }

    /**
     * Returns a deep copy of {@code data} in which every field present in {@code oldNode} is
     * replaced by {@code oldNode}'s value.
     *
     * @param data the node that is copied and used as the base
     * @param oldNode the node whose fields override the copy
     * @return the merged copy; {@code data} itself is not modified
     */
    public JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
        JsonNode merged = data.deepCopy();
        // The ObjectNode cast is only performed when there is at least one field to overwrite.
        oldNode.fieldNames()
                .forEachRemaining(fieldName -> ((ObjectNode) merged).set(fieldName, oldNode.get(fieldName)));
        return merged;
    }

    /** Returns the text of the {@code "table"} field of the current payload, or {@code null}. */
    @Nullable
    protected String getTableName() {
        return rootFieldAsText(FIELD_TABLE);
    }

    /** Returns the text of the {@code "database"} field of the current payload, or {@code null}. */
    @Nullable
    protected String getDatabaseName() {
        return rootFieldAsText(FIELD_DATABASE);
    }

    /** Returns the text of {@code fieldName} under {@link #root}, or {@code null} if it is null/absent. */
    @Nullable
    private String rootFieldAsText(String fieldName) {
        JsonNode node = this.root.get(fieldName);
        return JsonSerdeUtil.isNull(node) ? null : node.asText();
    }

    private void logInvalidSourceRecord(CdcSourceRecord cdcSourceRecord) {
        // Let the logger render the argument so that a null record cannot raise a new exception
        // from inside a catch block and hide the original failure.
        LOG.error("Invalid source record:\n{}", cdcSourceRecord);
    }

    /**
     * Verifies that {@code node} is present.
     *
     * @param node the node to check
     * @param key name of the field, used in the error message
     * @throws RuntimeException if {@code node} is null according to {@link JsonSerdeUtil#isNull}
     */
    public void checkNotNull(JsonNode node, String key) {
        if (JsonSerdeUtil.isNull(node)) {
            throw new RuntimeException(String.format("Invalid %s format: missing '%s' field.", format(), key));
        }
    }

    /**
     * Verifies that {@code node} is present, reporting the condition under which it is required.
     *
     * @param node the node to check
     * @param key name of the field, used in the error message
     * @param conditionKey name of the field that makes {@code key} mandatory
     * @param conditionValue value of {@code conditionKey} that makes {@code key} mandatory
     * @throws RuntimeException if {@code node} is null according to {@link JsonSerdeUtil#isNull}
     */
    public void checkNotNull(JsonNode node, String key, String conditionKey, String conditionValue) {
        if (JsonSerdeUtil.isNull(node)) {
            throw new RuntimeException(
                    String.format(
                            "Invalid %s format: missing '%s' field when '%s' is '%s'.",
                            format(), key, conditionKey, conditionValue));
        }
    }

    /**
     * Looks up {@code key} under {@link #root} and fails if it is missing.
     *
     * @param key field name to look up
     * @return the non-null node
     * @throws RuntimeException if the field is missing
     */
    public JsonNode getAndCheck(String key) {
        JsonNode node = this.root.get(key);
        checkNotNull(node, key);
        return node;
    }

    /**
     * Looks up {@code key} under {@link #root} and fails, mentioning the requiring condition, if it
     * is missing.
     *
     * @param key field name to look up
     * @param conditionKey name of the field that makes {@code key} mandatory
     * @param conditionValue value of {@code conditionKey} that makes {@code key} mandatory
     * @return the non-null node
     * @throws RuntimeException if the field is missing
     */
    public JsonNode getAndCheck(String key, String conditionKey, String conditionValue) {
        JsonNode node = this.root.get(key);
        checkNotNull(node, key, conditionKey, conditionValue);
        return node;
    }

    /** Name of the message format handled by this parser; used in error messages. */
    protected abstract String format();
}
