/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;

import com.google.auto.service.AutoService;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.time.ZoneId;
import java.util.Map;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.SqlServerDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.SqlServerSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;

/**
 * Incremental source for SQL Server, registered as a {@link SeaTunnelSource} service and
 * exposed under the plugin name {@value #IDENTIFIER}.
 *
 * <p>The class supplies the SQL Server specific pieces that the {@link IncrementalSource} base
 * class asks for: the source-config factory, the Debezium deserialization schema, the data
 * source dialect and the offset factory.
 *
 * @param <T> type of the records produced by the deserialization schema
 */
@AutoService(SeaTunnelSource.class)
public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig>
        implements SupportParallelism {
    static final String IDENTIFIER = "SqlServer-CDC";

    /**
     * No-argument constructor.
     *
     * <p>NOTE(review): presumably required so the service registration generated by
     * {@code @AutoService} can instantiate the class reflectively - confirm.
     */
    public SqlServerIncrementalSource() {
    }

    /**
     * Creates the source from user options and an optional, pre-resolved row type.
     *
     * @param options connector options, forwarded to the base class
     * @param dataType row type forwarded to the base class; {@link
     *     #createDebeziumDeserializationSchema(ReadonlyConfig)} falls back to schema discovery
     *     when the inherited {@code dataType} field is {@code null}
     */
    public SqlServerIncrementalSource(
            ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
        super(options, dataType);
    }

    /**
     * Returns the plugin identifier of this connector.
     *
     * @return the constant {@link #IDENTIFIER}
     */
    @Override
    public String getPluginName() {
        return IDENTIFIER;
    }

    /**
     * Returns the option that selects the startup mode for this connector.
     *
     * @return {@link SqlServerSourceOptions#STARTUP_MODE}
     */
    @Override
    public Option<StartupMode> getStartupModeOption() {
        return SqlServerSourceOptions.STARTUP_MODE;
    }

    /**
     * Returns the option that selects the stop mode for this connector.
     *
     * @return {@link SqlServerSourceOptions#STOP_MODE}
     */
    @Override
    public Option<StopMode> getStopModeOption() {
        return SqlServerSourceOptions.STOP_MODE;
    }

    /**
     * Builds the SQL Server source-config factory.
     *
     * <p>The factory is populated from the inherited {@code readonlyConfig}, {@code startupConfig}
     * and {@code stopConfig} fields; the origin URL, host and port come from parsing the
     * {@link JdbcCatalogOptions#BASE_URL} value of {@code config}.
     *
     * @param config configuration the JDBC base URL is read from
     * @return the populated {@link SqlServerSourceConfigFactory}
     */
    @Override
    public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(ReadonlyConfig config) {
        // Named "factory" so it does not shadow the inherited configFactory field.
        SqlServerSourceConfigFactory factory = new SqlServerSourceConfigFactory();
        // NOTE(review): general options are read from the inherited readonlyConfig while the URL
        // is read from the config argument; presumably both are the same object - confirm.
        factory.fromReadonlyConfig(this.readonlyConfig);
        factory.startupOptions(this.startupConfig);
        factory.stopOptions(this.stopConfig);

        JdbcUrlUtil.UrlInfo urlInfo = SqlServerURLParser.parse(config.get(JdbcCatalogOptions.BASE_URL));
        factory.originUrl(urlInfo.getOrigin());
        factory.hostname(urlInfo.getHost());
        factory.port(urlInfo.getPort());
        return factory;
    }

    /**
     * Creates the Debezium deserialization schema selected by {@link JdbcSourceOptions#FORMAT}.
     *
     * <p>When the format equals {@link DeserializeFormat#COMPATIBLE_DEBEZIUM_JSON} a
     * {@link DebeziumJsonDeserializeSchema} built from {@link JdbcSourceOptions#DEBEZIUM_PROPERTIES}
     * is returned. Otherwise a {@link SeaTunnelRowDebeziumDeserializeSchema} is built whose
     * physical and result row type is the inherited {@code dataType}, or - when that is
     * {@code null} - the type converted from the schema of the first discovered table.
     *
     * @param config configuration providing the format, Debezium properties and server time zone
     * @return the deserialization schema, cast (unchecked) to the element type {@code T}
     * @throws SeaTunnelException wrapping any exception raised while opening the SQL Server
     *     connection or querying the table schema
     */
    @Override
    @SuppressWarnings("unchecked") // Both schema implementations are cast to the generic T view.
    public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig config) {
        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(config.get(JdbcSourceOptions.FORMAT))) {
            return (DebeziumDeserializationSchema<T>)
                    new DebeziumJsonDeserializeSchema(config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
        }

        SeaTunnelDataType<SeaTunnelRow> physicalRowType;
        if (this.dataType == null) {
            SqlServerSourceConfig sqlServerSourceConfig =
                    (SqlServerSourceConfig) this.configFactory.create(0);
            // NOTE(review): only the first discovered table is inspected, and get(0) will fail
            // if discovery returns nothing - confirm this is acceptable for callers.
            TableId tableId =
                    this.dataSourceDialect.discoverDataCollections(sqlServerSourceConfig).get(0);
            Table table;
            try (SqlServerConnection sqlServerConnection =
                    SqlServerConnectionUtils.createSqlServerConnection(
                            sqlServerSourceConfig.getDbzConfiguration())) {
                table =
                        ((SqlServerDialect) this.dataSourceDialect)
                                .queryTableSchema(sqlServerConnection, tableId)
                                .getTable();
            } catch (Exception e) {
                // Also covers failures raised while closing the connection.
                throw new SeaTunnelException(e);
            }
            physicalRowType = SqlServerTypeUtils.convertFromTable(table);
        } else {
            physicalRowType = this.dataType;
        }

        String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
        return (DebeziumDeserializationSchema<T>)
                SeaTunnelRowDebeziumDeserializeSchema.builder()
                        .setPhysicalRowType(physicalRowType)
                        .setResultTypeInfo(physicalRowType)
                        .setServerTimeZone(ZoneId.of(zoneId))
                        .build();
    }

    /**
     * Creates the SQL Server dialect from the inherited {@code configFactory} field.
     *
     * <p>NOTE(review): the {@code config} argument is not used; this assumes the base class has
     * already assigned {@code configFactory} - confirm.
     *
     * @param config unused
     * @return a new {@link SqlServerDialect}
     */
    @Override
    public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig config) {
        return new SqlServerDialect((SqlServerSourceConfigFactory) this.configFactory);
    }

    /**
     * Creates the LSN offset factory from the inherited {@code configFactory} and
     * {@code dataSourceDialect} fields.
     *
     * <p>NOTE(review): the {@code config} argument is not used; this assumes both inherited
     * fields have already been assigned by the base class - confirm.
     *
     * @param config unused
     * @return a new {@link LsnOffsetFactory}
     */
    @Override
    public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
        return new LsnOffsetFactory(
                (SqlServerSourceConfigFactory) this.configFactory,
                (SqlServerDialect) this.dataSourceDialect);
    }
}

