package org.apache.streampipes.sinks.brokers.jvm.jms;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;

/* loaded from: input_file:org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.class */
public class JmsPublisher implements EventSink<JmsParameters> {
    private ActiveMQPublisher publisher;
    private JsonDataFormatDefinition jsonDataFormatDefinition = new JsonDataFormatDefinition();

    public void onInvocation(JmsParameters jmsParameters, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        this.publisher = new ActiveMQPublisher();
        this.publisher.connect(new JmsTransportProtocol(jmsParameters.getJmsHost(), jmsParameters.getJmsPort().intValue(), jmsParameters.getTopic()));
        if (!this.publisher.isConnected()) {
            throw new SpRuntimeException("Could not connect to JMS server " + jmsParameters.getJmsHost() + " on Port: " + jmsParameters.getJmsPort() + " to topic: " + jmsParameters.getTopic());
        }
    }

    public void onEvent(Event event) {
        try {
            this.publisher.publish(this.jsonDataFormatDefinition.fromMap(event.getRaw()));
        } catch (SpRuntimeException e) {
            e.printStackTrace();
        }
    }

    public void onDetach() throws SpRuntimeException {
        this.publisher.disconnect();
    }
}
