package org.apache.flink.connector.pulsar.source;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.UniformRangeGenerator;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fluent builder that collects the settings of a {@link PulsarSource} and assembles the source in
 * {@link #build()}.
 *
 * <p>A topic selection (either {@code setTopics} or {@code setTopicPattern}, but not both) and a
 * deserialization schema are mandatory. Unless changed, the builder uses
 * {@link StartCursor#defaultStartCursor()}, {@link StopCursor#defaultStopCursor()} and
 * {@link Boundedness#CONTINUOUS_UNBOUNDED}.
 *
 * <p>Setters that validate their argument do so before changing any builder state, so a rejected
 * call leaves the builder as it was.
 *
 * @param <OUT> the element type of the source being built
 */
@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.class */
public final class PulsarSourceBuilder<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);

    /** Topic selection; stays {@code null} until a topic list or a topic pattern is configured. */
    private PulsarSubscriber subscriber;

    /** Key range generator; resolved in {@link #build()} when not supplied explicitly. */
    private RangeGenerator rangeGenerator;

    /** Schema for the consumed messages; must be set before {@link #build()}. */
    private PulsarDeserializationSchema<OUT> deserializationSchema;

    private final PulsarConfigBuilder configBuilder = new PulsarConfigBuilder();
    private StartCursor startCursor = StartCursor.defaultStartCursor();
    private StopCursor stopCursor = StopCursor.defaultStopCursor();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;

    /**
     * Stores the given value under {@link PulsarOptions#PULSAR_ADMIN_URL}.
     *
     * @param str the admin URL
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setAdminUrl(String str) {
        return setConfig(PulsarOptions.PULSAR_ADMIN_URL, str);
    }

    /**
     * Stores the given value under {@link PulsarOptions#PULSAR_SERVICE_URL}.
     *
     * @param str the service URL
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setServiceUrl(String str) {
        return setConfig(PulsarOptions.PULSAR_SERVICE_URL, str);
    }

    /**
     * Stores the given value under {@code PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME}.
     *
     * @param str the subscription name
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setSubscriptionName(String str) {
        return setConfig(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME, str);
    }

    /**
     * Stores the given value under {@code PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE}.
     *
     * <p>The subscription type decides which range generator {@link #build()} ends up using.
     *
     * @param subscriptionType the Pulsar subscription type
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setSubscriptionType(SubscriptionType subscriptionType) {
        return setConfig(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
    }

    /**
     * Selects the topics to consume from; varargs convenience for {@link #setTopics(List)}.
     *
     * @param strArr the topic names
     * @return this builder
     * @throws IllegalStateException if a topic list or topic pattern was already configured
     */
    public PulsarSourceBuilder<OUT> setTopics(String... strArr) {
        return setTopics(Arrays.asList(strArr));
    }

    /**
     * Selects the topics to consume from. The names are passed through
     * {@link TopicNameUtils#distinctTopics} before the topic-list subscriber is created.
     *
     * @param list the topic names
     * @return this builder
     * @throws IllegalStateException if a topic list or topic pattern was already configured
     */
    public PulsarSourceBuilder<OUT> setTopics(List<String> list) {
        ensureSubscriberIsNull("topics");
        this.subscriber = PulsarSubscriber.getTopicListSubscriber(TopicNameUtils.distinctTopics(list));
        return this;
    }

    /**
     * Selects topics by regular expression, using {@link RegexSubscriptionMode#AllTopics}.
     *
     * @param str the regular expression, compiled with {@link Pattern#compile(String)}
     * @return this builder
     * @throws IllegalStateException if a topic list or topic pattern was already configured
     */
    public PulsarSourceBuilder<OUT> setTopicPattern(String str) {
        return setTopicPattern(Pattern.compile(str));
    }

    /**
     * Selects topics by pattern, using {@link RegexSubscriptionMode#AllTopics}.
     *
     * @param pattern the topic name pattern
     * @return this builder
     * @throws IllegalStateException if a topic list or topic pattern was already configured
     */
    public PulsarSourceBuilder<OUT> setTopicPattern(Pattern pattern) {
        return setTopicPattern(pattern, RegexSubscriptionMode.AllTopics);
    }

    /**
     * Selects topics by regular expression with an explicit regex subscription mode.
     *
     * @param str the regular expression, compiled with {@link Pattern#compile(String)}
     * @param regexSubscriptionMode the mode handed to the topic-pattern subscriber
     * @return this builder
     * @throws IllegalStateException if a topic list or topic pattern was already configured
     */
    public PulsarSourceBuilder<OUT> setTopicPattern(String str, RegexSubscriptionMode regexSubscriptionMode) {
        return setTopicPattern(Pattern.compile(str), regexSubscriptionMode);
    }

    /**
     * Selects topics by pattern with an explicit regex subscription mode.
     *
     * @param pattern the topic name pattern
     * @param regexSubscriptionMode the mode handed to the topic-pattern subscriber
     * @return this builder
     * @throws IllegalStateException if a topic list or topic pattern was already configured
     */
    public PulsarSourceBuilder<OUT> setTopicPattern(Pattern pattern, RegexSubscriptionMode regexSubscriptionMode) {
        ensureSubscriberIsNull("topic pattern");
        this.subscriber = PulsarSubscriber.getTopicPatternSubscriber(pattern, regexSubscriptionMode);
        return this;
    }

    /**
     * Stores the given value under {@code PulsarSourceOptions.PULSAR_CONSUMER_NAME}.
     * {@link #build()} logs a warning when no consumer name has been configured.
     *
     * @param str the consumer name
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setConsumerName(String str) {
        return setConfig(PulsarSourceOptions.PULSAR_CONSUMER_NAME, str);
    }

    /**
     * Sets a custom range generator, which requires the {@code Key_Shared} subscription type.
     *
     * <p>If a subscription type is already configured it must be {@code Key_Shared}; if none is
     * configured yet, it is set to {@code Key_Shared} as a side effect of this call.
     *
     * @param rangeGenerator the generator to use; must not be {@code null}
     * @return this builder
     * @throws IllegalArgumentException if a subscription type other than {@code Key_Shared} is
     *     already configured
     * @throws NullPointerException if {@code rangeGenerator} is {@code null}
     */
    public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator) {
        boolean typeConfigured = this.configBuilder.contains(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE);
        if (typeConfigured) {
            SubscriptionType subscriptionType = (SubscriptionType) this.configBuilder.get(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE);
            Preconditions.checkArgument(subscriptionType == SubscriptionType.Key_Shared, "Key_Shared subscription should be used for custom rangeGenerator instead of %s", subscriptionType);
        }
        // Reject null before touching the configuration, so a failed call cannot leave the
        // subscription type silently switched to Key_Shared.
        Preconditions.checkNotNull(rangeGenerator);
        if (!typeConfigured) {
            LOG.warn("No subscription type provided, set it to Key_Shared.");
            setSubscriptionType(SubscriptionType.Key_Shared);
        }
        this.rangeGenerator = rangeGenerator;
        return this;
    }

    /**
     * Sets the cursor that decides where consumption starts.
     *
     * @param startCursor the start cursor; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code startCursor} is {@code null}
     */
    public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
        this.startCursor = Preconditions.checkNotNull(startCursor);
        return this;
    }

    /**
     * Sets the stop cursor and marks the source as {@link Boundedness#CONTINUOUS_UNBOUNDED}.
     *
     * @param stopCursor the stop cursor; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code stopCursor} is {@code null}
     */
    public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor) {
        // Validate first so a rejected argument does not change the boundedness.
        Preconditions.checkNotNull(stopCursor);
        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        this.stopCursor = stopCursor;
        return this;
    }

    /**
     * Sets the stop cursor and marks the source as {@link Boundedness#BOUNDED}.
     *
     * @param stopCursor the stop cursor; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code stopCursor} is {@code null}
     */
    public PulsarSourceBuilder<OUT> setBoundedStopCursor(StopCursor stopCursor) {
        // Validate first so a rejected argument does not change the boundedness.
        Preconditions.checkNotNull(stopCursor);
        this.boundedness = Boundedness.BOUNDED;
        this.stopCursor = stopCursor;
        return this;
    }

    /**
     * Sets the deserialization schema and narrows the builder's element type to the schema's.
     *
     * <p>The returned builder is this same instance viewed with the narrower type parameter;
     * no copy is made.
     *
     * @param pulsarDeserializationSchema the schema used for the consumed messages
     * @param <T> the element type produced by the schema
     * @return this builder, typed to {@code T}
     */
    public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(PulsarDeserializationSchema<T> pulsarDeserializationSchema) {
        PulsarSourceBuilder<T> specialized = specialized();
        specialized.deserializationSchema = pulsarDeserializationSchema;
        return specialized;
    }

    /**
     * Sets a single option on the underlying config builder.
     *
     * @param configOption the option key
     * @param t the option value
     * @param <T> the value type of the option
     * @return this builder
     */
    public <T> PulsarSourceBuilder<OUT> setConfig(ConfigOption<T> configOption, T t) {
        this.configBuilder.set(configOption, t);
        return this;
    }

    /**
     * Passes the given configuration to the underlying config builder.
     *
     * @param configuration the configuration to apply
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setConfig(Configuration configuration) {
        this.configBuilder.set(configuration);
        return this;
    }

    /**
     * Passes the given properties to the underlying config builder.
     *
     * @param properties the properties to apply
     * @return this builder
     */
    public PulsarSourceBuilder<OUT> setProperties(Properties properties) {
        this.configBuilder.set(properties);
        return this;
    }

    /**
     * Validates the collected settings and creates the {@link PulsarSource}.
     *
     * <p>Besides validating, this method adjusts the builder's state:
     *
     * <ul>
     *   <li>For any subscription type other than {@code Key_Shared} the range generator is
     *       replaced by a {@link FullRangeGenerator}; for {@code Key_Shared} without an explicit
     *       generator a {@link UniformRangeGenerator} is used.
     *   <li>For a bounded source a non-negative partition discovery interval is overridden
     *       to {@code -1}.
     *   <li>When auto acknowledgement is explicitly {@code false} and the subscription type is
     *       {@code Shared} or {@code Key_Shared}, transactions are switched on.
     * </ul>
     *
     * @return the configured source
     * @throws NullPointerException if no topic selection or no deserialization schema was set
     * @throws IllegalStateException if the start cursor, stop cursor or range generator is not
     *     serializable
     */
    public PulsarSource<OUT> build() {
        Preconditions.checkNotNull(this.subscriber, "No topic names or topic pattern are provided.");

        SubscriptionType subscriptionType = (SubscriptionType) this.configBuilder.get(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE);
        if (subscriptionType != SubscriptionType.Key_Shared) {
            // Only Key_Shared honours a custom generator; every other type gets the full range.
            this.rangeGenerator = new FullRangeGenerator();
        } else if (this.rangeGenerator == null) {
            LOG.warn("No range generator provided for key_shared subscription, we would use the UniformRangeGenerator as the default range generator.");
            this.rangeGenerator = new UniformRangeGenerator();
        }

        // boundedness needs no null check: the field is initialised non-null and the setters
        // only ever assign Boundedness constants.
        if (this.boundedness == Boundedness.BOUNDED && ((Long) this.configBuilder.get(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS)).longValue() >= 0) {
            LOG.warn("{} property is overridden to -1 because the source is bounded.", PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
            this.configBuilder.override(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
        }

        Preconditions.checkNotNull(this.deserializationSchema, "deserializationSchema should be set.");

        boolean autoAckDisabled = Boolean.FALSE.equals(this.configBuilder.get(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE));
        boolean sharedSubscription = subscriptionType == SubscriptionType.Key_Shared || subscriptionType == SubscriptionType.Shared;
        if (autoAckDisabled && sharedSubscription) {
            LOG.info("Pulsar cursor auto commit is disabled, make sure checkpoint is enabled and your pulsar cluster is support the transaction.");
            this.configBuilder.override(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true);
            if (this.configBuilder.contains(PulsarSourceOptions.PULSAR_READ_TRANSACTION_TIMEOUT)) {
                LOG.warn("The configured transaction timeout is {} mille seconds, make sure it was greater than your checkpoint interval.", (Long) this.configBuilder.get(PulsarSourceOptions.PULSAR_READ_TRANSACTION_TIMEOUT));
            } else {
                LOG.warn("The default pulsar transaction timeout is 3 hours, make sure it was greater than your checkpoint interval.");
            }
        }

        if (!this.configBuilder.contains(PulsarSourceOptions.PULSAR_CONSUMER_NAME)) {
            LOG.warn("We recommend set a readable consumer name through setConsumerName(String) in production mode.");
        }

        Preconditions.checkState(InstantiationUtil.isSerializable(this.startCursor), "StartCursor isn't serializable");
        Preconditions.checkState(InstantiationUtil.isSerializable(this.stopCursor), "StopCursor isn't serializable");
        Preconditions.checkState(InstantiationUtil.isSerializable(this.rangeGenerator), "RangeGenerator isn't serializable");

        // NOTE(review): the validator passed here presumably rejects invalid option combinations;
        // its failure mode is not visible from this class.
        SourceConfiguration sourceConfiguration = (SourceConfiguration) this.configBuilder.build(PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new);
        return new PulsarSource<>(sourceConfiguration, this.subscriber, this.rangeGenerator, this.startCursor, this.stopCursor, this.boundedness, this.deserializationSchema);
    }

    /**
     * Returns this builder viewed with a narrower element type.
     *
     * <p>The cast is unchecked, which is why the suppression is scoped to this one method. The
     * only caller, {@link #setDeserializationSchema}, immediately stores a schema of the same
     * narrowed type on the returned view.
     */
    @SuppressWarnings("unchecked")
    private <T extends OUT> PulsarSourceBuilder<T> specialized() {
        return (PulsarSourceBuilder<T>) this;
    }

    /**
     * Guards against configuring both a topic list and a topic pattern.
     *
     * @param str human-readable name of the selection being attempted, used in the error message
     * @throws IllegalStateException if a subscriber has already been set
     */
    private void ensureSubscriberIsNull(String str) {
        if (this.subscriber != null) {
            throw new IllegalStateException(String.format("Cannot use %s for consumption because a %s is already set for consumption.", str, this.subscriber.getClass().getSimpleName()));
        }
    }
}
