package org.apache.eventmesh.connector.pulsar.consumer;

import com.google.common.base.Preconditions;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventDeserializationException;
import io.cloudevents.core.provider.EventFormatProvider;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.consumer.Consumer;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/pulsar/consumer/PulsarConsumerImpl.class */
/**
 * {@link Consumer} implementation that reads messages through a Pulsar client.
 *
 * <p>Each received payload is deserialized with the {@code application/cloudevents+json}
 * event format and handed to the {@link EventListener} registered via
 * {@link #registerEventListener(EventListener)}; the Pulsar message is acknowledged after the
 * listener returns.
 *
 * <p>Only the most recent subscription is tracked: {@link #subscribe(String)} overwrites the
 * previously stored Pulsar consumer reference.
 */
public class PulsarConsumerImpl implements Consumer {
    private static final Logger log = LoggerFactory.getLogger(PulsarConsumerImpl.class);
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Properties properties;
    private PulsarClient pulsarClient;
    private org.apache.pulsar.client.api.Consumer<byte[]> consumer;
    private EventListener eventListener;

    /**
     * Stores the given properties and builds the Pulsar client from the shared
     * {@link ClientConfiguration}.
     *
     * <p>When an authentication plugin is configured, authentication parameters must be
     * configured as well.
     *
     * @param properties consumer properties; {@code consumerGroup} is read later by
     *                   {@link #subscribe(String)} as the subscription name
     * @throws ConnectorRuntimeException if the client cannot be built for any reason
     */
    public void init(Properties properties) throws Exception {
        this.properties = properties;
        ClientConfiguration clientConfiguration = ClientConfiguration.getInstance();
        try {
            ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(clientConfiguration.getServiceAddr());
            if (clientConfiguration.getAuthPlugin() != null) {
                Preconditions.checkNotNull(clientConfiguration.getAuthParams(), "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
                clientBuilder.authentication(clientConfiguration.getAuthPlugin(), clientConfiguration.getAuthParams());
            }
            this.pulsarClient = clientBuilder.build();
        } catch (Exception e) {
            // The format specifier must be "%s"; a bare trailing "%" makes String.format throw
            // UnknownFormatConversionException and hides the real connection failure.
            throw new ConnectorRuntimeException(String.format("Failed to connect pulsar with exception: %s", e.getMessage()));
        }
    }

    /** Marks this consumer as started; has no effect if it is already started. */
    public void start() {
        this.started.compareAndSet(false, true);
    }

    /**
     * Subscribes to the given topic, using the {@code consumerGroup} property as the
     * subscription name.
     *
     * <p>Payloads that cannot be deserialized as CloudEvents JSON are logged and skipped; they
     * are neither passed to the event listener nor acknowledged.
     *
     * @param str the topic to subscribe to
     * @throws ConnectorRuntimeException if the Pulsar client has not been created by
     *                                   {@link #init(Properties)}
     */
    public void subscribe(final String str) throws Exception {
        if (this.pulsarClient == null) {
            throw new ConnectorRuntimeException(String.format("Cann't find the pulsar client for topic: %s", str));
        }
        final EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
            public void commit(EventMeshAction eventMeshAction) {
                // Acknowledgement is done by the message listener below; commit only traces the action.
                log.debug("message action: {} for topic: {}", eventMeshAction.name(), str);
            }
        };
        this.consumer = this.pulsarClient.newConsumer()
            .topic(str)
            .subscriptionName(this.properties.getProperty("consumerGroup"))
            .messageListener((ackConsumer, message) -> {
                final CloudEvent event;
                try {
                    // Deserialization must sit inside the try so the handler below can actually
                    // see an EventDeserializationException.
                    event = EventFormatProvider.getInstance()
                        .resolveFormat("application/cloudevents+json")
                        .deserialize(message.getData());
                } catch (EventDeserializationException e) {
                    log.warn("The Message isn't json format, with exception:{}", e.getMessage());
                    // NOTE(review): the malformed message is left unacknowledged, as before;
                    // confirm whether it should be acked or negatively acked instead.
                    return;
                }
                this.eventListener.consume(event, consumeContext);
                try {
                    ackConsumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    throw new ConnectorRuntimeException(String.format("Failed to acknowledge the message of topic:%s with exception: %s", str, e.getMessage()));
                }
            })
            .subscribe();
    }

    /**
     * Unsubscribes the Pulsar consumer created by the last {@link #subscribe(String)} call.
     * Does nothing (apart from logging) when there is no such consumer.
     *
     * @param str the topic name, used only in log and error messages
     * @throws ConnectorRuntimeException if the Pulsar client reports a failure
     */
    public void unsubscribe(String str) {
        if (this.consumer == null) {
            log.warn("No active subscription to unsubscribe for topic: {}", str);
            return;
        }
        try {
            this.consumer.unsubscribe();
        } catch (PulsarClientException e) {
            throw new ConnectorRuntimeException(String.format("Failed to unsubscribe the topic:%s with exception: %s", str, e.getMessage()));
        }
    }

    /** Intentionally a no-op in this implementation. */
    public void updateOffset(List<CloudEvent> list, AbstractContext abstractContext) {
    }

    /**
     * Sets the listener that receives every deserialized event.
     *
     * @param eventListener the listener invoked by the Pulsar message callback
     */
    public void registerEventListener(EventListener eventListener) {
        this.eventListener = eventListener;
    }

    /** @return {@code true} once {@link #start()} has been called and until {@link #shutdown()} */
    public boolean isStarted() {
        return this.started.get();
    }

    /** @return the negation of {@link #isStarted()} */
    public boolean isClosed() {
        return !isStarted();
    }

    /**
     * Marks this consumer as stopped and closes the Pulsar consumer and client, if present.
     *
     * <p>Both resources are closed independently so a failure closing the consumer does not
     * leave the client open. The first failure encountered is reported.
     *
     * @throws ConnectorRuntimeException if closing the consumer or the client fails
     */
    public void shutdown() {
        this.started.compareAndSet(true, false);
        PulsarClientException firstFailure = null;
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (PulsarClientException e) {
                firstFailure = e;
            }
        }
        if (this.pulsarClient != null) {
            try {
                this.pulsarClient.close();
            } catch (PulsarClientException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw new ConnectorRuntimeException(String.format("Failed to close the pulsar client with exception: %s", firstFailure.getMessage()));
        }
    }
}
