package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import org.apache.commons.cli.HelpFormatter;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

/**
 * SeaTunnel source connector that reads rows from RocketMQ topics.
 *
 * <p>{@link #prepare(Config)} validates the plugin configuration and fills the shared
 * {@link ConsumerMetadata}, the produced row type and the deserialization schema. The reader and
 * the split enumerator are afterwards created from those values.
 */
@AutoService({SeaTunnelSource.class})
public class RocketMqSource implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, RocketMqSourceState>, SupportParallelism {
    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
    /** Separator between the topic name and the queue id in the keys of the specific start offsets option. */
    private static final String TOPIC_QUEUE_SEPARATOR = "-";
    private JobContext jobContext;
    private SeaTunnelRowType typeInfo;
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private final ConsumerMetadata metadata = new ConsumerMetadata();
    private long discoveryIntervalMillis = ((Long) ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue()).longValue();

    /** Returns the plugin identifier of this connector. */
    public String getPluginName() {
        return "Rocketmq";
    }

    /** Returns {@code BOUNDED} when the job runs in batch mode, {@code UNBOUNDED} otherwise. */
    public Boundedness getBoundedness() {
        return JobMode.BATCH.equals(this.jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED;
    }

    /**
     * Validates the plugin configuration and initializes the consumer metadata, the partition
     * discovery interval and the deserialization schema.
     *
     * @param config plugin configuration; the topics and name server address options are mandatory
     * @throws RocketMqConnectorException if a mandatory option is missing or ACL is enabled
     *     without both access key and secret key
     * @throws IllegalArgumentException if the start mode options are invalid
     */
    public void prepare(Config config) throws PrepareFailException {
        CheckResult requiredOptions = CheckConfigUtil.checkAllExists(config, new String[]{ConsumerConfig.TOPICS.key(), org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR.key()});
        if (!requiredOptions.isSuccess()) {
            throw new RocketMqConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, requiredOptions.getMsg()));
        }

        // Trim each entry so that "a, b" does not yield a topic name with a leading blank.
        String[] topics = config.getString(ConsumerConfig.TOPICS.key()).split(",");
        for (int i = 0; i < topics.length; i++) {
            topics[i] = topics[i].trim();
        }
        this.metadata.setTopics(Arrays.asList(topics));

        configureBaseConfig(config);

        if (config.hasPath(ConsumerConfig.COMMIT_ON_CHECKPOINT.key())) {
            this.metadata.setEnabledCommitCheckpoint(config.getBoolean(ConsumerConfig.COMMIT_ON_CHECKPOINT.key()));
        } else {
            this.metadata.setEnabledCommitCheckpoint(((Boolean) ConsumerConfig.COMMIT_ON_CHECKPOINT.defaultValue()).booleanValue());
        }

        this.metadata.setStartMode(resolveStartMode(config));

        if (config.hasPath(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
            this.discoveryIntervalMillis = config.getLong(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
        }
        setDeserialization(config);
    }

    /** Stores the job context that {@link #getBoundedness()} reads the job mode from. */
    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    /** Returns the row type resolved during {@link #prepare(Config)}. */
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.typeInfo;
    }

    /** Creates a reader backed by the prepared metadata and deserialization schema. */
    public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(SourceReader.Context context) throws Exception {
        return new RocketMqSourceReader(this.metadata, this.deserializationSchema, context);
    }

    /** Creates a fresh split enumerator using the configured discovery interval. */
    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> createEnumerator(SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws Exception {
        return new RocketMqSourceSplitEnumerator(this.metadata, context, this.discoveryIntervalMillis);
    }

    /**
     * Creates a split enumerator for a restored job.
     *
     * <p>NOTE(review): the checkpointed {@code rocketMqSourceState} is not handed to the
     * enumerator here, so this is identical to {@link #createEnumerator}; confirm that this is
     * intended.
     */
    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> restoreEnumerator(SourceSplitEnumerator.Context<RocketMqSourceSplit> context, RocketMqSourceState rocketMqSourceState) throws Exception {
        return new RocketMqSourceSplitEnumerator(this.metadata, context, this.discoveryIntervalMillis);
    }

    /** Builds the RocketMQ base configuration (address, ACL, group, batch size, poll timeout) and stores it in the metadata. */
    private void configureBaseConfig(Config config) {
        RocketMqBaseConfiguration.Builder builder = RocketMqBaseConfiguration.newBuilder().consumer().namesrvAddr(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR.key()));

        boolean aclEnabled = ((Boolean) org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED.defaultValue()).booleanValue();
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED.key())) {
            aclEnabled = config.getBoolean(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED.key());
            boolean hasAccessKey = config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY.key());
            boolean hasSecretKey = config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY.key());
            if (aclEnabled && !(hasAccessKey && hasSecretKey)) {
                throw new RocketMqConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, "When ACL_ENABLED true , ACCESS_KEY and SECRET_KEY must be configured");
            }
            if (hasAccessKey) {
                builder.accessKey(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY.key()));
            }
            if (hasSecretKey) {
                builder.secretKey(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY.key()));
            }
        }
        builder.aclEnable(aclEnabled);

        if (config.hasPath(ConsumerConfig.CONSUMER_GROUP.key())) {
            builder.groupId(config.getString(ConsumerConfig.CONSUMER_GROUP.key()));
        } else {
            builder.groupId(DEFAULT_CONSUMER_GROUP);
        }

        if (config.hasPath(ConsumerConfig.BATCH_SIZE.key())) {
            builder.batchSize(config.getInt(ConsumerConfig.BATCH_SIZE.key()));
        } else {
            builder.batchSize(((Integer) ConsumerConfig.BATCH_SIZE.defaultValue()).intValue());
        }

        if (config.hasPath(ConsumerConfig.POLL_TIMEOUT_MILLIS.key())) {
            // The option default is a Long, so read the configured value as a long as well.
            builder.pollTimeoutMillis(config.getLong(ConsumerConfig.POLL_TIMEOUT_MILLIS.key()));
        } else {
            builder.pollTimeoutMillis(((Long) ConsumerConfig.POLL_TIMEOUT_MILLIS.defaultValue()).longValue());
        }

        this.metadata.setBaseConfig(builder.build());
    }

    /**
     * Resolves the start mode and records its mode-specific settings (start timestamp or
     * explicit offsets) in the metadata.
     *
     * @return the configured start mode, or the option default when none is configured
     */
    private StartMode resolveStartMode(Config config) {
        if (!config.hasPath(ConsumerConfig.START_MODE.key())) {
            return (StartMode) ConsumerConfig.START_MODE.defaultValue();
        }
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
        StartMode startMode = StartMode.valueOf(config.getString(ConsumerConfig.START_MODE.key()).toUpperCase(Locale.ROOT));
        switch (startMode) {
            case CONSUME_FROM_TIMESTAMP:
                long timestamp = config.getLong(ConsumerConfig.START_MODE_TIMESTAMP.key());
                if (timestamp < 0 || timestamp > System.currentTimeMillis()) {
                    throw new IllegalArgumentException("The offsets timestamp value is smaller than 0 or larger than the current time");
                }
                this.metadata.setStartOffsetsTimestamp(Long.valueOf(timestamp));
                break;
            case CONSUME_FROM_SPECIFIC_OFFSETS:
                this.metadata.setSpecificStartOffsets(parseSpecificStartOffsets(config));
                break;
            default:
                break;
        }
        return startMode;
    }

    /**
     * Parses the specific start offsets option, whose keys have the form
     * {@code <topic>-<queueId>} and whose values are the offsets to start from.
     *
     * @throws IllegalArgumentException if the option is absent or a key is malformed
     */
    private HashMap<MessageQueue, Long> parseSpecificStartOffsets(Config config) {
        String offsetsKey = ConsumerConfig.START_MODE_OFFSETS.key();
        // Check the path first: getConfig on a missing path throws before any null check can run.
        String rendered = config.hasPath(offsetsKey) ? config.getConfig(offsetsKey).root().render(ConfigRenderOptions.concise()) : null;
        if (rendered == null) {
            throw new IllegalArgumentException("start mode is " + StartMode.CONSUME_FROM_SPECIFIC_OFFSETS + " but no specific offsets were specified.");
        }
        HashMap<MessageQueue, Long> offsets = new HashMap<>();
        ObjectNode offsetNode = JsonUtils.parseObject(rendered);
        offsetNode.fieldNames().forEachRemaining(key -> {
            // Split on the last separator because topic names may themselves contain '-'.
            int separatorIndex = key.lastIndexOf(TOPIC_QUEUE_SEPARATOR);
            if (separatorIndex <= 0) {
                throw new IllegalArgumentException("Invalid specific offset key '" + key + "', expected the form <topic>-<queueId>");
            }
            String topic = key.substring(0, separatorIndex);
            int queueId = Integer.parseInt(key.substring(separatorIndex + 1));
            offsets.put(new MessageQueue(topic, null, queueId), Long.valueOf(offsetNode.get(key).asLong()));
        });
        return offsets;
    }

    /**
     * Resolves the produced row type and the matching deserialization schema.
     *
     * <p>Without a schema option a simple text schema is used; otherwise the schema is built
     * from the configuration and deserialized as JSON (the default) or delimited text.
     */
    private void setDeserialization(Config config) {
        if (!config.hasPath(ConsumerConfig.SCHEMA.key())) {
            this.typeInfo = CatalogTableUtil.buildSimpleTextSchema();
            // The delimiter is the single control character with code point 2.
            this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(String.valueOf((char) 2)).build();
            return;
        }
        this.typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
        SchemaFormat schemaFormat = SchemaFormat.JSON;
        if (config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT.key())) {
            schemaFormat = SchemaFormat.find(config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT.key()));
        }
        switch (schemaFormat) {
            case JSON:
                this.deserializationSchema = new JsonDeserializationSchema(false, false, this.typeInfo);
                return;
            case TEXT:
                String delimiter = config.hasPath(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER.key())
                        ? config.getString(org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER.key())
                        : ",";
                this.deserializationSchema = TextDeserializationSchema.builder().seaTunnelRowType(this.typeInfo).delimiter(delimiter).build();
                return;
            default:
                throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode) CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + schemaFormat);
        }
    }
}
