package org.apache.streampipes.manager.runtime;

import java.util.HashMap;
import java.util.Map;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.class */
public enum PipelineElementRuntimeInfoFetcher {
    INSTANCE;

    Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
    private Map<String, SpDataFormatConverter> converterMap = new HashMap();

    PipelineElementRuntimeInfoFetcher() {
    }

    public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException {
        return spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol ? getLatestEventFromKafka(spDataStream) : getLatestEventFromJms(spDataStream);
    }

    private String getLatestEventFromJms(SpDataStream spDataStream) throws SpRuntimeException {
        final String[] strArr = {null};
        final String outputTopic = getOutputTopic(spDataStream);
        if (!this.converterMap.containsKey(outputTopic)) {
            this.converterMap.put(outputTopic, new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
        }
        final ActiveMQConsumer activeMQConsumer = new ActiveMQConsumer();
        activeMQConsumer.connect(spDataStream.getEventGrounding().getTransportProtocol(), new InternalEventProcessor<byte[]>() { // from class: org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher.1
            public void onEvent(byte[] bArr) {
                try {
                    strArr[0] = ((SpDataFormatConverter) PipelineElementRuntimeInfoFetcher.this.converterMap.get(outputTopic)).convert(bArr);
                    activeMQConsumer.disconnect();
                } catch (SpRuntimeException e) {
                    e.printStackTrace();
                }
            }
        });
        while (strArr[0] == null) {
            try {
                Thread.sleep(300L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return strArr[0];
    }

    private TransportFormat getTransportFormat(SpDataStream spDataStream) {
        return (TransportFormat) spDataStream.getEventGrounding().getTransportFormats().get(0);
    }

    private String getOutputTopic(SpDataStream spDataStream) {
        return spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
        final String[] strArr = {null};
        final String outputTopic = getOutputTopic(spDataStream);
        KafkaTransportProtocol transportProtocol = spDataStream.getEventGrounding().getTransportProtocol();
        if ("true".equals(System.getenv("SP_DEBUG"))) {
            transportProtocol.setBrokerHostname("localhost");
            transportProtocol.setKafkaPort(9094);
        }
        if (!this.converterMap.containsKey(outputTopic)) {
            this.converterMap.put(outputTopic, new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
        }
        SpKafkaConsumer spKafkaConsumer = new SpKafkaConsumer(transportProtocol, outputTopic, new InternalEventProcessor<byte[]>() { // from class: org.apache.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher.2
            public void onEvent(byte[] bArr) {
                try {
                    strArr[0] = ((SpDataFormatConverter) PipelineElementRuntimeInfoFetcher.this.converterMap.get(outputTopic)).convert(bArr);
                } catch (SpRuntimeException e) {
                    e.printStackTrace();
                }
            }
        });
        new Thread((Runnable) spKafkaConsumer).start();
        long j = 0;
        while (strArr[0] == null && j < 6000) {
            try {
                Thread.sleep(300L);
                j += 300;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        spKafkaConsumer.disconnect();
        return strArr[0];
    }
}
