package org.apache.pulsar.spark;

import java.lang.invoke.SerializedLambda;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/spark/SparkStreamingPulsarReceiver.class */
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
    private String serviceUrl;
    private ConsumerConfigurationData<byte[]> conf;
    private Authentication authentication;
    private PulsarClient pulsarClient;
    private Consumer<byte[]> consumer;

    public SparkStreamingPulsarReceiver(String str, ConsumerConfigurationData<byte[]> consumerConfigurationData, Authentication authentication) {
        this(StorageLevel.MEMORY_AND_DISK_2(), str, consumerConfigurationData, authentication);
    }

    public SparkStreamingPulsarReceiver(StorageLevel storageLevel, String str, ConsumerConfigurationData<byte[]> consumerConfigurationData, Authentication authentication) {
        super(storageLevel);
        Preconditions.checkNotNull(str, "serviceUrl must not be null");
        Preconditions.checkNotNull(consumerConfigurationData, "ConsumerConfigurationData must not be null");
        Preconditions.checkArgument(consumerConfigurationData.getTopicNames().size() > 0, "TopicNames must be set a value.");
        Preconditions.checkNotNull(consumerConfigurationData.getSubscriptionName(), "SubscriptionName must not be null");
        this.serviceUrl = str;
        this.authentication = authentication;
        if (consumerConfigurationData.getMessageListener() == null) {
            consumerConfigurationData.setMessageListener((consumer, message) -> {
                try {
                    store(message.getData());
                    consumer.acknowledgeAsync(message);
                } catch (Exception e) {
                    LOG.error("Failed to store a message : {}", e.getMessage());
                    consumer.negativeAcknowledge(message);
                }
            });
        }
        this.conf = consumerConfigurationData;
    }

    public void onStart() {
        try {
            this.pulsarClient = PulsarClient.builder().serviceUrl(this.serviceUrl).authentication(this.authentication).build();
            this.consumer = (Consumer) this.pulsarClient.subscribeAsync(this.conf).join();
        } catch (Exception e) {
            LOG.error("Failed to start subscription : {}", e.getMessage());
            restart("Restart a consumer");
        }
    }

    public void onStop() {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } catch (PulsarClientException e) {
            LOG.error("Failed to close client : {}", e.getMessage());
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 301138578:
                if (implMethodName.equals("lambda$new$a55478d4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/spark/SparkStreamingPulsarReceiver") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    SparkStreamingPulsarReceiver sparkStreamingPulsarReceiver = (SparkStreamingPulsarReceiver) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            store(message.getData());
                            consumer.acknowledgeAsync(message);
                        } catch (Exception e) {
                            LOG.error("Failed to store a message : {}", e.getMessage());
                            consumer.negativeAcknowledge(message);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
