package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb;

import com.google.auto.service.AutoService;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffsetFactory;

@AutoService({SeaTunnelSource.class})
/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.class */
public class MongodbIncrementalSource<T> extends IncrementalSource<T, MongodbSourceConfig> implements SupportParallelism {
    static final String IDENTIFIER = "MongoDB-CDC";

    public MongodbIncrementalSource(ReadonlyConfig readonlyConfig, SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType) {
        super(readonlyConfig, seaTunnelDataType);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public Option<StartupMode> getStartupModeOption() {
        return MongodbSourceOptions.STARTUP_MODE;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public Option<StopMode> getStopModeOption() {
        return MongodbSourceOptions.STOP_MODE;
    }

    public String getPluginName() {
        return IDENTIFIER;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public SourceConfig.Factory<MongodbSourceConfig> createSourceConfigFactory(@Nonnull ReadonlyConfig readonlyConfig) {
        MongodbSourceConfigProvider.Builder validate = MongodbSourceConfigProvider.newBuilder().hosts((String) readonlyConfig.get(MongodbSourceOptions.HOSTS)).validate();
        Optional ofNullable = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.DATABASE));
        validate.getClass();
        ofNullable.ifPresent(validate::databaseList);
        Optional ofNullable2 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.COLLECTION));
        validate.getClass();
        ofNullable2.ifPresent(validate::collectionList);
        Optional ofNullable3 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.USERNAME));
        validate.getClass();
        ofNullable3.ifPresent(validate::username);
        Optional ofNullable4 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.PASSWORD));
        validate.getClass();
        ofNullable4.ifPresent(validate::password);
        Optional ofNullable5 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.CONNECTION_OPTIONS));
        validate.getClass();
        ofNullable5.ifPresent(validate::connectionOptions);
        Optional ofNullable6 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.BATCH_SIZE));
        validate.getClass();
        ofNullable6.ifPresent((v1) -> {
            r1.batchSize(v1);
        });
        Optional ofNullable7 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.POLL_MAX_BATCH_SIZE));
        validate.getClass();
        ofNullable7.ifPresent((v1) -> {
            r1.pollMaxBatchSize(v1);
        });
        Optional ofNullable8 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS));
        validate.getClass();
        ofNullable8.ifPresent((v1) -> {
            r1.pollAwaitTimeMillis(v1);
        });
        Optional ofNullable9 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS));
        validate.getClass();
        ofNullable9.ifPresent((v1) -> {
            r1.heartbeatIntervalMillis(v1);
        });
        Optional ofNullable10 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS));
        validate.getClass();
        ofNullable10.ifPresent((v1) -> {
            r1.splitMetaGroupSize(v1);
        });
        Optional ofNullable11 = Optional.ofNullable(readonlyConfig.get(MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB));
        validate.getClass();
        ofNullable11.ifPresent((v1) -> {
            r1.splitSizeMB(v1);
        });
        Optional ofNullable12 = Optional.ofNullable(this.startupConfig);
        validate.getClass();
        ofNullable12.ifPresent(validate::startupOptions);
        Optional ofNullable13 = Optional.ofNullable(this.stopConfig);
        validate.getClass();
        ofNullable13.ifPresent(validate::stopOptions);
        return validate;
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig readonlyConfig) {
        if (this.dataType == null) {
            return new DebeziumJsonDeserializeSchema((Map) readonlyConfig.get(MongodbSourceOptions.DEBEZIUM_PROPERTIES));
        }
        SeaTunnelDataType<SeaTunnelRow> seaTunnelDataType = this.dataType;
        return new MongoDBConnectorDeserializationSchema(seaTunnelDataType, seaTunnelDataType);
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public DataSourceDialect<MongodbSourceConfig> createDataSourceDialect(ReadonlyConfig readonlyConfig) {
        return new MongodbDialect();
    }

    @Override // org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource
    public OffsetFactory createOffsetFactory(ReadonlyConfig readonlyConfig) {
        return new ChangeStreamOffsetFactory();
    }

    public MongodbIncrementalSource() {
    }
}
