package org.apache.streampipes.wrapper.standalone.routing;

import java.util.Map;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.monitoring.SpMonitoringManager;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.management.monitoring.ExtensionsLogger;
import org.apache.streampipes.messaging.EventProducer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventConverter;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/wrapper/standalone/routing/StandaloneSpOutputCollector.class */
/**
 * Output-side collector that converts each {@link Event} to a map, serializes it with the
 * collector's data format definition and hands the result to an {@link EventProducer}.
 *
 * <p>The producer is obtained once, in the constructor, from the protocol definition supplied
 * by the superclass. Every successfully published event is also reported to
 * {@link SpMonitoringManager} under the resource id given at construction time.
 *
 * @param <T> the concrete transport protocol this collector publishes to
 */
public class StandaloneSpOutputCollector<T extends TransportProtocol> extends StandaloneSpCollector<T, InternalEventProcessor<Map<String, Object>>> implements SpOutputCollector {
    private static final Logger LOG = LoggerFactory.getLogger(StandaloneSpOutputCollector.class);

    /** Producer used to publish serialized events; created once in the constructor. */
    private final EventProducer producer;

    /** Identifier passed to the monitoring manager and to the extensions logger. */
    private final String resourceId;

    /** Logger bound to {@link #resourceId}; receives publish failures in addition to {@link #LOG}. */
    private final ExtensionsLogger extensionsLogger;

    /**
     * Creates the collector and obtains the producer for the given protocol.
     *
     * @param protocol   transport protocol the producer is created for
     * @param format     transport format forwarded to the superclass
     * @param resourceId identifier used for monitoring and for the extensions logger
     * @throws SpRuntimeException declared by this constructor; raised by the superclass
     *                            constructor or while obtaining the producer
     */
    public StandaloneSpOutputCollector(T protocol, TransportFormat format, String resourceId) throws SpRuntimeException {
        super(protocol, format);
        this.producer = protocolDefinition.getProducer(protocol);
        this.resourceId = resourceId;
        this.extensionsLogger = new ExtensionsLogger(resourceId);
    }

    /**
     * Publishes a single event through the producer and records it with the monitoring manager.
     *
     * <p>An {@link SpRuntimeException} raised while serializing or publishing is not rethrown:
     * it is reported to the extensions logger and to the class logger, and the monitoring
     * counter is not updated for that event.
     *
     * @param event the event to publish
     */
    public void collect(Event event) {
        try {
            Map<String, Object> outEvent = new EventConverter(event).toMap();
            producer.publish(dataFormatDefinition.fromMap(outEvent));
            // Only reached when publishing did not throw.
            SpMonitoringManager.INSTANCE.increaseOutCounter(resourceId, System.currentTimeMillis());
        } catch (SpRuntimeException e) {
            extensionsLogger.error(e);
            LOG.error("Could not publish event", e);
        }
    }

    /**
     * Connects the producer unless it already reports being connected.
     *
     * @throws SpRuntimeException if the producer fails to connect
     */
    public void connect() throws SpRuntimeException {
        if (!producer.isConnected()) {
            producer.connect();
        }
    }

    /**
     * Disconnects the producer and then asks {@link ProtocolManager} to remove the output
     * collector for this collector's transport protocol. Does nothing when the producer
     * reports that it is not connected.
     *
     * @throws SpRuntimeException if the producer fails to disconnect; in that case the
     *                            removal call on {@link ProtocolManager} is not made
     */
    public void disconnect() throws SpRuntimeException {
        if (!producer.isConnected()) {
            return;
        }
        producer.disconnect();
        ProtocolManager.removeOutputCollector(transportProtocol);
    }
}
