/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streampipes.extensions.connectors.pulsar.sink;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.connectors.pulsar.sink.PulsarParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;

public class PulsarPublisherSink
implements IStreamPipesDataSink {
    public static final String TOPIC_KEY = "topic";
    public static final String PULSAR_HOST_KEY = "pulsar-host";
    public static final String PULSAR_PORT_KEY = "pulsar-port";
    private static final String PulsarScheme = "pulsar://";
    private static final String Colon = ":";
    private final ClientBuilder clientBuilder;
    private Producer<byte[]> producer;
    private PulsarClient pulsarClient;
    private SpDataFormatDefinition spDataFormatDefinition;
    private PulsarParameters params;

    public PulsarPublisherSink() {
        this.clientBuilder = PulsarClient.builder();
    }

    @VisibleForTesting
    public PulsarPublisherSink(ClientBuilder pulsarClientBuilder) {
        this.clientBuilder = pulsarClientBuilder;
    }

    public IDataSinkConfiguration declareConfig() {
        return DataSinkConfiguration.create(PulsarPublisherSink::new, (DataSinkDescription)((DataSinkDescription)((DataSinkBuilder)((DataSinkBuilder)((DataSinkBuilder)((DataSinkBuilder)((DataSinkBuilder)((DataSinkBuilder)DataSinkBuilder.create((String)"org.apache.streampipes.sinks.brokers.jvm.pulsar").category(new DataSinkType[]{DataSinkType.MESSAGING}).withLocales(new Locales[]{Locales.EN})).withAssets(new String[]{"documentation.md", "icon.png"})).requiredStream(StreamRequirementsBuilder.create().requiredProperty((EventProperty)EpRequirements.anyProperty()).build())).requiredTextParameter(Labels.withId((String)PULSAR_HOST_KEY))).requiredIntegerParameter(Labels.withId((String)PULSAR_PORT_KEY), Integer.valueOf(6650))).requiredTextParameter(Labels.withId((String)TOPIC_KEY))).build()));
    }

    public void onPipelineStarted(IDataSinkParameters parameters, EventSinkRuntimeContext runtimeContext) {
        this.params = new PulsarParameters(parameters);
        this.spDataFormatDefinition = new JsonDataFormatDefinition();
        try {
            this.pulsarClient = this.clientBuilder.serviceUrl(this.makePulsarUrl(this.params.getPulsarHost(), this.params.getPulsarPort())).build();
            this.producer = this.pulsarClient.newProducer().topic(this.params.getTopic()).create();
        }
        catch (PulsarClientException e) {
            throw new SpRuntimeException((Throwable)e);
        }
    }

    public void onEvent(Event event) throws SpRuntimeException {
        Map rawMap = event.getRaw();
        byte[] jsonMessage = this.spDataFormatDefinition.fromMap(rawMap);
        try {
            this.producer.send((Object)jsonMessage);
        }
        catch (PulsarClientException e) {
            throw new SpRuntimeException((Throwable)e);
        }
    }

    public void onPipelineStopped() {
        try {
            this.pulsarClient.close();
        }
        catch (PulsarClientException e) {
            throw new SpRuntimeException((Throwable)e);
        }
    }

    private String makePulsarUrl(String hostname, Integer port) {
        return PulsarScheme + hostname + Colon + port;
    }
}

