package org.apache.paimon.flink.action.cdc.postgres;

import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.paimon.shade.org.apache.commons.compress.harmony.pack200.PackingOptions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.class */
public class PostgresRecordParser implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresRecordParser.class);
    private final ObjectMapper objectMapper;
    private final ZoneId serverTimeZone;
    private final boolean caseSensitive;
    private final List<ComputedColumn> computedColumns;
    private final TypeMapping typeMapping;
    private DebeziumEvent root;
    private String currentTable;
    private String databaseName;
    private final CdcMetadataConverter[] metadataConverters;

    public PostgresRecordParser(Configuration configuration, boolean z, TypeMapping typeMapping, CdcMetadataConverter[] cdcMetadataConverterArr) {
        this(configuration, z, Collections.emptyList(), typeMapping, cdcMetadataConverterArr);
    }

    public PostgresRecordParser(Configuration configuration, boolean z, List<ComputedColumn> list, TypeMapping typeMapping, CdcMetadataConverter[] cdcMetadataConverterArr) {
        this.objectMapper = new ObjectMapper();
        this.caseSensitive = z;
        this.computedColumns = list;
        this.typeMapping = typeMapping;
        this.metadataConverters = cdcMetadataConverterArr;
        this.objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        String str = (String) configuration.get(PostgresSourceOptions.SERVER_TIME_ZONE);
        this.serverTimeZone = str == null ? ZoneId.systemDefault() : ZoneId.of(str);
    }

    public void flatMap(CdcSourceRecord cdcSourceRecord, Collector<RichCdcMultiplexRecord> collector) throws Exception {
        this.root = (DebeziumEvent) this.objectMapper.readValue((String) cdcSourceRecord.getValue(), DebeziumEvent.class);
        this.currentTable = this.root.payload().source().get(ActionFactory.TABLE).asText();
        this.databaseName = this.root.payload().source().get("db").asText();
        List<RichCdcMultiplexRecord> extractRecords = extractRecords();
        collector.getClass();
        extractRecords.forEach((v1) -> {
            r1.collect(v1);
        });
    }

    private List<DataField> extractFields(DebeziumEvent.Field field) {
        Map<String, DebeziumEvent.Field> afterFields = field.afterFields();
        Preconditions.checkArgument(!afterFields.isEmpty(), "PostgresRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created");
        RowType.Builder builder = RowType.builder();
        HashSet hashSet = new HashSet();
        Function<String, String> columnDuplicateErrMsg = CdcActionCommonUtils.columnDuplicateErrMsg(this.currentTable);
        afterFields.forEach((str, field2) -> {
            builder.field(CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck(str, hashSet, this.caseSensitive, columnDuplicateErrMsg), extractFieldType(field2).copy(this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_NULLABLE) || field2.optional().booleanValue()));
        });
        return builder.build().getFields();
    }

    private DataType extractFieldType(DebeziumEvent.Field field) {
        String type = field.type();
        boolean z = -1;
        switch (type.hashCode()) {
            case -1325958191:
                if (type.equals("double")) {
                    z = 10;
                    break;
                }
                break;
            case -891985903:
                if (type.equals("string")) {
                    z = 12;
                    break;
                }
                break;
            case -891974699:
                if (type.equals("struct")) {
                    z = 2;
                    break;
                }
                break;
            case -766443077:
                if (type.equals("float32")) {
                    z = 8;
                    break;
                }
                break;
            case -766442982:
                if (type.equals("float64")) {
                    z = 9;
                    break;
                }
                break;
            case 107868:
                if (type.equals("map")) {
                    z = true;
                    break;
                }
                break;
            case 3237417:
                if (type.equals("int8")) {
                    z = 3;
                    break;
                }
                break;
            case 64711720:
                if (type.equals("boolean")) {
                    z = 11;
                    break;
                }
                break;
            case 93090393:
                if (type.equals("array")) {
                    z = false;
                    break;
                }
                break;
            case 94224491:
                if (type.equals("bytes")) {
                    z = 13;
                    break;
                }
                break;
            case 97526364:
                if (type.equals("float")) {
                    z = 7;
                    break;
                }
                break;
            case 100359764:
                if (type.equals("int16")) {
                    z = 4;
                    break;
                }
                break;
            case 100359822:
                if (type.equals("int32")) {
                    z = 5;
                    break;
                }
                break;
            case 100359917:
                if (type.equals("int64")) {
                    z = 6;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return DataTypes.ARRAY(DataTypes.STRING());
            case true:
            case true:
                return DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING());
            case true:
                return DataTypes.TINYINT();
            case true:
                return DataTypes.SMALLINT();
            case true:
                return "io.debezium.time.Date".equals(field.name()) ? DataTypes.DATE() : DataTypes.INT();
            case true:
                return "io.debezium.time.MicroTimestamp".equals(field.name()) ? DataTypes.TIMESTAMP(6) : "io.debezium.time.MicroTime".equals(field.name()) ? DataTypes.TIME(6) : DataTypes.BIGINT();
            case true:
            case true:
                return DataTypes.FLOAT();
            case true:
            case true:
                return DataTypes.DOUBLE();
            case true:
                return DataTypes.BOOLEAN();
            case true:
                return DataTypes.STRING();
            case true:
                if (DebeziumSchemaUtils.decimalLogicalName().equals(field.name())) {
                    return DataTypes.DECIMAL(field.parameters().get("connect.decimal.precision").asInt(), field.parameters().get("scale").asInt());
                }
                if (!"io.debezium.data.Bits".equals(field.name())) {
                    return DataTypes.BYTES();
                }
                String asText = field.parameters().get("length").asText();
                if (StringUtils.isBlank(asText)) {
                    return DataTypes.BOOLEAN();
                }
                Integer valueOf = Integer.valueOf(asText);
                if (valueOf.intValue() == 1) {
                    return DataTypes.BOOLEAN();
                }
                return DataTypes.BINARY(valueOf.intValue() == Integer.MAX_VALUE ? valueOf.intValue() / 8 : (valueOf.intValue() + 7) / 8);
            default:
                return DataTypes.STRING();
        }
    }

    private List<RichCdcMultiplexRecord> extractRecords() {
        ArrayList arrayList = new ArrayList();
        Map<String, String> extractRow = extractRow(this.root.payload().before());
        if (!extractRow.isEmpty()) {
            arrayList.add(createRecord(RowKind.DELETE, CdcActionCommonUtils.mapKeyCaseConvert(extractRow, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(extractRow))));
        }
        Map<String, String> extractRow2 = extractRow(this.root.payload().after());
        if (!extractRow2.isEmpty()) {
            Map mapKeyCaseConvert = CdcActionCommonUtils.mapKeyCaseConvert(extractRow2, this.caseSensitive, CdcActionCommonUtils.recordKeyDuplicateErrMsg(extractRow2));
            arrayList.add(new RichCdcMultiplexRecord(this.databaseName, this.currentTable, extractFields(this.root.schema()), Collections.emptyList(), new CdcRecord(RowKind.INSERT, mapKeyCaseConvert)));
        }
        return arrayList;
    }

    /* JADX WARN: Type inference failed for: r0v102, types: [java.time.LocalDateTime] */
    /* JADX WARN: Type inference failed for: r0v117, types: [java.time.LocalDateTime] */
    private Map<String, String> extractRow(JsonNode jsonNode) {
        if (JsonSerdeUtil.isNull(jsonNode)) {
            return new HashMap();
        }
        Map<String, DebeziumEvent.Field> beforeAndAfterFields = ((DebeziumEvent.Field) Preconditions.checkNotNull(this.root.schema(), "PostgresRecordParser only supports debezium JSON with schema. Please make sure that `includeSchema` is true in the JsonDebeziumDeserializationSchema you created")).beforeAndAfterFields();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<String, DebeziumEvent.Field> entry : beforeAndAfterFields.entrySet()) {
            String key = entry.getKey();
            String type = entry.getValue().type();
            JsonNode jsonNode2 = jsonNode.get(key);
            if (!JsonSerdeUtil.isNull(jsonNode2)) {
                String name = entry.getValue().name();
                String asText = jsonNode2.asText();
                String str = asText;
                if ("io.debezium.data.Bits".equals(name)) {
                    byte[] decode = Base64.getDecoder().decode(asText);
                    byte[] bArr = new byte[decode.length];
                    for (int i = 0; i < decode.length; i++) {
                        bArr[i] = decode[(decode.length - 1) - i];
                    }
                    str = this.typeMapping.containsMode(TypeMapping.TypeMappingMode.TO_STRING) ? StringUtils.bytesToBinaryString(bArr) : Base64.getEncoder().encodeToString(bArr);
                } else if ("bytes".equals(type) && name == null) {
                    str = new String(Base64.getDecoder().decode(asText));
                } else if ("bytes".equals(type) && DebeziumSchemaUtils.decimalLogicalName().equals(name)) {
                    try {
                        new BigDecimal(asText);
                    } catch (NumberFormatException e) {
                        throw new IllegalArgumentException("Invalid big decimal value " + asText + ". Make sure that in the `customConverterConfigs` of the JsonDebeziumDeserializationSchema you created, set 'decimal.format' to 'numeric'", e);
                    }
                } else if ("io.debezium.time.Date".equals(name)) {
                    str = DateTimeUtils.toLocalDate(Integer.parseInt(asText)).toString();
                } else if ("io.debezium.time.Timestamp".equals(name)) {
                    str = DateTimeUtils.formatLocalDateTime(DateTimeUtils.toLocalDateTime(Long.parseLong(asText), ZoneOffset.UTC), 3);
                } else if ("io.debezium.time.MicroTimestamp".equals(name)) {
                    long parseLong = Long.parseLong(asText);
                    str = DateTimeUtils.formatLocalDateTime(Instant.ofEpochSecond(parseLong / PackingOptions.SEGMENT_LIMIT, (parseLong % PackingOptions.SEGMENT_LIMIT) * 1000).atZone(ZoneOffset.UTC).toLocalDateTime(), 6);
                } else if ("io.debezium.time.ZonedTimestamp".equals(name)) {
                    str = DateTimeUtils.formatLocalDateTime(Instant.parse(asText).atZone(this.serverTimeZone).toLocalDateTime(), 6);
                } else if ("io.debezium.time.MicroTime".equals(name)) {
                    long parseLong2 = Long.parseLong(asText);
                    str = Instant.ofEpochSecond(parseLong2 / PackingOptions.SEGMENT_LIMIT, (parseLong2 % PackingOptions.SEGMENT_LIMIT) * 1000).atZone(ZoneOffset.UTC).toLocalTime().toString();
                } else if ("array".equals(type)) {
                    ArrayNode arrayNode = (ArrayNode) jsonNode2;
                    ArrayList arrayList = new ArrayList();
                    arrayNode.elements().forEachRemaining(jsonNode3 -> {
                        arrayList.add(jsonNode3.asText());
                    });
                    try {
                        str = this.objectMapper.writer().writeValueAsString(arrayList);
                    } catch (JsonProcessingException e2) {
                        LOG.error("Failed to convert array to JSON.", e2);
                    }
                }
                linkedHashMap.put(key, str);
            }
        }
        for (ComputedColumn computedColumn : this.computedColumns) {
            linkedHashMap.put(computedColumn.columnName(), computedColumn.eval((String) linkedHashMap.get(computedColumn.fieldReference())));
        }
        for (CdcMetadataConverter cdcMetadataConverter : this.metadataConverters) {
            linkedHashMap.put(cdcMetadataConverter.columnName(), cdcMetadataConverter.read(this.root.payload().source()));
        }
        return linkedHashMap;
    }

    protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> map) {
        return new RichCdcMultiplexRecord(this.databaseName, this.currentTable, Collections.emptyList(), Collections.emptyList(), new CdcRecord(rowKind, map));
    }

    public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
        flatMap((CdcSourceRecord) obj, (Collector<RichCdcMultiplexRecord>) collector);
    }
}
