package org.apache.pulsar.spark;

import java.lang.invoke.SerializedLambda;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/spark/SparkStreamingPulsarReceiver.class */
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
    private ClientConfiguration clientConfiguration;
    private ConsumerConfiguration consumerConfiguration;
    private PulsarClient pulsarClient;
    private String url;
    private String topic;
    private String subscription;
    private static final Logger log = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);

    public SparkStreamingPulsarReceiver(ClientConfiguration clientConfiguration, ConsumerConfiguration consumerConfiguration, String str, String str2, String str3) {
        this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration, consumerConfiguration, str, str2, str3);
    }

    public SparkStreamingPulsarReceiver(StorageLevel storageLevel, ClientConfiguration clientConfiguration, ConsumerConfiguration consumerConfiguration, String str, String str2, String str3) {
        super(storageLevel);
        this.clientConfiguration = clientConfiguration;
        this.url = str;
        this.topic = str2;
        this.subscription = str3;
        if (consumerConfiguration.getAckTimeoutMillis() == 0) {
            consumerConfiguration.setAckTimeout(60L, TimeUnit.SECONDS);
        }
        consumerConfiguration.setMessageListener((consumer, message) -> {
            try {
                store(message.getData());
                consumer.acknowledgeAsync(message);
            } catch (Exception e) {
                log.error("Failed to store a message : {}", e.getMessage());
            }
        });
        this.consumerConfiguration = consumerConfiguration;
    }

    public void onStart() {
        try {
            this.pulsarClient = PulsarClient.create(this.url, this.clientConfiguration);
            this.pulsarClient.subscribe(this.topic, this.subscription, this.consumerConfiguration);
        } catch (PulsarClientException e) {
            log.error("Failed to start subscription : {}", e.getMessage());
            restart("Restart a consumer");
        }
    }

    public void onStop() {
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } catch (PulsarClientException e) {
            log.error("Failed to close client : {}", e.getMessage());
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1455157337:
                if (implMethodName.equals("lambda$new$a5a7139$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/spark/SparkStreamingPulsarReceiver") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    SparkStreamingPulsarReceiver sparkStreamingPulsarReceiver = (SparkStreamingPulsarReceiver) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        try {
                            store(message.getData());
                            consumer.acknowledgeAsync(message);
                        } catch (Exception e) {
                            log.error("Failed to store a message : {}", e.getMessage());
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
