package org.apache.paimon.flink.action.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.class */
public class MySqlRecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordParser.class);
    private final ZoneId serverTimeZone;
    private final boolean caseSensitive;
    private final List<ComputedColumn> computedColumns;
    private final TypeMapping typeMapping;
    private DebeziumEvent root;
    private String currentTable;
    private String databaseName;
    private final CdcMetadataConverter[] metadataConverters;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Set<String> nonPkTables = new HashSet();

    public MySqlRecordParser(Configuration configuration, boolean z, List<ComputedColumn> list, TypeMapping typeMapping, CdcMetadataConverter[] cdcMetadataConverterArr) {
        this.caseSensitive = z;
        this.computedColumns = list;
        this.typeMapping = typeMapping;
        this.metadataConverters = cdcMetadataConverterArr;
        this.objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        String str = (String) configuration.get(MySqlSourceOptions.SERVER_TIME_ZONE);
        this.serverTimeZone = str == null ? ZoneId.systemDefault() : ZoneId.of(str);
    }

    public void flatMap(String str, Collector<RichCdcMultiplexRecord> collector) throws Exception {
        this.root = (DebeziumEvent) this.objectMapper.readValue(str, DebeziumEvent.class);
        this.currentTable = this.root.payload().source().get("table").asText();
        this.databaseName = this.root.payload().source().get("db").asText();
        if (this.root.payload().isSchemaChange()) {
            List<RichCdcMultiplexRecord> extractSchemaChange = extractSchemaChange();
            collector.getClass();
            extractSchemaChange.forEach((v1) -> {
                r1.collect(v1);
            });
        } else {
            List<RichCdcMultiplexRecord> extractRecords = extractRecords();
            collector.getClass();
            extractRecords.forEach((v1) -> {
                r1.collect(v1);
            });
        }
    }

    private List<RichCdcMultiplexRecord> extractSchemaChange() {
        DebeziumEvent.Payload payload = this.root.payload();
        if (!payload.hasHistoryRecord()) {
            return Collections.emptyList();
        }
        TableChanges.TableChange tableChange = null;
        try {
            Iterator<TableChanges.TableChange> tableChanges = payload.getTableChanges();
            long j = 0;
            while (tableChanges.hasNext()) {
                tableChange = tableChanges.next();
                j++;
            }
            if (j != 1) {
                LOG.error("Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" + payload.historyRecord());
                return Collections.emptyList();
            }
            if (TableChanges.TableChangeType.CREATE == tableChange.getType() && tableChange.getTable().primaryKeyColumnNames().isEmpty()) {
                LOG.error("Didn't find primary keys from MySQL DDL for table '{}'. This table won't be synchronized.", this.currentTable);
                this.nonPkTables.add(this.currentTable);
                return Collections.emptyList();
            }
            Table table = tableChange.getTable();
            return Collections.singletonList(new RichCdcMultiplexRecord(this.databaseName, this.currentTable, extractFieldTypes(table), CdcActionCommonUtils.listCaseConvert(table.primaryKeyColumnNames(), this.caseSensitive), CdcRecord.emptyRecord()));
        } catch (Exception e) {
            LOG.error("Failed to parse history record for schema changes", e);
            return Collections.emptyList();
        }
    }

    private LinkedHashMap<String, DataType> extractFieldTypes(Table table) {
        List<Column> columns = table.columns();
        LinkedHashMap<String, DataType> linkedHashMap = new LinkedHashMap<>(columns.size());
        HashSet hashSet = new HashSet();
        Function<String, String> columnDuplicateErrMsg = CdcActionCommonUtils.columnDuplicateErrMsg(table.id().toString());
        for (Column column : columns) {
            linkedHashMap.put(CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck(column.name(), hashSet, this.caseSensitive, columnDuplicateErrMsg), MySqlTypeUtils.toDataType(column.typeExpression(), Integer.valueOf(column.length()), (Integer) column.scale().orElse(null), this.typeMapping).copy(this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE) || column.isOptional()));
        }
        return linkedHashMap;
    }

    private List<RichCdcMultiplexRecord> extractRecords() {
        if (this.nonPkTables.contains(this.currentTable)) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        Map<String, String> extractRow = extractRow(this.root.payload().before());
        if (!extractRow.isEmpty()) {
            arrayList.add(createRecord(RowKind.DELETE, CdcActionCommonUtils.mapKeyCaseConvert(extractRow, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(extractRow))));
        }
        Map<String, String> extractRow2 = extractRow(this.root.payload().after());
        if (!extractRow2.isEmpty()) {
            arrayList.add(createRecord(RowKind.INSERT, CdcActionCommonUtils.mapKeyCaseConvert(extractRow2, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(extractRow2))));
        }
        return arrayList;
    }

    /* JADX WARN: Type inference failed for: r0v100, types: [java.time.LocalDateTime] */
    /* JADX WARN: Type inference failed for: r0v115, types: [java.time.LocalDateTime] */
    private Map<String, String> extractRow(JsonNode jsonNode) {
        if (JsonSerdeUtil.isNull(jsonNode)) {
            return new HashMap();
        }
        Map<String, DebeziumEvent.Field> beforeAndAfterFields = ((DebeziumEvent.Field) Preconditions.checkNotNull(this.root.schema(), "MySqlRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created")).beforeAndAfterFields();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<String, DebeziumEvent.Field> entry : beforeAndAfterFields.entrySet()) {
            String key = entry.getKey();
            String type = entry.getValue().type();
            JsonNode jsonNode2 = jsonNode.get(key);
            if (!JsonSerdeUtil.isNull(jsonNode2)) {
                String name = entry.getValue().name();
                String asText = jsonNode2.asText();
                String str = asText;
                if ("io.debezium.data.Bits".equals(name)) {
                    byte[] decode = Base64.getDecoder().decode(asText);
                    byte[] bArr = new byte[decode.length];
                    for (int i = 0; i < decode.length; i++) {
                        bArr[i] = decode[(decode.length - 1) - i];
                    }
                    str = this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString(bArr) : Base64.getEncoder().encodeToString(bArr);
                } else if ("bytes".equals(type) && name == null) {
                    str = new String(Base64.getDecoder().decode(asText));
                } else if ("bytes".equals(type) && "org.apache.flink.kafka.shaded.org.apache.kafka.connect.data.Decimal".equals(name)) {
                    try {
                        new BigDecimal(asText);
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid big decimal value " + asText + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set 'decimal.format' to 'numeric'", e);
                    }
                } else if ("io.debezium.time.Date".equals(name)) {
                    str = DateTimeUtils.toLocalDate(Integer.parseInt(asText)).toString();
                } else if ("io.debezium.time.Timestamp".equals(name)) {
                    str = DateTimeUtils.formatLocalDateTime(DateTimeUtils.toLocalDateTime(Long.parseLong(asText), ZoneOffset.UTC), 3);
                } else if ("io.debezium.time.MicroTimestamp".equals(name)) {
                    long parseLong = Long.parseLong(asText);
                    str = DateTimeUtils.formatLocalDateTime((LocalDateTime) Instant.ofEpochSecond(parseLong / 1000000, (parseLong % 1000000) * 1000).atZone(ZoneOffset.UTC).toLocalDateTime(), 6);
                } else if ("io.debezium.time.ZonedTimestamp".equals(name)) {
                    str = DateTimeUtils.formatLocalDateTime((LocalDateTime) Instant.parse(asText).atZone(this.serverTimeZone).toLocalDateTime(), 6);
                } else if ("io.debezium.time.MicroTime".equals(name)) {
                    long parseLong2 = Long.parseLong(asText);
                    str = Instant.ofEpochSecond(parseLong2 / 1000000, (parseLong2 % 1000000) * 1000).atZone(ZoneOffset.UTC).toLocalTime().toString();
                } else if ("io.debezium.data.geometry.Point".equals(name) || "io.debezium.data.geometry.Geometry".equals(name)) {
                    try {
                        str = MySqlTypeUtils.convertWkbArray(jsonNode2.get("wkb").binaryValue());
                    } catch (Exception e2) {
                        throw new IllegalArgumentException(String.format("Failed to convert %s to geometry JSON.", jsonNode2), e2);
                    }
                }
                linkedHashMap.put(key, str);
            }
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            linkedHashMap.put(computedColumn.columnName(), computedColumn.eval((String) linkedHashMap.get(computedColumn.fieldReference())));
        }
        for (CdcMetadataConverter cdcMetadataConverter : this.metadataConverters) {
            linkedHashMap.put(cdcMetadataConverter.columnName(), cdcMetadataConverter.read(this.root.payload().source()));
        }
        return linkedHashMap;
    }

    protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> map) {
        return new RichCdcMultiplexRecord(this.databaseName, this.currentTable, new LinkedHashMap(0), Collections.emptyList(), new CdcRecord(rowKind, map));
    }

    public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
        flatMap((String) obj, (Collector<RichCdcMultiplexRecord>) collector);
    }
}
