package org.apache.streampipes.manager.runtime;

import java.util.HashMap;
import java.util.Map;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.messaging.mqtt.MqttConsumer;
import org.apache.streampipes.messaging.nats.NatsConsumer;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.NatsTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/streampipes-pipeline-management-0.91.0.jar:org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.class */
public enum PipelineElementRuntimeInfoFetcher {
    INSTANCE;

    private static final int FETCH_INTERVAL_MS = 300;
    Logger logger = LoggerFactory.getLogger((Class<?>) PipelineElementRuntimeInfoFetcher.class);
    private final Map<String, SpDataFormatConverter> converterMap = new HashMap();
    private Environment env = Environments.getEnvironment();

    PipelineElementRuntimeInfoFetcher() {
    }

    public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException {
        String outputTopic = getOutputTopic(spDataStream);
        TransportProtocol transportProtocol = spDataStream.getEventGrounding().getTransportProtocol();
        if (this.env.getSpDebug().getValueOrDefault().booleanValue()) {
            transportProtocol.setBrokerHostname("localhost");
        }
        if (!this.converterMap.containsKey(outputTopic)) {
            this.converterMap.put(outputTopic, new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
        }
        SpDataFormatConverter spDataFormatConverter = this.converterMap.get(outputTopic);
        return spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol ? getLatestEventFromKafka((KafkaTransportProtocol) transportProtocol, spDataFormatConverter, outputTopic) : spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol ? getLatestEventFromJms((JmsTransportProtocol) transportProtocol, spDataFormatConverter) : spDataStream.getEventGrounding().getTransportProtocol() instanceof MqttTransportProtocol ? getLatestEventFromMqtt((MqttTransportProtocol) transportProtocol, spDataFormatConverter) : getLatestEventFromNats((NatsTransportProtocol) transportProtocol, spDataFormatConverter);
    }

    private TransportFormat getTransportFormat(SpDataStream spDataStream) {
        return spDataStream.getEventGrounding().getTransportFormats().get(0);
    }

    private String getOutputTopic(SpDataStream spDataStream) {
        return spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private void waitForEvent(String[] strArr) {
        long j = 0;
        while (strArr[0] == null && j < 6000) {
            try {
                Thread.sleep(300L);
                j += 300;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private String getLatestEventFromJms(JmsTransportProtocol jmsTransportProtocol, SpDataFormatConverter spDataFormatConverter) throws SpRuntimeException {
        String[] strArr = {null};
        ActiveMQConsumer activeMQConsumer = new ActiveMQConsumer();
        activeMQConsumer.connect2(jmsTransportProtocol, bArr -> {
            strArr[0] = spDataFormatConverter.convert(bArr);
            activeMQConsumer.disconnect();
        });
        waitForEvent(strArr);
        return strArr[0];
    }

    private String getLatestEventFromMqtt(MqttTransportProtocol mqttTransportProtocol, SpDataFormatConverter spDataFormatConverter) throws SpRuntimeException {
        String[] strArr = {null};
        MqttConsumer mqttConsumer = new MqttConsumer();
        mqttConsumer.connect2(mqttTransportProtocol, bArr -> {
            strArr[0] = spDataFormatConverter.convert(bArr);
            mqttConsumer.disconnect();
        });
        waitForEvent(strArr);
        return strArr[0];
    }

    private String getLatestEventFromNats(NatsTransportProtocol natsTransportProtocol, SpDataFormatConverter spDataFormatConverter) throws SpRuntimeException {
        String[] strArr = {null};
        NatsConsumer natsConsumer = new NatsConsumer();
        natsConsumer.connect2(natsTransportProtocol, bArr -> {
            strArr[0] = spDataFormatConverter.convert(bArr);
            natsConsumer.disconnect();
        });
        waitForEvent(strArr);
        return strArr[0];
    }

    private String getLatestEventFromKafka(KafkaTransportProtocol kafkaTransportProtocol, SpDataFormatConverter spDataFormatConverter, String str) throws SpRuntimeException {
        String[] strArr = {null};
        if (getEnvironment().getSpDebug().getValueOrDefault().booleanValue()) {
            kafkaTransportProtocol.setKafkaPort(9094);
        }
        SpKafkaConsumer spKafkaConsumer = new SpKafkaConsumer(kafkaTransportProtocol, str, bArr -> {
            strArr[0] = spDataFormatConverter.convert(bArr);
        });
        new Thread(spKafkaConsumer).start();
        waitForEvent(strArr);
        spKafkaConsumer.disconnect();
        return strArr[0];
    }

    private Environment getEnvironment() {
        return Environments.getEnvironment();
    }
}
