/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source;

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.pulsar.source.HashSplitSchedulingStrategy;
import org.apache.flink.connector.pulsar.source.MessageDeserializer;
import org.apache.flink.connector.pulsar.source.NoSplitDivisionStrategy;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.SplitDivisionStrategy;
import org.apache.flink.connector.pulsar.source.SplitSchedulingStrategy;
import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
import org.apache.flink.connector.pulsar.source.StopCondition;
import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class PulsarSourceBuilder<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);
    private PulsarSubscriber subscriber;
    private StartOffsetInitializer startOffsetInitializer = StartOffsetInitializer.earliest();
    private StopCondition stopCondition = StopCondition.never();
    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
    private MessageDeserializer<OUT> messageDeserializer;
    private SplitSchedulingStrategy splitSchedulingStrategy;
    private Configuration configuration = new Configuration();
    private ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
    private ConsumerConfigurationData<byte[]> consumerConfigurationData = new ConsumerConfigurationData();

    PulsarSourceBuilder() {
        this.consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable);
        this.consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive);
        this.consumerConfigurationData.setSubscriptionName("flink-" + UUID.randomUUID());
    }

    public PulsarSourceBuilder<OUT> setTopics(SplitDivisionStrategy splitDivisionStrategy, String ... topics) {
        TreeSet topicNames = Sets.newTreeSet();
        List collect = Arrays.stream(topics).collect(Collectors.toList());
        for (String topic : collect) {
            topicNames.add(topic);
        }
        this.consumerConfigurationData.setTopicNames((Set)topicNames);
        return this.setSubscriber(PulsarSubscriber.getTopicListSubscriber(splitDivisionStrategy, topics));
    }

    public PulsarSourceBuilder<OUT> setTopicPattern(String namespace, SplitDivisionStrategy splitDivisionStrategy, Set<String> topicPatterns) {
        return this.setSubscriber(PulsarSubscriber.getTopicPatternSubscriber(namespace, splitDivisionStrategy, topicPatterns));
    }

    public PulsarSourceBuilder<OUT> setSubscriber(PulsarSubscriber subscriber) {
        Preconditions.checkState((subscriber != null ? 1 : 0) != 0, (Object)"topics or topic pattern subscriber already set");
        this.subscriber = subscriber;
        return this;
    }

    public PulsarSourceBuilder<OUT> setTopics(String ... topics) {
        return this.setTopics(NoSplitDivisionStrategy.INSTANCE, topics);
    }

    public PulsarSourceBuilder<OUT> setTopicPattern(String namespace, Set<String> topicPatterns) {
        return this.setTopicPattern(namespace, NoSplitDivisionStrategy.INSTANCE, topicPatterns);
    }

    public PulsarSourceBuilder<OUT> setSplitSchedulingStrategy(SplitSchedulingStrategy splitSchedulingStrategy) {
        this.splitSchedulingStrategy = splitSchedulingStrategy;
        return this;
    }

    public PulsarSourceBuilder<OUT> startAt(StartOffsetInitializer startOffsetInitializer) {
        this.startOffsetInitializer = startOffsetInitializer;
        return this;
    }

    public PulsarSourceBuilder<OUT> stopAt(StopCondition stopCondition) {
        this.boundedness = Boundedness.BOUNDED;
        this.stopCondition = stopCondition;
        return this;
    }

    public <T> PulsarSourceBuilder<T> setDeserializer(MessageDeserializer<T> messageDeserializer) {
        this.messageDeserializer = messageDeserializer;
        return this;
    }

    public PulsarSourceBuilder<OUT> configure(Consumer<Configuration> configurationConsumer) {
        configurationConsumer.accept(this.configuration);
        return this;
    }

    public PulsarSourceBuilder<OUT> configurePulsarClient(Consumer<ClientConfigurationData> configurationConsumer) {
        configurationConsumer.accept(this.clientConfigurationData);
        return this;
    }

    public PulsarSourceBuilder<OUT> configurePulsarConsumer(Consumer<ConsumerConfigurationData> configurationConsumer) {
        configurationConsumer.accept(this.consumerConfigurationData);
        return this;
    }

    public PulsarSource<OUT> build() {
        this.sanityCheck();
        if (this.splitSchedulingStrategy == null) {
            this.splitSchedulingStrategy = HashSplitSchedulingStrategy.INSTANCE;
        }
        return new PulsarSource<OUT>(this.subscriber, this.startOffsetInitializer, this.stopCondition, this.boundedness, this.messageDeserializer, this.configuration, this.clientConfigurationData, this.consumerConfigurationData, this.splitSchedulingStrategy);
    }

    private <T> boolean maybeOverride(ConfigOption<T> option, T value, boolean override) {
        boolean overridden = false;
        Object userValue = this.configuration.get(option);
        if (userValue != null) {
            if (override) {
                LOG.warn(String.format("Configuration %s is provided but will be overridden from %s to %s", option, userValue, value));
                this.configuration.set(option, value);
                overridden = true;
            }
        } else {
            this.configuration.set(option, value);
        }
        return overridden;
    }

    private void sanityCheck() {
        Preconditions.checkNotNull((Object)this.subscriber, (String)"No subscribe mode is specified, should be one of topics or topic pattern.");
        Preconditions.checkNotNull(this.messageDeserializer, (String)"Message deserializer is required but not provided.");
        if (this.maybeOverride(PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS, -1L, this.boundedness == Boundedness.BOUNDED)) {
            LOG.warn("{} property is overridden to -1 because the source is bounded.", PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
        }
        String adminUrl = this.configuration.getString(PulsarSourceOptions.ADMIN_URL);
        Preconditions.checkNotNull((Object)adminUrl, (String)(PulsarSourceOptions.ADMIN_URL.key() + " not set."));
        try {
            new ClientBuilderImpl(this.clientConfigurationData).build();
        }
        catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot initialize pulsar client", e);
        }
        try {
            PulsarAdminUtils.newAdminFromConf(adminUrl, this.clientConfigurationData);
        }
        catch (PulsarClientException e) {
            throw new IllegalStateException("Cannot initialize pulsar admin", e);
        }
    }
}

