package org.apache.streampipes.manager.runtime;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.SpProtocolDefinition;
import org.apache.streampipes.messaging.SpProtocolManager;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.class */
public enum PipelineElementRuntimeInfoFetcher {
    INSTANCE;

    private static final Logger LOG = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
    private static final int FETCH_INTERVAL_MS = 300;
    private final Map<String, SpDataFormatConverter> converterMap = new HashMap();
    private final Environment env = Environments.getEnvironment();

    PipelineElementRuntimeInfoFetcher() {
    }

    public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException {
        String outputTopic = getOutputTopic(spDataStream);
        KafkaTransportProtocol transportProtocol = spDataStream.getEventGrounding().getTransportProtocol();
        if (((Boolean) this.env.getSpDebug().getValueOrDefault()).booleanValue()) {
            transportProtocol.setBrokerHostname("localhost");
            if (transportProtocol instanceof KafkaTransportProtocol) {
                transportProtocol.setKafkaPort(9094);
            }
        }
        if (!this.converterMap.containsKey(outputTopic)) {
            this.converterMap.put(outputTopic, new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
        }
        SpDataFormatConverter spDataFormatConverter = this.converterMap.get(outputTopic);
        Optional findDefinition = SpProtocolManager.INSTANCE.findDefinition(spDataStream.getEventGrounding().getTransportProtocol());
        if (findDefinition.isPresent()) {
            return getLatestEvent(((SpProtocolDefinition) findDefinition.get()).getConsumer(transportProtocol), spDataFormatConverter);
        }
        LOG.error("Error while fetching data for preview - protocol {} not found - did you register the protocol? ", transportProtocol.getClass().getCanonicalName());
        throw new SpRuntimeException("Protocol not found");
    }

    private TransportFormat getTransportFormat(SpDataStream spDataStream) {
        return (TransportFormat) spDataStream.getEventGrounding().getTransportFormats().get(0);
    }

    private String getOutputTopic(SpDataStream spDataStream) {
        return spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private void waitForEvent(String[] strArr) {
        long j = 0;
        while (strArr[0] == null && j < 6000) {
            try {
                TimeUnit.MILLISECONDS.sleep(300L);
                j += 300;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private String getLatestEvent(EventConsumer eventConsumer, SpDataFormatConverter spDataFormatConverter) {
        String[] strArr = {null};
        eventConsumer.connect(bArr -> {
            strArr[0] = spDataFormatConverter.convert(bArr);
            eventConsumer.disconnect();
        });
        waitForEvent(strArr);
        return strArr[0];
    }
}
