package org.apache.seatunnel.connectors.cdc.debezium.row;

import com.google.common.base.Preconditions;
import io.debezium.data.Envelope;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.class */
public final class SeaTunnelRowDebeziumDeserializeSchema implements DebeziumDeserializationSchema<SeaTunnelRow> {
    private static final long serialVersionUID = 1;
    private final MetadataConverter[] metadataConverters;
    private final ZoneId serverTimeZone;
    private final DebeziumDeserializationConverterFactory userDefinedConverterFactory;
    private final SchemaChangeResolver schemaChangeResolver;
    private final DataTypeChangeEventHandler dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
    private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
    private Map<String, SeaTunnelRowDebeziumDeserializationConverters> tableRowConverters;
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelRowDebeziumDeserializeSchema.class);
    private static final String DEFAULT_TABLE_NAME_KEY = null;

    /* loaded from: input_file:org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema$Builder.class */
    public static class Builder {
        private SeaTunnelDataType<SeaTunnelRow> physicalRowType;
        private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
        private MetadataConverter[] metadataConverters;
        private ZoneId serverTimeZone;
        private DebeziumDeserializationConverterFactory userDefinedConverterFactory;
        private SchemaChangeResolver schemaChangeResolver;

        public SeaTunnelRowDebeziumDeserializeSchema build() {
            return new SeaTunnelRowDebeziumDeserializeSchema(this.physicalRowType, this.metadataConverters, this.resultTypeInfo, this.serverTimeZone, this.userDefinedConverterFactory, this.schemaChangeResolver);
        }

        public Builder setPhysicalRowType(SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType) {
            this.physicalRowType = seaTunnelDataType;
            return this;
        }

        public Builder setResultTypeInfo(SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType) {
            this.resultTypeInfo = seaTunnelDataType;
            return this;
        }

        public Builder setMetadataConverters(MetadataConverter[] metadataConverterArr) {
            this.metadataConverters = metadataConverterArr;
            return this;
        }

        public Builder setServerTimeZone(ZoneId zoneId) {
            this.serverTimeZone = zoneId;
            return this;
        }

        public Builder setUserDefinedConverterFactory(DebeziumDeserializationConverterFactory debeziumDeserializationConverterFactory) {
            this.userDefinedConverterFactory = debeziumDeserializationConverterFactory;
            return this;
        }

        public Builder setSchemaChangeResolver(SchemaChangeResolver schemaChangeResolver) {
            this.schemaChangeResolver = schemaChangeResolver;
            return this;
        }

        private Builder() {
            this.metadataConverters = new MetadataConverter[0];
            this.serverTimeZone = ZoneId.systemDefault();
            this.userDefinedConverterFactory = DebeziumDeserializationConverterFactory.DEFAULT;
        }
    }

    SeaTunnelRowDebeziumDeserializeSchema(SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType, MetadataConverter[] metadataConverterArr, SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType2, ZoneId zoneId, DebeziumDeserializationConverterFactory debeziumDeserializationConverterFactory, SchemaChangeResolver schemaChangeResolver) {
        this.metadataConverters = metadataConverterArr;
        this.serverTimeZone = zoneId;
        this.userDefinedConverterFactory = debeziumDeserializationConverterFactory;
        this.resultTypeInfo = (SeaTunnelDataType) Preconditions.checkNotNull(seaTunnelDataType2);
        this.schemaChangeResolver = schemaChangeResolver;
        this.tableRowConverters = createTableRowConverters(seaTunnelDataType2, metadataConverterArr, zoneId, debeziumDeserializationConverterFactory);
    }

    @Override // org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema
    public void deserialize(SourceRecord sourceRecord, Collector<SeaTunnelRow> collector) throws Exception {
        if (WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(sourceRecord)) {
            collector.markSchemaChangeBeforeCheckpoint();
            return;
        }
        if (WatermarkEvent.isSchemaChangeAfterWatermarkEvent(sourceRecord)) {
            collector.markSchemaChangeAfterCheckpoint();
            return;
        }
        if (SourceRecordUtils.isSchemaChangeEvent(sourceRecord)) {
            deserializeSchemaChangeRecord(sourceRecord, collector);
        } else if (SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
            deserializeDataChangeRecord(sourceRecord, collector);
        } else {
            log.debug("Unsupported record {}, just skip.", sourceRecord);
        }
    }

    private void deserializeSchemaChangeRecord(SourceRecord sourceRecord, Collector<SeaTunnelRow> collector) {
        SchemaChangeEvent resolve = this.schemaChangeResolver.resolve(sourceRecord, this.resultTypeInfo);
        if (resolve == null) {
            log.info("Unsupported resolve schemaChangeEvent {}, just skip.", sourceRecord);
            return;
        }
        if (this.resultTypeInfo instanceof MultipleRowType) {
            HashMap hashMap = new HashMap();
            Iterator it = this.resultTypeInfo.iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                if (((String) entry.getKey()).equals(resolve.tablePath().toString())) {
                    log.debug("Table[{}] datatype change before: {}", entry.getKey(), entry.getValue());
                    SeaTunnelRowType apply = this.dataTypeChangeEventHandler.reset((SeaTunnelRowType) entry.getValue()).apply(resolve);
                    hashMap.put(entry.getKey(), apply);
                    log.debug("Table[{}] datatype change after: {}", entry.getKey(), apply);
                } else {
                    hashMap.put(entry.getKey(), entry.getValue());
                }
            }
            this.resultTypeInfo = new MultipleRowType(hashMap);
        } else {
            log.debug("Table datatype change before: {}", this.resultTypeInfo);
            this.resultTypeInfo = this.dataTypeChangeEventHandler.reset(this.resultTypeInfo).apply(resolve);
            log.debug("table datatype change after: {}", this.resultTypeInfo);
        }
        this.tableRowConverters = createTableRowConverters(this.resultTypeInfo, this.metadataConverters, this.serverTimeZone, this.userDefinedConverterFactory);
        collector.collect(resolve);
    }

    private void deserializeDataChangeRecord(SourceRecord sourceRecord, Collector<SeaTunnelRow> collector) throws Exception {
        SeaTunnelRowDebeziumDeserializationConverters seaTunnelRowDebeziumDeserializationConverters;
        Envelope.Operation operationFor = Envelope.operationFor(sourceRecord);
        Struct struct = (Struct) sourceRecord.value();
        Schema valueSchema = sourceRecord.valueSchema();
        String tablePath = SourceRecordUtils.getTablePath(sourceRecord).toString();
        if (this.resultTypeInfo instanceof MultipleRowType) {
            seaTunnelRowDebeziumDeserializationConverters = this.tableRowConverters.get(tablePath);
            if (seaTunnelRowDebeziumDeserializationConverters == null) {
                log.debug("Ignore newly added table {}", tablePath);
                return;
            }
        } else {
            seaTunnelRowDebeziumDeserializationConverters = this.tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
        }
        if (operationFor == Envelope.Operation.CREATE || operationFor == Envelope.Operation.READ) {
            SeaTunnelRow extractAfterRow = extractAfterRow(seaTunnelRowDebeziumDeserializationConverters, sourceRecord, struct, valueSchema);
            extractAfterRow.setRowKind(RowKind.INSERT);
            extractAfterRow.setTableId(tablePath);
            collector.collect(extractAfterRow);
            return;
        }
        if (operationFor == Envelope.Operation.DELETE) {
            SeaTunnelRow extractBeforeRow = extractBeforeRow(seaTunnelRowDebeziumDeserializationConverters, sourceRecord, struct, valueSchema);
            extractBeforeRow.setRowKind(RowKind.DELETE);
            extractBeforeRow.setTableId(tablePath);
            collector.collect(extractBeforeRow);
            return;
        }
        SeaTunnelRow extractBeforeRow2 = extractBeforeRow(seaTunnelRowDebeziumDeserializationConverters, sourceRecord, struct, valueSchema);
        extractBeforeRow2.setRowKind(RowKind.UPDATE_BEFORE);
        extractBeforeRow2.setTableId(tablePath);
        collector.collect(extractBeforeRow2);
        SeaTunnelRow extractAfterRow2 = extractAfterRow(seaTunnelRowDebeziumDeserializationConverters, sourceRecord, struct, valueSchema);
        extractAfterRow2.setRowKind(RowKind.UPDATE_AFTER);
        extractAfterRow2.setTableId(tablePath);
        collector.collect(extractAfterRow2);
    }

    private SeaTunnelRow extractAfterRow(SeaTunnelRowDebeziumDeserializationConverters seaTunnelRowDebeziumDeserializationConverters, SourceRecord sourceRecord, Struct struct, Schema schema) throws Exception {
        return seaTunnelRowDebeziumDeserializationConverters.convert(sourceRecord, struct.getStruct("after"), schema.field("after").schema());
    }

    private SeaTunnelRow extractBeforeRow(SeaTunnelRowDebeziumDeserializationConverters seaTunnelRowDebeziumDeserializationConverters, SourceRecord sourceRecord, Struct struct, Schema schema) throws Exception {
        return seaTunnelRowDebeziumDeserializationConverters.convert(sourceRecord, struct.getStruct("before"), schema.field("before").schema());
    }

    @Override // org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.resultTypeInfo;
    }

    @Override // org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema
    public SchemaChangeResolver getSchemaChangeResolver() {
        return this.schemaChangeResolver;
    }

    @Override // org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema
    public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType) {
        if (!seaTunnelDataType.getSqlType().equals(this.resultTypeInfo.getSqlType())) {
            throw new IllegalStateException(String.format("The produced type %s of the SeaTunnel deserialization schema doesn't match the type %s of the restored snapshot.", this.resultTypeInfo.getSqlType(), seaTunnelDataType.getSqlType()));
        }
        if (seaTunnelDataType instanceof MultipleRowType) {
            MultipleRowType multipleRowType = this.resultTypeInfo;
            HashMap hashMap = new HashMap();
            Iterator it = multipleRowType.iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                hashMap.put(entry.getKey(), entry.getValue());
            }
            Iterator it2 = ((MultipleRowType) seaTunnelDataType).iterator();
            while (it2.hasNext()) {
                Map.Entry entry2 = (Map.Entry) it2.next();
                SeaTunnelRowType rowType = multipleRowType.getRowType((String) entry2.getKey());
                if (rowType == null) {
                    log.info("Ignore restore table[{}] datatype has been deleted.", entry2.getKey());
                } else {
                    log.info("Table[{}] datatype restore before: {}", entry2.getKey(), rowType);
                    hashMap.put(entry2.getKey(), entry2.getValue());
                    log.info("Table[{}] datatype restore after: {}", entry2.getKey(), entry2.getValue());
                }
            }
            this.resultTypeInfo = new MultipleRowType(hashMap);
        } else {
            log.info("Table datatype restore before: {}", this.resultTypeInfo);
            this.resultTypeInfo = seaTunnelDataType;
            log.info("Table datatype restore after: {}", seaTunnelDataType);
        }
        this.tableRowConverters = createTableRowConverters(this.resultTypeInfo, this.metadataConverters, this.serverTimeZone, this.userDefinedConverterFactory);
    }

    private static Map<String, SeaTunnelRowDebeziumDeserializationConverters> createTableRowConverters(SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType, MetadataConverter[] metadataConverterArr, ZoneId zoneId, DebeziumDeserializationConverterFactory debeziumDeserializationConverterFactory) {
        HashMap hashMap = new HashMap();
        if (!(seaTunnelDataType instanceof MultipleRowType)) {
            hashMap.put(DEFAULT_TABLE_NAME_KEY, new SeaTunnelRowDebeziumDeserializationConverters((SeaTunnelRowType) seaTunnelDataType, metadataConverterArr, zoneId, debeziumDeserializationConverterFactory));
            return hashMap;
        }
        Iterator it = ((MultipleRowType) seaTunnelDataType).iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            hashMap.put(entry.getKey(), new SeaTunnelRowDebeziumDeserializationConverters((SeaTunnelRowType) entry.getValue(), metadataConverterArr, zoneId, debeziumDeserializationConverterFactory));
        }
        return hashMap;
    }

    public static Builder builder() {
        return new Builder();
    }
}
