/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarSource<T>
extends PushSource<T>
implements MessageListener<T> {
    private static final Logger log = LoggerFactory.getLogger(PulsarSource.class);
    private final PulsarClient pulsarClient;
    private final PulsarSourceConfig pulsarSourceConfig;
    private final Map<String, String> properties;
    private List<String> inputTopics;
    private List<Consumer<T>> inputConsumers;
    private final TopicSchema topicSchema;

    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = pulsarConfig;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.properties = properties;
    }

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", (Object)this.pulsarSourceConfig);
        Map<String, ConsumerConfig<T>> configs = this.setupConsumerConfigs();
        this.inputConsumers = configs.entrySet().stream().map(e -> {
            String topic = (String)e.getKey();
            ConsumerConfig conf = (ConsumerConfig)e.getValue();
            log.info("Creating consumers for topic : {}, schema : {}", (Object)topic, conf.getSchema());
            ConsumerBuilder cb = this.pulsarClient.newConsumer(conf.getSchema()).cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME).subscriptionName(this.pulsarSourceConfig.getSubscriptionName()).subscriptionType(this.pulsarSourceConfig.getSubscriptionType()).messageListener((MessageListener)this);
            if (conf.isRegexPattern) {
                cb.topicsPattern(topic);
            } else {
                cb.topic(new String[]{topic});
            }
            if (conf.getReceiverQueueSize() != null) {
                cb.receiverQueueSize(conf.getReceiverQueueSize().intValue());
            }
            cb.properties(this.properties);
            if (this.pulsarSourceConfig.getTimeoutMs() != null) {
                cb.ackTimeout(this.pulsarSourceConfig.getTimeoutMs().longValue(), TimeUnit.MILLISECONDS);
            }
            if (this.pulsarSourceConfig.getMaxMessageRetries() != null && this.pulsarSourceConfig.getMaxMessageRetries() >= 0) {
                DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
                deadLetterPolicyBuilder.maxRedeliverCount(this.pulsarSourceConfig.getMaxMessageRetries().intValue());
                if (this.pulsarSourceConfig.getDeadLetterTopic() != null && !this.pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
                    deadLetterPolicyBuilder.deadLetterTopic(this.pulsarSourceConfig.getDeadLetterTopic());
                }
                cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
            }
            return cb.subscribeAsync();
        }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
        this.inputTopics = this.inputConsumers.stream().flatMap(c -> c instanceof MultiTopicsConsumerImpl ? ((MultiTopicsConsumerImpl)c).getTopics().stream() : Collections.singletonList(c.getTopic()).stream()).collect(Collectors.toList());
    }

    public void received(Consumer<T> consumer, Message<T> message) {
        PulsarRecord record = PulsarRecord.builder().message(message).topicName(message.getTopicName()).ackFunction(() -> {
            if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                consumer.acknowledgeCumulativeAsync(message);
            } else {
                consumer.acknowledgeAsync(message);
            }
        }).failFunction(() -> {
            if (this.pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                throw new RuntimeException("Failed to process message: " + message.getMessageId());
            }
            consumer.negativeAcknowledge(message);
        }).build();
        this.consume(record);
    }

    public void close() throws Exception {
        if (this.inputConsumers != null) {
            this.inputConsumers.forEach(consumer -> {
                try {
                    consumer.close();
                }
                catch (PulsarClientException pulsarClientException) {
                    // empty catch block
                }
            });
        }
    }

    @VisibleForTesting
    Map<String, ConsumerConfig<T>> setupConsumerConfigs() throws ClassNotFoundException {
        TreeMap<String, ConsumerConfig<T>> configs = new TreeMap<String, ConsumerConfig<T>>();
        Class typeArg = Reflections.loadClass((String)this.pulsarSourceConfig.getTypeClassName(), (ClassLoader)Thread.currentThread().getContextClassLoader());
        Preconditions.checkArgument((!Void.class.equals((Object)typeArg) ? 1 : 0) != 0, (Object)"Input type of Pulsar Function cannot be Void");
        this.pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
            Schema<?> schema = conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty() ? this.topicSchema.getSchema((String)topic, typeArg, conf.getSerdeClassName(), true) : this.topicSchema.getSchema((String)topic, typeArg, conf.getSchemaType(), true);
            configs.put((String)topic, ConsumerConfig.builder().schema(schema).isRegexPattern(conf.isRegexPattern()).receiverQueueSize(conf.getReceiverQueueSize()).build());
        });
        return configs;
    }

    public List<String> getInputTopics() {
        return this.inputTopics;
    }

    private static class ConsumerConfig<T> {
        private Schema<T> schema;
        private boolean isRegexPattern;
        private Integer receiverQueueSize;

        ConsumerConfig(Schema<T> schema, boolean isRegexPattern, Integer receiverQueueSize) {
            this.schema = schema;
            this.isRegexPattern = isRegexPattern;
            this.receiverQueueSize = receiverQueueSize;
        }

        public static <T> ConsumerConfigBuilder<T> builder() {
            return new ConsumerConfigBuilder();
        }

        public Schema<T> getSchema() {
            return this.schema;
        }

        public boolean isRegexPattern() {
            return this.isRegexPattern;
        }

        public Integer getReceiverQueueSize() {
            return this.receiverQueueSize;
        }

        public void setSchema(Schema<T> schema) {
            this.schema = schema;
        }

        public void setRegexPattern(boolean isRegexPattern) {
            this.isRegexPattern = isRegexPattern;
        }

        public void setReceiverQueueSize(Integer receiverQueueSize) {
            this.receiverQueueSize = receiverQueueSize;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof ConsumerConfig)) {
                return false;
            }
            ConsumerConfig other = (ConsumerConfig)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Schema<T> this$schema = this.getSchema();
            Schema<T> other$schema = other.getSchema();
            if (this$schema == null ? other$schema != null : !this$schema.equals(other$schema)) {
                return false;
            }
            if (this.isRegexPattern() != other.isRegexPattern()) {
                return false;
            }
            Integer this$receiverQueueSize = this.getReceiverQueueSize();
            Integer other$receiverQueueSize = other.getReceiverQueueSize();
            return !(this$receiverQueueSize == null ? other$receiverQueueSize != null : !((Object)this$receiverQueueSize).equals(other$receiverQueueSize));
        }

        protected boolean canEqual(Object other) {
            return other instanceof ConsumerConfig;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Schema<T> $schema = this.getSchema();
            result = result * 59 + ($schema == null ? 43 : $schema.hashCode());
            result = result * 59 + (this.isRegexPattern() ? 79 : 97);
            Integer $receiverQueueSize = this.getReceiverQueueSize();
            result = result * 59 + ($receiverQueueSize == null ? 43 : ((Object)$receiverQueueSize).hashCode());
            return result;
        }

        public String toString() {
            return "PulsarSource.ConsumerConfig(schema=" + this.getSchema() + ", isRegexPattern=" + this.isRegexPattern() + ", receiverQueueSize=" + this.getReceiverQueueSize() + ")";
        }

        public static class ConsumerConfigBuilder<T> {
            private Schema<T> schema;
            private boolean isRegexPattern;
            private Integer receiverQueueSize;

            ConsumerConfigBuilder() {
            }

            public ConsumerConfigBuilder<T> schema(Schema<T> schema) {
                this.schema = schema;
                return this;
            }

            public ConsumerConfigBuilder<T> isRegexPattern(boolean isRegexPattern) {
                this.isRegexPattern = isRegexPattern;
                return this;
            }

            public ConsumerConfigBuilder<T> receiverQueueSize(Integer receiverQueueSize) {
                this.receiverQueueSize = receiverQueueSize;
                return this;
            }

            public ConsumerConfig<T> build() {
                return new ConsumerConfig<T>(this.schema, this.isRegexPattern, this.receiverQueueSize);
            }

            public String toString() {
                return "PulsarSource.ConsumerConfig.ConsumerConfigBuilder(schema=" + this.schema + ", isRegexPattern=" + this.isRegexPattern + ", receiverQueueSize=" + this.receiverQueueSize + ")";
            }
        }
    }
}

