package org.apache.pulsar.functions.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.4.0.jar:org/apache/pulsar/functions/source/PulsarSource.class */
public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarSource.class);
    private final PulsarClient pulsarClient;
    private final PulsarSourceConfig pulsarSourceConfig;
    private final Map<String, String> properties;
    private List<String> inputTopics;
    private List<Consumer<T>> inputConsumers;
    private final TopicSchema topicSchema;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.4.0.jar:org/apache/pulsar/functions/source/PulsarSource$ConsumerConfig.class */
    public static class ConsumerConfig<T> {
        private Schema<T> schema;
        private boolean isRegexPattern;
        private Integer receiverQueueSize;

        /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.4.0.jar:org/apache/pulsar/functions/source/PulsarSource$ConsumerConfig$ConsumerConfigBuilder.class */
        public static class ConsumerConfigBuilder<T> {
            private Schema<T> schema;
            private boolean isRegexPattern;
            private Integer receiverQueueSize;

            ConsumerConfigBuilder() {
            }

            public ConsumerConfigBuilder<T> schema(Schema<T> schema) {
                this.schema = schema;
                return this;
            }

            public ConsumerConfigBuilder<T> isRegexPattern(boolean z) {
                this.isRegexPattern = z;
                return this;
            }

            public ConsumerConfigBuilder<T> receiverQueueSize(Integer num) {
                this.receiverQueueSize = num;
                return this;
            }

            public ConsumerConfig<T> build() {
                return new ConsumerConfig<>(this.schema, this.isRegexPattern, this.receiverQueueSize);
            }

            public String toString() {
                return "PulsarSource.ConsumerConfig.ConsumerConfigBuilder(schema=" + this.schema + ", isRegexPattern=" + this.isRegexPattern + ", receiverQueueSize=" + this.receiverQueueSize + DefaultExpressionEngine.DEFAULT_INDEX_END;
            }
        }

        ConsumerConfig(Schema<T> schema, boolean z, Integer num) {
            this.schema = schema;
            this.isRegexPattern = z;
            this.receiverQueueSize = num;
        }

        public static <T> ConsumerConfigBuilder<T> builder() {
            return new ConsumerConfigBuilder<>();
        }

        public Schema<T> getSchema() {
            return this.schema;
        }

        public boolean isRegexPattern() {
            return this.isRegexPattern;
        }

        public Integer getReceiverQueueSize() {
            return this.receiverQueueSize;
        }

        public void setSchema(Schema<T> schema) {
            this.schema = schema;
        }

        public void setRegexPattern(boolean z) {
            this.isRegexPattern = z;
        }

        public void setReceiverQueueSize(Integer num) {
            this.receiverQueueSize = num;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ConsumerConfig)) {
                return false;
            }
            ConsumerConfig consumerConfig = (ConsumerConfig) obj;
            if (!consumerConfig.canEqual(this)) {
                return false;
            }
            Schema<T> schema = getSchema();
            Schema<T> schema2 = consumerConfig.getSchema();
            if (schema == null) {
                if (schema2 != null) {
                    return false;
                }
            } else if (!schema.equals(schema2)) {
                return false;
            }
            if (isRegexPattern() != consumerConfig.isRegexPattern()) {
                return false;
            }
            Integer receiverQueueSize = getReceiverQueueSize();
            Integer receiverQueueSize2 = consumerConfig.getReceiverQueueSize();
            return receiverQueueSize == null ? receiverQueueSize2 == null : receiverQueueSize.equals(receiverQueueSize2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof ConsumerConfig;
        }

        public int hashCode() {
            Schema<T> schema = getSchema();
            int hashCode = (((1 * 59) + (schema == null ? 43 : schema.hashCode())) * 59) + (isRegexPattern() ? 79 : 97);
            Integer receiverQueueSize = getReceiverQueueSize();
            return (hashCode * 59) + (receiverQueueSize == null ? 43 : receiverQueueSize.hashCode());
        }

        public String toString() {
            return "PulsarSource.ConsumerConfig(schema=" + getSchema() + ", isRegexPattern=" + isRegexPattern() + ", receiverQueueSize=" + getReceiverQueueSize() + DefaultExpressionEngine.DEFAULT_INDEX_END;
        }
    }

    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarSourceConfig, Map<String, String> map) {
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = pulsarSourceConfig;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.properties = map;
    }

    @Override // org.apache.pulsar.io.core.PushSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", this.pulsarSourceConfig);
        this.inputConsumers = (List) ((List) setupConsumerConfigs().entrySet().stream().map(entry -> {
            String str = (String) entry.getKey();
            ConsumerConfig consumerConfig = (ConsumerConfig) entry.getValue();
            log.info("Creating consumers for topic : {}, schema : {}", str, consumerConfig.getSchema());
            ConsumerBuilder<T> messageListener = this.pulsarClient.newConsumer(consumerConfig.getSchema()).cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).subscriptionName(this.pulsarSourceConfig.getSubscriptionName()).subscriptionType(this.pulsarSourceConfig.getSubscriptionType()).messageListener(this);
            if (consumerConfig.isRegexPattern) {
                messageListener.topicsPattern(str);
            } else {
                messageListener.topic(str);
            }
            if (consumerConfig.getReceiverQueueSize() != null) {
                messageListener.receiverQueueSize(consumerConfig.getReceiverQueueSize().intValue());
            }
            messageListener.properties(this.properties);
            if (this.pulsarSourceConfig.getTimeoutMs() != null) {
                messageListener.ackTimeout(this.pulsarSourceConfig.getTimeoutMs().longValue(), TimeUnit.MILLISECONDS);
            }
            if (this.pulsarSourceConfig.getMaxMessageRetries() != null && this.pulsarSourceConfig.getMaxMessageRetries().intValue() >= 0) {
                DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
                builder.maxRedeliverCount(this.pulsarSourceConfig.getMaxMessageRetries().intValue());
                if (this.pulsarSourceConfig.getDeadLetterTopic() != null && !this.pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
                    builder.deadLetterTopic(this.pulsarSourceConfig.getDeadLetterTopic());
                }
                messageListener.deadLetterPolicy(builder.build());
            }
            return messageListener.subscribeAsync();
        }).collect(Collectors.toList())).stream().map((v0) -> {
            return v0.join();
        }).collect(Collectors.toList());
        this.inputTopics = (List) this.inputConsumers.stream().flatMap(consumer -> {
            return consumer instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl) consumer).getTopics().stream() : Collections.singletonList(consumer.getTopic()).stream();
        }).collect(Collectors.toList());
    }

    @Override // org.apache.pulsar.client.api.MessageListener
    public void received(Consumer<T> consumer, Message<T> message) {
        consume(PulsarRecord.builder().message(message).topicName(message.getTopicName()).ackFunction(() -> {
            if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                consumer.acknowledgeCumulativeAsync((Message<?>) message);
            } else {
                consumer.acknowledgeAsync((Message<?>) message);
            }
        }).failFunction(() -> {
            if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                throw new RuntimeException("Failed to process message: " + message.getMessageId());
            }
            consumer.negativeAcknowledge((Message<?>) message);
        }).build());
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.inputConsumers != null) {
            this.inputConsumers.forEach(consumer -> {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                }
            });
        }
    }

    @VisibleForTesting
    Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundException {
        TreeMap treeMap = new TreeMap();
        Class loadClass = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), Thread.currentThread().getContextClassLoader());
        Preconditions.checkArgument(!Void.class.equals(loadClass), "Input type of Pulsar Function cannot be Void");
        this.pulsarSourceConfig.getTopicSchema().forEach((str, consumerConfig) -> {
            treeMap.put(str, ConsumerConfig.builder().schema((consumerConfig.getSerdeClassName() == null || consumerConfig.getSerdeClassName().isEmpty()) ? this.topicSchema.getSchema(str, (Class<?>) loadClass, consumerConfig.getSchemaType(), true) : this.topicSchema.getSchema(str, (Class<?>) loadClass, consumerConfig.getSerdeClassName(), true)).isRegexPattern(consumerConfig.isRegexPattern()).receiverQueueSize(consumerConfig.getReceiverQueueSize()).build());
        });
        return treeMap;
    }

    public List<String> getInputTopics() {
        return this.inputTopics;
    }
}
