package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

@AutoService({SeaTunnelSource.class})
/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.class */
public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>, SupportParallelism {
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private SeaTunnelRowType typeInfo;
    private JobContext jobContext;
    private final ConsumerMetadata metadata = new ConsumerMetadata();
    private long discoveryIntervalMillis = ((Long) Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue()).longValue();
    private MessageFormatErrorHandleWay messageFormatErrorHandleWay = MessageFormatErrorHandleWay.FAIL;

    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals(this.jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    public String getPluginName() {
        return Config.CONNECTOR_IDENTITY;
    }

    public void prepare(org.apache.seatunnel.shade.com.typesafe.config.Config config) throws PrepareFailException {
        CheckResult checkAllExists = CheckConfigUtil.checkAllExists(config, new String[]{Config.TOPIC.key(), Config.BOOTSTRAP_SERVERS.key()});
        if (!checkAllExists.isSuccess()) {
            throw new KafkaConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, checkAllExists.getMsg()));
        }
        this.metadata.setTopic(config.getString(Config.TOPIC.key()));
        if (config.hasPath(Config.PATTERN.key())) {
            this.metadata.setPattern(config.getBoolean(Config.PATTERN.key()));
        } else {
            this.metadata.setPattern(((Boolean) Config.PATTERN.defaultValue()).booleanValue());
        }
        this.metadata.setBootstrapServers(config.getString(Config.BOOTSTRAP_SERVERS.key()));
        this.metadata.setProperties(new Properties());
        if (config.hasPath(Config.CONSUMER_GROUP.key())) {
            this.metadata.setConsumerGroup(config.getString(Config.CONSUMER_GROUP.key()));
        } else {
            this.metadata.setConsumerGroup((String) Config.CONSUMER_GROUP.defaultValue());
        }
        if (config.hasPath(Config.COMMIT_ON_CHECKPOINT.key())) {
            this.metadata.setCommitOnCheckpoint(config.getBoolean(Config.COMMIT_ON_CHECKPOINT.key()));
        } else {
            this.metadata.setCommitOnCheckpoint(((Boolean) Config.COMMIT_ON_CHECKPOINT.defaultValue()).booleanValue());
        }
        if (config.hasPath(Config.START_MODE.key())) {
            StartMode valueOf = StartMode.valueOf(config.getString(Config.START_MODE.key()).toUpperCase());
            this.metadata.setStartMode(valueOf);
            switch (valueOf) {
                case TIMESTAMP:
                    long j = config.getLong(Config.START_MODE_TIMESTAMP.key());
                    long currentTimeMillis = System.currentTimeMillis();
                    if (j >= 0 && j <= currentTimeMillis) {
                        this.metadata.setStartOffsetsTimestamp(Long.valueOf(j));
                        break;
                    } else {
                        throw new IllegalArgumentException("start_mode.timestamp The value is smaller than 0 or smaller than the current time");
                    }
                    break;
                case SPECIFIC_OFFSETS:
                    String render = config.getConfig(Config.START_MODE_OFFSETS.key()).root().render(ConfigRenderOptions.concise());
                    if (render != null) {
                        HashMap hashMap = new HashMap();
                        ObjectNode parseObject = JsonUtils.parseObject(render);
                        parseObject.fieldNames().forEachRemaining(str -> {
                            int lastIndexOf = str.lastIndexOf("-");
                            hashMap.put(new TopicPartition(str.substring(0, lastIndexOf), Integer.valueOf(str.substring(lastIndexOf + 1)).intValue()), Long.valueOf(parseObject.get(str).asLong()));
                        });
                        this.metadata.setSpecificStartOffsets(hashMap);
                        break;
                    } else {
                        throw new IllegalArgumentException("start mode is " + StartMode.SPECIFIC_OFFSETS + "but no specific offsets were specified.");
                    }
            }
        }
        if (config.hasPath(Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
            this.discoveryIntervalMillis = config.getLong(Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
        }
        if (CheckConfigUtil.isValidParam(config, Config.KAFKA_CONFIG.key())) {
            config.getObject(Config.KAFKA_CONFIG.key()).forEach((str2, configValue) -> {
                this.metadata.getProperties().put(str2, configValue.unwrapped());
            });
        }
        if (config.hasPath(Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION.key())) {
            MessageFormatErrorHandleWay messageFormatErrorHandleWay = (MessageFormatErrorHandleWay) ReadonlyConfig.fromConfig(config).get(Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
            switch (messageFormatErrorHandleWay) {
                case FAIL:
                case SKIP:
                    this.messageFormatErrorHandleWay = messageFormatErrorHandleWay;
                    break;
            }
        }
        setDeserialization(config);
    }

    /* renamed from: getProducedType, reason: merged with bridge method [inline-methods] */
    public SeaTunnelRowType m663getProducedType() {
        return this.typeInfo;
    }

    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context context) throws Exception {
        return new KafkaSourceReader(this.metadata, this.deserializationSchema, context, this.messageFormatErrorHandleWay);
    }

    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> context) throws Exception {
        return new KafkaSourceSplitEnumerator(this.metadata, context, this.discoveryIntervalMillis);
    }

    public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> context, KafkaSourceState kafkaSourceState) throws Exception {
        return new KafkaSourceSplitEnumerator(this.metadata, context, kafkaSourceState, this.discoveryIntervalMillis);
    }

    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    private void setDeserialization(org.apache.seatunnel.shade.com.typesafe.config.Config config) {
        if (!config.hasPath(Config.SCHEMA.key())) {
            this.typeInfo = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(TextFormatConstant.PLACEHOLDER).build();
            return;
        }
        config.getConfig(Config.SCHEMA.key());
        this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
        MessageFormat messageFormat = (MessageFormat) ReadonlyConfig.fromConfig(config).get(Config.FORMAT);
        switch (messageFormat) {
            case JSON:
                this.deserializationSchema = new JsonDeserializationSchema(false, false, this.typeInfo);
                return;
            case TEXT:
                String str = Config.DEFAULT_FIELD_DELIMITER;
                if (config.hasPath(Config.FIELD_DELIMITER.key())) {
                    str = config.getString(Config.FIELD_DELIMITER.key());
                }
                this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(str).build();
                return;
            case CANAL_JSON:
                this.deserializationSchema = CanalJsonDeserializationSchema.builder(this.typeInfo).setIgnoreParseErrors(true).build();
                return;
            default:
                throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode) CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + messageFormat);
        }
    }

    public /* bridge */ /* synthetic */ SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context context, Serializable serializable) throws Exception {
        return restoreEnumerator((SourceSplitEnumerator.Context<KafkaSourceSplit>) context, (KafkaSourceState) serializable);
    }
}
