package org.apache.streampipes.sinks.brokers.jvm.pulsar;

import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.standalone.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;

/* loaded from: input_file:org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.class */
public class PulsarPublisherSink extends StreamPipesDataSink {
    public static final String TOPIC_KEY = "topic";
    public static final String PULSAR_HOST_KEY = "pulsar-host";
    public static final String PULSAR_PORT_KEY = "pulsar-port";
    private static final String PulsarScheme = "pulsar://";
    private static final String Colon = ":";
    private final ClientBuilder clientBuilder;
    private Producer<byte[]> producer;
    private PulsarClient pulsarClient;
    private SpDataFormatDefinition spDataFormatDefinition;
    private PulsarParameters params;

    public PulsarPublisherSink() {
        this.clientBuilder = PulsarClient.builder();
    }

    @VisibleForTesting
    public PulsarPublisherSink(ClientBuilder clientBuilder) {
        this.clientBuilder = clientBuilder;
    }

    /* renamed from: declareModel, reason: merged with bridge method [inline-methods] */
    public DataSinkDescription m5declareModel() {
        return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar").category(new DataSinkType[]{DataSinkType.MESSAGING}).withLocales(new Locales[]{Locales.EN}).withAssets(new String[]{"documentation.md", "icon.png"}).requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build()).requiredTextParameter(Labels.withId(PULSAR_HOST_KEY)).requiredIntegerParameter(Labels.withId(PULSAR_PORT_KEY), 6650).requiredTextParameter(Labels.withId("topic")).build();
    }

    public void onInvocation(SinkParams sinkParams, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        this.params = new PulsarParameters(sinkParams);
        this.spDataFormatDefinition = new JsonDataFormatDefinition();
        try {
            this.pulsarClient = this.clientBuilder.serviceUrl(makePulsarUrl(this.params.getPulsarHost(), this.params.getPulsarPort())).build();
            this.producer = this.pulsarClient.newProducer().topic(this.params.getTopic()).create();
        } catch (PulsarClientException e) {
            throw new SpRuntimeException(e);
        }
    }

    public void onEvent(Event event) throws SpRuntimeException {
        try {
            this.producer.send(this.spDataFormatDefinition.fromMap(event.getRaw()));
        } catch (PulsarClientException e) {
            throw new SpRuntimeException(e);
        }
    }

    public void onDetach() throws SpRuntimeException {
        try {
            this.pulsarClient.close();
        } catch (PulsarClientException e) {
            throw new SpRuntimeException(e);
        }
    }

    private String makePulsarUrl(String str, Integer num) {
        return "pulsar://" + str + ":" + num;
    }
}
