package org.apache.seatunnel.connectors.seatunnel.pulsar.source;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.util.Arrays;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

@AutoService({SeaTunnelSource.class})
/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.class */
public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState> {
    private DeserializationSchema<T> deserialization;
    private PulsarAdminConfig adminConfig;
    private PulsarClientConfig clientConfig;
    private PulsarConsumerConfig consumerConfig;
    private PulsarDiscoverer partitionDiscoverer;
    private long partitionDiscoveryIntervalMs;
    private StartCursor startCursor;
    private StopCursor stopCursor;
    protected int pollTimeout;
    protected long pollInterval;
    protected int batchSize;

    public String getPluginName() {
        return PulsarConfigUtil.IDENTIFIER;
    }

    public void prepare(Config config) throws PrepareFailException {
        CheckResult checkAllExists = CheckConfigUtil.checkAllExists(config, new String[]{SourceProperties.SUBSCRIPTION_NAME.key(), SourceProperties.CLIENT_SERVICE_URL.key(), SourceProperties.ADMIN_SERVICE_URL.key()});
        if (!checkAllExists.isSuccess()) {
            throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, checkAllExists.getMsg()));
        }
        PulsarAdminConfig.Builder adminUrl = PulsarAdminConfig.builder().adminUrl(config.getString(SourceProperties.ADMIN_SERVICE_URL.key()));
        String key = SourceProperties.AUTH_PLUGIN_CLASS.key();
        config.getClass();
        Function function = config::getString;
        adminUrl.getClass();
        PropertiesUtil.setOption(config, key, function, adminUrl::authPluginClassName);
        String key2 = SourceProperties.AUTH_PARAMS.key();
        config.getClass();
        Function function2 = config::getString;
        adminUrl.getClass();
        PropertiesUtil.setOption(config, key2, function2, adminUrl::authParams);
        this.adminConfig = adminUrl.build();
        PulsarClientConfig.Builder serviceUrl = PulsarClientConfig.builder().serviceUrl(config.getString(SourceProperties.CLIENT_SERVICE_URL.key()));
        String key3 = SourceProperties.AUTH_PLUGIN_CLASS.key();
        config.getClass();
        Function function3 = config::getString;
        serviceUrl.getClass();
        PropertiesUtil.setOption(config, key3, function3, serviceUrl::authPluginClassName);
        String key4 = SourceProperties.AUTH_PARAMS.key();
        config.getClass();
        Function function4 = config::getString;
        serviceUrl.getClass();
        PropertiesUtil.setOption(config, key4, function4, serviceUrl::authParams);
        this.clientConfig = serviceUrl.build();
        this.consumerConfig = PulsarConsumerConfig.builder().subscriptionName(config.getString(SourceProperties.SUBSCRIPTION_NAME.key())).build();
        String key5 = SourceProperties.TOPIC_DISCOVERY_INTERVAL.key();
        Object defaultValue = SourceProperties.TOPIC_DISCOVERY_INTERVAL.defaultValue();
        config.getClass();
        PropertiesUtil.setOption(config, key5, defaultValue, config::getLong, l -> {
            this.partitionDiscoveryIntervalMs = l.longValue();
        });
        String key6 = SourceProperties.POLL_TIMEOUT.key();
        Object defaultValue2 = SourceProperties.POLL_TIMEOUT.defaultValue();
        config.getClass();
        PropertiesUtil.setOption(config, key6, defaultValue2, config::getInt, num -> {
            this.pollTimeout = num.intValue();
        });
        String key7 = SourceProperties.POLL_INTERVAL.key();
        Object defaultValue3 = SourceProperties.POLL_INTERVAL.defaultValue();
        config.getClass();
        PropertiesUtil.setOption(config, key7, defaultValue3, config::getLong, l2 -> {
            this.pollInterval = l2.longValue();
        });
        String key8 = SourceProperties.POLL_BATCH_SIZE.key();
        Object defaultValue4 = SourceProperties.POLL_BATCH_SIZE.defaultValue();
        config.getClass();
        PropertiesUtil.setOption(config, key8, defaultValue4, config::getInt, num2 -> {
            this.batchSize = num2.intValue();
        });
        setStartCursor(config);
        setStopCursor(config);
        setPartitionDiscoverer(config);
        setDeserialization(config);
        if ((this.partitionDiscoverer instanceof TopicPatternDiscoverer) && this.partitionDiscoveryIntervalMs > 0 && Boundedness.BOUNDED == this.stopCursor.getBoundedness()) {
            throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "Bounded streams do not support dynamic partition discovery.");
        }
    }

    private void setStartCursor(Config config) {
        SourceProperties.StartMode startMode = (SourceProperties.StartMode) PropertiesUtil.getEnum(config, SourceProperties.CURSOR_STARTUP_MODE.key(), SourceProperties.StartMode.class, (Enum) SourceProperties.CURSOR_STARTUP_MODE.defaultValue());
        switch (startMode) {
            case EARLIEST:
                this.startCursor = StartCursor.earliest();
                return;
            case LATEST:
                this.startCursor = StartCursor.latest();
                return;
            case SUBSCRIPTION:
                this.startCursor = StartCursor.subscription((SubscriptionStartCursor.CursorResetStrategy) PropertiesUtil.getEnum(config, SourceProperties.CURSOR_RESET_MODE.key(), SubscriptionStartCursor.CursorResetStrategy.class, SubscriptionStartCursor.CursorResetStrategy.LATEST));
                return;
            case TIMESTAMP:
                if (StringUtils.isBlank(config.getString(SourceProperties.CURSOR_STARTUP_TIMESTAMP.key()))) {
                    throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", SourceProperties.CURSOR_STARTUP_TIMESTAMP.key(), SourceProperties.CURSOR_STARTUP_MODE.key()));
                }
                String key = SourceProperties.CURSOR_STARTUP_TIMESTAMP.key();
                config.getClass();
                PropertiesUtil.setOption(config, key, config::getLong, l -> {
                    this.startCursor = StartCursor.timestamp(l.longValue());
                });
                return;
            default:
                throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The %s mode is not supported.", startMode));
        }
    }

    private void setStopCursor(Config config) {
        SourceProperties.StopMode stopMode = (SourceProperties.StopMode) PropertiesUtil.getEnum(config, SourceProperties.CURSOR_STOP_MODE.key(), SourceProperties.StopMode.class, (Enum) SourceProperties.CURSOR_STOP_MODE.defaultValue());
        switch (stopMode) {
            case LATEST:
                this.stopCursor = StopCursor.latest();
                return;
            case NEVER:
                this.stopCursor = StopCursor.never();
                return;
            case TIMESTAMP:
                if (StringUtils.isBlank(config.getString(SourceProperties.CURSOR_STOP_TIMESTAMP.key()))) {
                    throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", SourceProperties.CURSOR_STOP_TIMESTAMP.key(), SourceProperties.CURSOR_STOP_MODE.key()));
                }
                String key = SourceProperties.CURSOR_STARTUP_TIMESTAMP.key();
                config.getClass();
                PropertiesUtil.setOption(config, key, config::getLong, l -> {
                    this.stopCursor = StopCursor.timestamp(l.longValue());
                });
                return;
            default:
                throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("The %s mode is not supported.", stopMode));
        }
    }

    private void setPartitionDiscoverer(Config config) {
        String string = config.getString(SourceProperties.TOPIC.key());
        if (StringUtils.isNotBlank(string)) {
            this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(string, ",")));
        }
        String string2 = config.getString(SourceProperties.TOPIC_PATTERN.key());
        if (StringUtils.isNotBlank(string2)) {
            if (this.partitionDiscoverer != null) {
                throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", SourceProperties.TOPIC.key(), SourceProperties.TOPIC_PATTERN.key()));
            }
            this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(string2));
        }
        if (this.partitionDiscoverer == null) {
            throw new PulsarConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' or '%s' is required.", SourceProperties.TOPIC.key(), SourceProperties.TOPIC_PATTERN.key()));
        }
    }

    private void setDeserialization(Config config) {
        config.getString("format");
        this.deserialization = new JsonDeserializationSchema(false, false, SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType());
    }

    public Boundedness getBoundedness() {
        return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
    }

    public SeaTunnelDataType<T> getProducedType() {
        return this.deserialization.getProducedType();
    }

    public SourceReader<T, PulsarPartitionSplit> createReader(SourceReader.Context context) throws Exception {
        return new PulsarSourceReader(context, this.clientConfig, this.consumerConfig, this.startCursor, this.deserialization, this.pollTimeout, this.pollInterval, this.batchSize);
    }

    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> createEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context) throws Exception {
        return new PulsarSplitEnumerator(context, this.adminConfig, this.partitionDiscoverer, this.partitionDiscoveryIntervalMs, this.startCursor, this.stopCursor, this.consumerConfig.getSubscriptionName());
    }

    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> restoreEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context, PulsarSplitEnumeratorState pulsarSplitEnumeratorState) throws Exception {
        return new PulsarSplitEnumerator(context, this.adminConfig, this.partitionDiscoverer, this.partitionDiscoveryIntervalMs, this.startCursor, this.stopCursor, this.consumerConfig.getSubscriptionName(), pulsarSplitEnumeratorState.assignedPartitions());
    }

    public /* bridge */ /* synthetic */ SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context context, Serializable serializable) throws Exception {
        return restoreEnumerator((SourceSplitEnumerator.Context<PulsarPartitionSplit>) context, (PulsarSplitEnumeratorState) serializable);
    }
}
