package org.apache.seatunnel.connectors.seatunnel.amazonsqs.source;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsConfig;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.AmazonSqsSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.amazonsqs.exception.AmazonSqsConnectorException;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source connector for Amazon SQS, registered under the plugin name {@code "AmazonSqs"}.
 *
 * <p>{@link #prepare(Config)} validates the plugin configuration, captures the connection options
 * and selects the {@link DeserializationSchema} matching the configured message format. Reading is
 * delegated to the {@link AmazonSqsSourceReader} returned by
 * {@link #createReader(SingleSplitReaderContext)}.
 */
@AutoService({SeaTunnelSource.class})
public class AmazonSqsSource extends AbstractSingleSplitSource<SeaTunnelRow> implements SupportColumnProjection {
    private static final Logger log = LoggerFactory.getLogger(AmazonSqsSource.class);

    /** Connection/read options parsed from the plugin configuration in {@link #prepare(Config)}. */
    private AmazonSqsSourceOptions amazonSqsSourceOptions;

    /** Schema used by the reader to decode messages; chosen in {@link #setDeserialization(Config)}. */
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;

    /** Row type produced by this source; resolved in {@link #setDeserialization(Config)}. */
    private SeaTunnelRowType typeInfo;

    /**
     * Returns the identifier of this plugin.
     *
     * @return the constant {@code "AmazonSqs"}
     */
    public String getPluginName() {
        return "AmazonSqs";
    }

    /**
     * Validates the configuration and initialises the state of this source.
     *
     * <p>The existence check covers the URL, region and schema keys. When it does not succeed, an
     * {@link AmazonSqsConnectorException} carrying the plugin name, plugin type and check message
     * is thrown.
     *
     * @param config plugin configuration
     * @throws PrepareFailException declared in the signature; not thrown directly by this method
     */
    public void prepare(Config config) throws PrepareFailException {
        CheckResult requiredKeysCheck = CheckConfigUtil.checkAllExists(config, new String[]{AmazonSqsConfig.URL.key(), AmazonSqsConfig.REGION.key(), TableSchemaOptions.SCHEMA.key()});
        if (!requiredKeysCheck.isSuccess()) {
            throw new AmazonSqsConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, requiredKeysCheck.getMsg()));
        }
        this.amazonSqsSourceOptions = new AmazonSqsSourceOptions(config);
        // setDeserialization resolves both the row type and the deserialization schema, so the
        // row type is not built separately here (it would only be overwritten).
        setDeserialization(config);
    }

    /**
     * Reports the boundedness of this source.
     *
     * @return always {@link Boundedness#BOUNDED}
     */
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Returns the row type resolved during {@link #prepare(Config)}.
     *
     * @return the produced row type, or {@code null} if {@code prepare} has not run yet
     */
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.typeInfo;
    }

    /**
     * Creates the reader for this source, handing it the options, deserialization schema and row
     * type prepared earlier.
     *
     * @param singleSplitReaderContext reader context supplied by the engine
     * @return a new {@link AmazonSqsSourceReader}
     * @throws Exception declared by the overridden method
     */
    @Override
    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext singleSplitReaderContext) throws Exception {
        return new AmazonSqsSourceReader(singleSplitReaderContext, this.amazonSqsSourceOptions, this.deserializationSchema, this.typeInfo);
    }

    /**
     * Resolves {@link #typeInfo} and {@link #deserializationSchema} from the configuration.
     *
     * <p>Without a schema entry the simple text schema is used. Otherwise the row type is built
     * from the configuration and the schema implementation is chosen by the configured
     * {@link MessageFormat}.
     *
     * @param config plugin configuration
     * @throws SeaTunnelJsonFormatException if the configured format has no matching case
     */
    private void setDeserialization(Config config) {
        if (!config.hasPath(TableSchemaOptions.SCHEMA.key())) {
            // NOTE(review): prepare() lists the schema key in its existence check, so this
            // fallback looks unreachable from there — confirm whether schema is meant to be
            // optional.
            this.typeInfo = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(TextFormatConstant.PLACEHOLDER).build();
            return;
        }
        this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
        MessageFormat messageFormat = (MessageFormat) ReadonlyConfig.fromConfig(config).get(AmazonSqsConfig.FORMAT);
        switch (messageFormat) {
            case JSON:
                this.deserializationSchema = new JsonDeserializationSchema(false, false, this.typeInfo);
                return;
            case TEXT:
                // An explicitly configured delimiter wins over the connector default.
                String fieldDelimiter = AmazonSqsConfig.DEFAULT_FIELD_DELIMITER;
                if (config.hasPath(AmazonSqsConfig.FIELD_DELIMITER.key())) {
                    fieldDelimiter = config.getString(AmazonSqsConfig.FIELD_DELIMITER.key());
                }
                this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(fieldDelimiter).build();
                return;
            case CANAL_JSON:
                this.deserializationSchema = CanalJsonDeserializationSchema.builder(this.typeInfo).setIgnoreParseErrors(true).build();
                return;
            case DEBEZIUM_JSON:
                // Start from the option's default and override it only when the key is configured.
                boolean includeSchema = ((Boolean) AmazonSqsConfig.DEBEZIUM_RECORD_INCLUDE_SCHEMA.defaultValue()).booleanValue();
                if (config.hasPath(AmazonSqsConfig.DEBEZIUM_RECORD_INCLUDE_SCHEMA.key())) {
                    includeSchema = config.getBoolean(AmazonSqsConfig.DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
                }
                this.deserializationSchema = new DebeziumJsonDeserializationSchema(this.typeInfo, true, includeSchema);
                return;
            default:
                throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode) CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + messageFormat);
        }
    }
}
