package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;

import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.class */
public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig> implements SupportParallelism {
    static final String IDENTIFIER = "MySQL-CDC";

    public MySqlIncrementalSource(ReadonlyConfig readonlyConfig, SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType, List<CatalogTable> list) {
        super(readonlyConfig, seaTunnelDataType, list);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public Option<StartupMode> getStartupModeOption() {
        return MySqlSourceOptions.STARTUP_MODE;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public Option<StopMode> getStopModeOption() {
        return MySqlSourceOptions.STOP_MODE;
    }

    public String getPluginName() {
        return IDENTIFIER;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(ReadonlyConfig readonlyConfig) {
        MySqlSourceConfigFactory mySqlSourceConfigFactory = new MySqlSourceConfigFactory();
        mySqlSourceConfigFactory.serverId((String) readonlyConfig.get(JdbcSourceOptions.SERVER_ID));
        mySqlSourceConfigFactory.fromReadonlyConfig(this.readonlyConfig);
        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo((String) readonlyConfig.get(JdbcCatalogOptions.BASE_URL));
        mySqlSourceConfigFactory.originUrl(urlInfo.getOrigin());
        mySqlSourceConfigFactory.hostname(urlInfo.getHost());
        mySqlSourceConfigFactory.port(urlInfo.getPort().intValue());
        mySqlSourceConfigFactory.startupOptions(this.startupConfig);
        mySqlSourceConfigFactory.stopOptions(this.stopConfig);
        return mySqlSourceConfigFactory;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig readonlyConfig) {
        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(readonlyConfig.get(JdbcSourceOptions.FORMAT))) {
            return new DebeziumJsonDeserializeSchema((Map) readonlyConfig.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
        }
        SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType = this.dataType;
        return SeaTunnelRowDebeziumDeserializeSchema.builder().setPhysicalRowType(seaTunnelDataType).setResultTypeInfo(seaTunnelDataType).setServerTimeZone(ZoneId.of((String) readonlyConfig.get(JdbcSourceOptions.SERVER_TIME_ZONE))).setSchemaChangeResolver(new MySqlSchemaChangeResolver(createSourceConfigFactory(readonlyConfig))).build();
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig readonlyConfig) {
        return new MySqlDialect((MySqlSourceConfigFactory) this.configFactory, this.catalogTables);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public OffsetFactory createOffsetFactory(ReadonlyConfig readonlyConfig) {
        return new BinlogOffsetFactory((MySqlSourceConfigFactory) this.configFactory, (MySqlDialect) this.dataSourceDialect);
    }
}
