package org.apache.flink.connector.pulsar.source.config;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.pulsar.common.config.PulsarConfigValidator;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.class */
public final class PulsarSourceConfigUtils {
    private static final BatchReceivePolicy DISABLED_BATCH_RECEIVE_POLICY = BatchReceivePolicy.builder().timeout(0, TimeUnit.MILLISECONDS).maxNumMessages(1).build();
    public static final PulsarConfigValidator SOURCE_CONFIG_VALIDATOR = PulsarConfigValidator.builder().requiredOption(PulsarOptions.PULSAR_SERVICE_URL).requiredOption(PulsarOptions.PULSAR_ADMIN_URL).requiredOption(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME).conflictOptions(PulsarOptions.PULSAR_AUTH_PARAMS, PulsarOptions.PULSAR_AUTH_PARAM_MAP).build();

    private PulsarSourceConfigUtils() {
    }

    public static <T> ConsumerBuilder<T> createConsumerBuilder(PulsarClient pulsarClient, Schema<T> schema, SourceConfiguration sourceConfiguration) {
        PulsarConsumerBuilder pulsarConsumerBuilder = new PulsarConsumerBuilder(pulsarClient, schema);
        ConfigOption<String> configOption = PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption, pulsarConsumerBuilder::subscriptionName);
        sourceConfiguration.useOption(PulsarSourceOptions.PULSAR_ACK_TIMEOUT_MILLIS, l -> {
            pulsarConsumerBuilder.ackTimeout(l.longValue(), TimeUnit.MILLISECONDS);
        });
        ConfigOption<Boolean> configOption2 = PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption2, (v1) -> {
            r2.isAckReceiptEnabled(v1);
        });
        sourceConfiguration.useOption(PulsarSourceOptions.PULSAR_TICK_DURATION_MILLIS, l2 -> {
            pulsarConsumerBuilder.ackTimeoutTickTime(l2.longValue(), TimeUnit.MILLISECONDS);
        });
        sourceConfiguration.useOption(PulsarSourceOptions.PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS, l3 -> {
            pulsarConsumerBuilder.negativeAckRedeliveryDelay(l3.longValue(), TimeUnit.MICROSECONDS);
        });
        ConfigOption<SubscriptionType> configOption3 = PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption3, pulsarConsumerBuilder::subscriptionType);
        ConfigOption<SubscriptionMode> configOption4 = PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption4, pulsarConsumerBuilder::subscriptionMode);
        ConfigOption<ConsumerCryptoFailureAction> configOption5 = PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption5, pulsarConsumerBuilder::cryptoFailureAction);
        ConfigOption<Integer> configOption6 = PulsarSourceOptions.PULSAR_RECEIVER_QUEUE_SIZE;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption6, (v1) -> {
            r2.receiverQueueSize(v1);
        });
        sourceConfiguration.useOption(PulsarSourceOptions.PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS, l4 -> {
            pulsarConsumerBuilder.acknowledgmentGroupTime(l4.longValue(), TimeUnit.MICROSECONDS);
        });
        ConfigOption<Boolean> configOption7 = PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption7, (v1) -> {
            r2.replicateSubscriptionState(v1);
        });
        ConfigOption<Integer> configOption8 = PulsarSourceOptions.PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption8, (v1) -> {
            r2.maxTotalReceiverQueueSizeAcrossPartitions(v1);
        });
        ConfigOption<String> configOption9 = PulsarSourceOptions.PULSAR_CONSUMER_NAME;
        Function function = str -> {
            return String.format(str, UUID.randomUUID());
        };
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption9, function, pulsarConsumerBuilder::consumerName);
        ConfigOption<Boolean> configOption10 = PulsarSourceOptions.PULSAR_READ_COMPACTED;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption10, (v1) -> {
            r2.readCompacted(v1);
        });
        ConfigOption<Integer> configOption11 = PulsarSourceOptions.PULSAR_PRIORITY_LEVEL;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption11, (v1) -> {
            r2.priorityLevel(v1);
        });
        Optional<DeadLetterPolicy> createDeadLetterPolicy = createDeadLetterPolicy(sourceConfiguration);
        Objects.requireNonNull(pulsarConsumerBuilder);
        createDeadLetterPolicy.ifPresent(pulsarConsumerBuilder::deadLetterPolicy);
        sourceConfiguration.useOption(PulsarSourceOptions.PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS, num -> {
            pulsarConsumerBuilder.autoUpdatePartitionsInterval(num.intValue(), TimeUnit.SECONDS);
        });
        ConfigOption<Boolean> configOption12 = PulsarSourceOptions.PULSAR_RETRY_ENABLE;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption12, (v1) -> {
            r2.enableRetry(v1);
        });
        ConfigOption<Integer> configOption13 = PulsarSourceOptions.PULSAR_MAX_PENDING_CHUNKED_MESSAGE;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption13, (v1) -> {
            r2.maxPendingChunkedMessage(v1);
        });
        ConfigOption<Boolean> configOption14 = PulsarSourceOptions.PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption14, (v1) -> {
            r2.autoAckOldestChunkedMessageOnQueueFull(v1);
        });
        sourceConfiguration.useOption(PulsarSourceOptions.PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS, l5 -> {
            pulsarConsumerBuilder.expireTimeOfIncompleteChunkedMessage(l5.longValue(), TimeUnit.MILLISECONDS);
        });
        ConfigOption<Boolean> configOption15 = PulsarSourceOptions.PULSAR_POOL_MESSAGES;
        Objects.requireNonNull(pulsarConsumerBuilder);
        sourceConfiguration.useOption(configOption15, (v1) -> {
            r2.poolMessages(v1);
        });
        Map<String, String> properties = sourceConfiguration.getProperties(PulsarSourceOptions.PULSAR_CONSUMER_PROPERTIES);
        if (!properties.isEmpty()) {
            pulsarConsumerBuilder.properties(properties);
        }
        pulsarConsumerBuilder.batchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
        return pulsarConsumerBuilder;
    }

    private static Optional<DeadLetterPolicy> createDeadLetterPolicy(SourceConfiguration sourceConfiguration) {
        if (!sourceConfiguration.contains(PulsarSourceOptions.PULSAR_MAX_REDELIVER_COUNT) && !sourceConfiguration.contains(PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC) && !sourceConfiguration.contains(PulsarSourceOptions.PULSAR_DEAD_LETTER_TOPIC)) {
            return Optional.empty();
        }
        DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
        ConfigOption<Integer> configOption = PulsarSourceOptions.PULSAR_MAX_REDELIVER_COUNT;
        Objects.requireNonNull(builder);
        sourceConfiguration.useOption(configOption, (v1) -> {
            r2.maxRedeliverCount(v1);
        });
        ConfigOption<String> configOption2 = PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
        Objects.requireNonNull(builder);
        sourceConfiguration.useOption(configOption2, builder::retryLetterTopic);
        ConfigOption<String> configOption3 = PulsarSourceOptions.PULSAR_DEAD_LETTER_TOPIC;
        Objects.requireNonNull(builder);
        sourceConfiguration.useOption(configOption3, builder::deadLetterTopic);
        return Optional.of(builder.build());
    }
}
