package org.apache.seatunnel.connectors.cdc.debezium.row;

import io.debezium.data.Envelope;
import io.debezium.relational.TableId;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.event.EventType;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.AbstractDebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.class */
public final class SeaTunnelRowDebeziumDeserializeSchema extends AbstractDebeziumDeserializationSchema<SeaTunnelRow> {
    private static final long serialVersionUID = 1;
    // Conversion settings captured at construction; they are passed unchanged to
    // createTableRowConverters every time the per-table converters are rebuilt.
    private final MetadataConverter[] metadataConverters;
    private final ZoneId serverTimeZone;
    private final DebeziumDeserializationConverterFactory userDefinedConverterFactory;
    // May be null: schema change records are then skipped with a warning and
    // restoreCheckpointProducedType returns without doing anything.
    private final SchemaChangeResolver schemaChangeResolver;
    // Applies a resolved schema change event to a table schema (reset(...).apply(...)).
    private final TableSchemaChangeEventHandler tableSchemaChangeHandler;
    // Current table definitions. Not final: entries are replaced on schema change and the
    // whole list is swapped on checkpoint restore.
    private List<CatalogTable> tables;
    // Row converters keyed by table path string, or by DEFAULT_TABLE_NAME_KEY when the
    // table list holds at most one table. Rebuilt whenever `tables` changes.
    private Map<String, SeaTunnelRowDebeziumDeserializationConverters> tableRowConverters;
    private static final Logger log = LoggerFactory.getLogger(SeaTunnelRowDebeziumDeserializeSchema.class);
    // The (null) map key under which the only converter is stored in single-table mode.
    private static final String DEFAULT_TABLE_NAME_KEY = null;

    /* loaded from: input_file:org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema$Builder.class */
    /**
     * Fluent builder for {@link SeaTunnelRowDebeziumDeserializeSchema}.
     *
     * <p>Defaults: no metadata converters, the JVM default time zone,
     * {@link DebeziumDeserializationConverterFactory#DEFAULT} and an empty table-change map.
     * The table list and the schema change resolver have no default.
     */
    public static class Builder {
        private List<CatalogTable> tables;
        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
        private ZoneId serverTimeZone = ZoneId.systemDefault();
        private DebeziumDeserializationConverterFactory userDefinedConverterFactory =
                DebeziumDeserializationConverterFactory.DEFAULT;
        private Map<TableId, Struct> tableIdTableChangeMap = new HashMap<>();
        private SchemaChangeResolver schemaChangeResolver;

        /** Instances are created by the enclosing class only. */
        private Builder() {
        }

        /** Sets the tables whose records will be deserialized. */
        public Builder setTables(List<CatalogTable> list) {
            this.tables = list;
            return this;
        }

        /** Sets the metadata converters handed to every row converter. */
        public Builder setMetadataConverters(MetadataConverter[] metadataConverterArr) {
            this.metadataConverters = metadataConverterArr;
            return this;
        }

        /** Sets the time zone handed to every row converter. */
        public Builder setServerTimeZone(ZoneId zoneId) {
            this.serverTimeZone = zoneId;
            return this;
        }

        /** Sets the user-defined converter factory handed to every row converter. */
        public Builder setUserDefinedConverterFactory(DebeziumDeserializationConverterFactory debeziumDeserializationConverterFactory) {
            this.userDefinedConverterFactory = debeziumDeserializationConverterFactory;
            return this;
        }

        /** Sets the table-change map that is passed to the superclass constructor. */
        public Builder setTableIdTableChangeMap(Map<TableId, Struct> map) {
            this.tableIdTableChangeMap = map;
            return this;
        }

        /** Sets the resolver used to turn schema change records into events; may be null. */
        public Builder setSchemaChangeResolver(SchemaChangeResolver schemaChangeResolver) {
            this.schemaChangeResolver = schemaChangeResolver;
            return this;
        }

        /**
         * Creates the deserialization schema from the values collected so far.
         *
         * @return a new schema instance
         */
        public SeaTunnelRowDebeziumDeserializeSchema build() {
            return new SeaTunnelRowDebeziumDeserializeSchema(
                    metadataConverters,
                    tables,
                    serverTimeZone,
                    userDefinedConverterFactory,
                    schemaChangeResolver,
                    tableIdTableChangeMap);
        }
    }

    /**
     * Creates the schema and builds the initial per-table row converters.
     *
     * @param metadataConverterArr metadata converters handed to every row converter
     * @param list tables to deserialize; must not be null
     * @param zoneId time zone handed to every row converter
     * @param debeziumDeserializationConverterFactory user-defined converter factory
     * @param schemaChangeResolver resolver for schema change records; may be null
     * @param map table-change map passed to the superclass
     */
    SeaTunnelRowDebeziumDeserializeSchema(
            MetadataConverter[] metadataConverterArr,
            List<CatalogTable> list,
            ZoneId zoneId,
            DebeziumDeserializationConverterFactory debeziumDeserializationConverterFactory,
            SchemaChangeResolver schemaChangeResolver,
            Map<TableId, Struct> map) {
        super(map);
        this.tables = Preconditions.checkNotNull(list);
        this.metadataConverters = metadataConverterArr;
        this.serverTimeZone = zoneId;
        this.userDefinedConverterFactory = debeziumDeserializationConverterFactory;
        this.schemaChangeResolver = schemaChangeResolver;
        this.tableSchemaChangeHandler = new TableSchemaChangeEventDispatcher();
        this.tableRowConverters =
                createTableRowConverters(
                        this.tables,
                        this.metadataConverters,
                        this.serverTimeZone,
                        this.userDefinedConverterFactory);
    }

    /**
     * Routes a source record to the matching handler after delegating to the superclass.
     *
     * <p>Schema-change watermark events only mark the collector; schema change events and
     * data change records are deserialized; anything else is logged at debug level and
     * dropped.
     *
     * @param sourceRecord the record to deserialize
     * @param collector receives the produced rows and events
     * @throws Exception if the superclass or a handler fails
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<SeaTunnelRow> collector) throws Exception {
        super.deserialize(sourceRecord, collector);

        if (WatermarkEvent.isSchemaChangeBeforeWatermarkEvent(sourceRecord)) {
            collector.markSchemaChangeBeforeCheckpoint();
        } else if (WatermarkEvent.isSchemaChangeAfterWatermarkEvent(sourceRecord)) {
            collector.markSchemaChangeAfterCheckpoint();
        } else if (SourceRecordUtils.isSchemaChangeEvent(sourceRecord)) {
            deserializeSchemaChangeRecord(sourceRecord, collector);
        } else if (SourceRecordUtils.isDataChangeRecord(sourceRecord)) {
            deserializeDataChangeRecord(sourceRecord, collector);
        } else {
            log.debug("Unsupported record {}, just skip.", sourceRecord);
        }
    }

    /**
     * Resolves a schema change record, applies it to the matching table definition, rebuilds
     * the row converters and forwards the resolved event to the collector.
     *
     * <p>The record is skipped with a warning when no resolver is configured or the resolver
     * returns null. Any exception raised while handling the record is logged and swallowed,
     * so a bad schema change record does not fail the caller.
     *
     * @param sourceRecord the schema change record
     * @param collector receives the resolved schema change event
     */
    private void deserializeSchemaChangeRecord(SourceRecord sourceRecord, Collector<SeaTunnelRow> collector) {
        try {
            SchemaChangeEvent schemaChangeEvent =
                    this.schemaChangeResolver != null
                            ? this.schemaChangeResolver.resolve(sourceRecord, null)
                            : null;
            if (schemaChangeEvent == null) {
                log.warn("Unsupported resolve schemaChangeEvent {}, just skip.", sourceRecord);
                return;
            }

            boolean tableExist = false;
            for (int i = 0; i < this.tables.size(); i++) {
                CatalogTable changeBefore = this.tables.get(i);
                if (!schemaChangeEvent.tablePath().equals(changeBefore.getTablePath())) {
                    continue;
                }
                tableExist = true;
                log.debug("Table[{}] change before: {}", schemaChangeEvent.tablePath(), changeBefore.getTableSchema());
                CatalogTable changeAfter = applySchemaChange(changeBefore, schemaChangeEvent);
                this.tables.set(i, changeAfter);
                schemaChangeEvent.setChangeAfter(changeAfter);
                log.debug("Table[{}] change after: {}", schemaChangeEvent.tablePath(), changeAfter.getTableSchema());
                // Stop at the first match: without this the same index is matched again and
                // the change is re-applied on every pass instead of the loop moving on.
                break;
            }
            if (!tableExist) {
                // NOTE(review): the message says "skip", yet the event is still forwarded
                // below; that pre-existing behavior is kept as is - confirm it is intended.
                log.error(
                        "Not found table {}, skip schema change event {}",
                        schemaChangeEvent.tablePath(),
                        schemaChangeEvent);
            }
            this.tableRowConverters =
                    createTableRowConverters(
                            this.tables,
                            this.metadataConverters,
                            this.serverTimeZone,
                            this.userDefinedConverterFactory);
            collector.collect(schemaChangeEvent);
        } catch (Exception e) {
            log.warn("Failed to resolve schemaChangeEvent, just skip.", e);
        }
    }

    /**
     * Computes the table definition that results from applying a schema change event.
     *
     * <p>For a {@code SCHEMA_CHANGE_UPDATE_COLUMNS} event each contained column event is
     * applied in order to the result of the previous one, and each column event is given its
     * own resulting table. When such an event carries no column events the table is returned
     * unchanged rather than null, so the table list is never filled with a null entry.
     *
     * @param changeBefore the table definition before the change
     * @param schemaChangeEvent the resolved schema change event
     * @return the table definition after the change; never null
     */
    private CatalogTable applySchemaChange(CatalogTable changeBefore, SchemaChangeEvent schemaChangeEvent) {
        if (!EventType.SCHEMA_CHANGE_UPDATE_COLUMNS.equals(schemaChangeEvent.getEventType())) {
            return CatalogTable.of(
                    changeBefore.getTableId(),
                    this.tableSchemaChangeHandler.reset(changeBefore.getTableSchema()).apply(schemaChangeEvent),
                    changeBefore.getOptions(),
                    changeBefore.getPartitionKeys(),
                    changeBefore.getComment());
        }
        CatalogTable current = changeBefore;
        for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent) schemaChangeEvent).getEvents()) {
            current =
                    CatalogTable.of(
                            current.getTableId(),
                            this.tableSchemaChangeHandler.reset(current.getTableSchema()).apply(columnEvent),
                            current.getOptions(),
                            current.getPartitionKeys(),
                            current.getComment());
            columnEvent.setChangeAfter(current);
        }
        return current;
    }

    /**
     * Converts a data change record into one or two rows and hands them to the collector.
     *
     * <p>CREATE and READ produce an INSERT row from the "after" image, DELETE produces a
     * DELETE row from the "before" image, and UPDATE produces an UPDATE_BEFORE row followed
     * by an UPDATE_AFTER row. Other operations are logged and dropped. When more than one
     * table is configured, records of a table without a converter are ignored.
     *
     * @param sourceRecord the data change record
     * @param collector receives the produced rows
     * @throws Exception if row conversion fails
     */
    private void deserializeDataChangeRecord(SourceRecord sourceRecord, Collector<SeaTunnelRow> collector) throws Exception {
        final Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        final Struct payload = (Struct) sourceRecord.value();
        final Schema payloadSchema = sourceRecord.valueSchema();
        final String tableId = SourceRecordUtils.getTablePath(sourceRecord).toString();

        // Single-table mode keeps its only converter under the default key.
        final boolean multiTable = this.tables.size() > 1;
        final SeaTunnelRowDebeziumDeserializationConverters converters =
                this.tableRowConverters.get(multiTable ? tableId : DEFAULT_TABLE_NAME_KEY);
        if (multiTable && converters == null) {
            log.debug("Ignore newly added table {}", tableId);
            return;
        }

        final Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(sourceRecord);
        final Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(sourceRecord);
        // -1 marks an unknown delay when either timestamp is missing.
        final long delay =
                (fetchTimestamp == null || messageTimestamp == null)
                        ? -1L
                        : fetchTimestamp.longValue() - messageTimestamp.longValue();

        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
            SeaTunnelRow inserted = extractAfterRow(converters, sourceRecord, payload, payloadSchema);
            stampAndCollect(collector, inserted, RowKind.INSERT, tableId, delay, fetchTimestamp);
        } else if (operation == Envelope.Operation.DELETE) {
            SeaTunnelRow deleted = extractBeforeRow(converters, sourceRecord, payload, payloadSchema);
            stampAndCollect(collector, deleted, RowKind.DELETE, tableId, delay, fetchTimestamp);
        } else if (operation == Envelope.Operation.UPDATE) {
            SeaTunnelRow before = extractBeforeRow(converters, sourceRecord, payload, payloadSchema);
            stampAndCollect(collector, before, RowKind.UPDATE_BEFORE, tableId, delay, fetchTimestamp);
            SeaTunnelRow after = extractAfterRow(converters, sourceRecord, payload, payloadSchema);
            stampAndCollect(collector, after, RowKind.UPDATE_AFTER, tableId, delay, fetchTimestamp);
        } else {
            log.warn("Received {} operation, skip", operation);
        }
    }

    /** Sets row kind, table id, delay and event time on the row, then collects it. */
    private static void stampAndCollect(
            Collector<SeaTunnelRow> collector,
            SeaTunnelRow row,
            RowKind rowKind,
            String tableId,
            long delay,
            Long eventTime) {
        row.setRowKind(rowKind);
        row.setTableId(tableId);
        MetadataUtil.setDelay(row, Long.valueOf(delay));
        MetadataUtil.setEventTime(row, eventTime);
        collector.collect(row);
    }

    /**
     * Converts the "after" image of a change record into a row.
     *
     * @param converters the row converters of the record's table
     * @param sourceRecord the change record
     * @param struct the record value
     * @param schema the record value schema
     * @return the converted row
     * @throws Exception if conversion fails
     */
    private SeaTunnelRow extractAfterRow(SeaTunnelRowDebeziumDeserializationConverters converters, SourceRecord sourceRecord, Struct struct, Schema schema) throws Exception {
        Struct afterValue = struct.getStruct(Envelope.FieldName.AFTER);
        Schema afterSchema = schema.field(Envelope.FieldName.AFTER).schema();
        return converters.convert(sourceRecord, afterValue, afterSchema);
    }

    /**
     * Converts the "before" image of a change record into a row.
     *
     * @param converters the row converters of the record's table
     * @param sourceRecord the change record
     * @param struct the record value
     * @param schema the record value schema
     * @return the converted row
     * @throws Exception if conversion fails
     */
    private SeaTunnelRow extractBeforeRow(SeaTunnelRowDebeziumDeserializationConverters converters, SourceRecord sourceRecord, Struct struct, Schema schema) throws Exception {
        Struct beforeValue = struct.getStruct(Envelope.FieldName.BEFORE);
        Schema beforeSchema = schema.field(Envelope.FieldName.BEFORE).schema();
        return converters.convert(sourceRecord, beforeValue, beforeSchema);
    }

    /**
     * Returns the current table definitions.
     *
     * @return the internal table list itself, not a copy
     */
    @Override
    public List<CatalogTable> getProducedType() {
        return tables;
    }

    /**
     * Returns the resolver used for schema change records.
     *
     * @return the configured resolver, or null when none was set
     */
    @Override
    public SchemaChangeResolver getSchemaChangeResolver() {
        return schemaChangeResolver;
    }

    /**
     * Replaces current table definitions with the ones restored from a checkpoint and
     * rebuilds the row converters.
     *
     * <p>Does nothing when no schema change resolver is configured. Restored tables that are
     * not part of the current table list are ignored. The current order of the table list is
     * kept; only the definitions of matching tables are swapped.
     *
     * @param list table definitions restored from the checkpoint
     */
    @Override
    public void restoreCheckpointProducedType(List<CatalogTable> list) {
        if (this.schemaChangeResolver == null) {
            return;
        }
        Map<TablePath, CatalogTable> latestByPath =
                this.tables.stream()
                        .collect(Collectors.toMap(CatalogTable::getTablePath, table -> table));
        Map<TablePath, CatalogTable> restoredByPath =
                list.stream()
                        .collect(Collectors.toMap(CatalogTable::getTablePath, table -> table));

        for (Map.Entry<TablePath, CatalogTable> entry : restoredByPath.entrySet()) {
            TablePath tablePath = entry.getKey();
            CatalogTable restored = entry.getValue();
            CatalogTable latest = latestByPath.get(tablePath);
            if (latest == null) {
                log.info("Ignore restore table[{}] has been deleted.", tablePath);
                continue;
            }
            log.info("Table[{}] restore before: {}", tablePath, latest.getSeaTunnelRowType());
            latestByPath.put(tablePath, restored);
            log.info("Table[{}] restore after: {}", tablePath, restored.getSeaTunnelRowType());
        }

        // Rebuild the list by walking the existing one so the table order stays stable;
        // taking the map's values() would yield hash order.
        List<CatalogTable> merged = new ArrayList<>(this.tables.size());
        for (CatalogTable table : this.tables) {
            merged.add(latestByPath.get(table.getTablePath()));
        }
        this.tables = merged;
        this.tableRowConverters =
                createTableRowConverters(
                        this.tables,
                        this.metadataConverters,
                        this.serverTimeZone,
                        this.userDefinedConverterFactory);
    }

    /**
     * Builds the row converters for the given tables.
     *
     * <p>With more than one table the converters are keyed by table path string. Otherwise
     * the converter of the first table is stored under {@code DEFAULT_TABLE_NAME_KEY}.
     *
     * @param list the table definitions
     * @param metadataConverterArr metadata converters handed to every row converter
     * @param zoneId time zone handed to every row converter
     * @param debeziumDeserializationConverterFactory user-defined converter factory
     * @return a new mutable map of row converters
     */
    private static Map<String, SeaTunnelRowDebeziumDeserializationConverters> createTableRowConverters(List<CatalogTable> list, MetadataConverter[] metadataConverterArr, ZoneId zoneId, DebeziumDeserializationConverterFactory debeziumDeserializationConverterFactory) {
        Map<String, SeaTunnelRowDebeziumDeserializationConverters> converters = new HashMap<>();
        if (list.size() > 1) {
            for (CatalogTable table : list) {
                String key = table.getTablePath().toString();
                converters.put(
                        key,
                        new SeaTunnelRowDebeziumDeserializationConverters(
                                table.getSeaTunnelRowType(),
                                metadataConverterArr,
                                zoneId,
                                debeziumDeserializationConverterFactory));
            }
        } else {
            CatalogTable onlyTable = list.get(0);
            converters.put(
                    DEFAULT_TABLE_NAME_KEY,
                    new SeaTunnelRowDebeziumDeserializationConverters(
                            onlyTable.getSeaTunnelRowType(),
                            metadataConverterArr,
                            zoneId,
                            debeziumDeserializationConverterFactory));
        }
        return converters;
    }

    /**
     * Creates a new {@link Builder} preloaded with its default settings.
     *
     * @return a fresh builder instance
     */
    public static Builder builder() {
        return new Builder();
    }
}
