package org.apache.streampipes.sinks.brokers.jvm.nats;

import io.nats.client.Connection;
import io.nats.client.Nats;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.messaging.nats.NatsUtils;
import org.apache.streampipes.model.nats.NatsConfig;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;

/* loaded from: input_file:org/apache/streampipes/sinks/brokers/jvm/nats/NatsPublisher.class */
public class NatsPublisher implements EventSink<NatsParameters> {
    private String subject;
    private Connection natsConnection;
    private JsonDataFormatDefinition dataFormatDefinition = new JsonDataFormatDefinition();
    private static Logger LOG;

    public void onInvocation(NatsParameters natsParameters, EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
        LOG = natsParameters.getGraph().getLogger(NatsPublisher.class);
        NatsConfig natsConfig = natsParameters.getNatsConfig();
        this.subject = natsConfig.getSubject();
        try {
            this.natsConnection = Nats.connect(NatsUtils.makeNatsOptions(natsConfig));
        } catch (Exception e) {
            LOG.error("Error when connecting to the Nats broker on " + natsConfig.getNatsUrls() + " . " + e.toString());
        }
    }

    public void onEvent(Event event) {
        try {
            this.natsConnection.publish(this.subject, this.dataFormatDefinition.fromMap(event.getRaw()));
        } catch (SpRuntimeException e) {
            LOG.error("Could not publish events to Nats broker. " + e.toString());
        }
    }

    public void onDetach() throws SpRuntimeException {
        try {
            this.natsConnection.flush(Duration.ofMillis(50L));
            this.natsConnection.close();
        } catch (InterruptedException | TimeoutException e) {
            LOG.error("Error when disconnecting with Nats broker. " + e.toString());
        }
    }
}
