package org.apache.eventmesh.connector.pulsar.client;

import com.google.common.base.Preconditions;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.connector.pulsar.config.ClientConfiguration;
import org.apache.eventmesh.connector.pulsar.utils.CloudEventUtils;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/pulsar/client/PulsarClientWrapper.class */
/**
 * Thin wrapper around a {@link PulsarClient} that publishes CloudEvents to Pulsar topics.
 *
 * <p>One {@link Producer} is created lazily per CloudEvent subject (used as the topic name)
 * and cached for reuse until {@link #shutdown()} is called.
 *
 * <p>NOTE(review): the producer cache is a plain {@link HashMap} and this class performs no
 * synchronization of its own; confirm that callers do not invoke {@link #publish} concurrently.
 */
public class PulsarClientWrapper {
    private static final Logger log = LoggerFactory.getLogger(PulsarClientWrapper.class);
    private final ClientConfiguration config;
    private final PulsarClient pulsarClient;
    /** Cache of producers keyed by topic (the CloudEvent subject). */
    private final Map<String, Producer<byte[]>> producerMap = new HashMap<>();

    /**
     * Builds the underlying Pulsar client from the given configuration.
     *
     * @param clientConfiguration supplies the service address and, optionally, the
     *                            authentication plugin and its parameters
     * @throws NullPointerException      if an auth plugin is configured without auth params
     * @throws ConnectorRuntimeException if the Pulsar client cannot be built
     */
    public PulsarClientWrapper(ClientConfiguration clientConfiguration) {
        this.config = clientConfiguration;
        try {
            ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(clientConfiguration.getServiceAddr());
            if (clientConfiguration.getAuthPlugin() != null) {
                Preconditions.checkNotNull(clientConfiguration.getAuthParams(), "Authentication Enabled in pulsar cluster, Please set authParams in pulsar-client.properties");
                clientBuilder.authentication(clientConfiguration.getAuthPlugin(), clientConfiguration.getAuthParams());
            }
            this.pulsarClient = clientBuilder.build();
        } catch (PulsarClientException e) {
            throw connectorFailure(String.format("Failed to connect pulsar cluster %s with exception: %s", clientConfiguration.getServiceAddr(), e.getMessage()), e);
        }
    }

    /**
     * Creates a {@link ConnectorRuntimeException} that keeps the original Pulsar exception as
     * its cause, so the stack trace of the underlying failure is not lost.
     */
    private static ConnectorRuntimeException connectorFailure(String message, PulsarClientException cause) {
        ConnectorRuntimeException failure = new ConnectorRuntimeException(message);
        // initCause is used so that only the message-only constructor is required.
        failure.initCause(cause);
        return failure;
    }

    /**
     * Creates a new producer for the given topic.
     *
     * @param str the topic name
     * @return the newly created producer
     * @throws ConnectorRuntimeException if the producer cannot be created
     */
    private Producer<byte[]> createProducer(String str) {
        try {
            return this.pulsarClient.newProducer()
                    .topic(str)
                    .batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            throw connectorFailure(String.format("Failed to create pulsar producer for %s with exception: %s", str, e.getMessage()), e);
        }
    }

    /**
     * Serializes the event as {@code application/cloudevents+json} and sends it asynchronously
     * to the topic named by the event's subject.
     *
     * <p>On a successful send, {@code sendCallback.onSuccess} is invoked. Failures, whether
     * raised synchronously or reported through the send future, are logged with their stack
     * trace; they are not propagated to the caller.
     *
     * @param cloudEvent   the event to publish; its subject selects the topic
     * @param sendCallback notified when the broker acknowledges the message
     */
    public void publish(CloudEvent cloudEvent, SendCallback sendCallback) {
        String subject = cloudEvent.getSubject();
        try {
            Producer<byte[]> producer = this.producerMap.computeIfAbsent(subject, this::createProducer);
            byte[] payload = EventFormatProvider.getInstance().resolveFormat("application/cloudevents+json").serialize(cloudEvent);
            producer.sendAsync(payload).whenComplete((messageId, throwable) -> {
                if (throwable != null) {
                    // Without this branch an asynchronous send failure would vanish silently.
                    log.error("Failed to publish cloudEvent for {} with exception: {}", subject, throwable.getMessage(), throwable);
                    return;
                }
                sendCallback.onSuccess(CloudEventUtils.convertSendResult(cloudEvent));
            });
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the full stack trace.
            log.error("Failed to publish cloudEvent for {} with exception: {}", subject, e.getMessage(), e);
        }
    }

    /**
     * Closes every cached producer, clears the cache and then closes the client.
     *
     * <p>All resources are attempted even if one of them fails to close; the first failure is
     * rethrown afterwards with any later failures attached as suppressed exceptions.
     *
     * @throws PulsarClientException if closing a producer or the client failed
     */
    public void shutdown() throws PulsarClientException {
        PulsarClientException firstFailure = null;
        // Producers are closed before the client that owns them.
        for (Map.Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
            try {
                entry.getValue().close();
            } catch (PulsarClientException e) {
                log.warn("Failed to close pulsar producer for {}", entry.getKey(), e);
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.producerMap.clear();
        try {
            this.pulsarClient.close();
        } catch (PulsarClientException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
