package org.apache.streampipes.export.utils;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.configuration.MessagingSettings;
import org.apache.streampipes.model.configuration.SpProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.NatsTransportProtocol;
import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.storage.management.StorageDispatcher;

/* loaded from: input_file:BOOT-INF/lib/streampipes-data-export-0.93.0.jar:org/apache/streampipes/export/utils/EventGroundingProcessor.class */
/**
 * Replaces the broker settings of a transport protocol with those of the messaging protocol
 * configured in the core configuration, while keeping the topic definition of the protocol
 * that is being replaced.
 */
public class EventGroundingProcessor {
    /** Messaging settings read from the core configuration storage when this instance is created. */
    private final MessagingSettings messagingSettings = StorageDispatcher.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage().get().getMessagingSettings();

    /** First entry of the prioritized protocol list in {@link #messagingSettings}. */
    private final SpProtocol configuredProtocol = this.messagingSettings.getPrioritizedProtocols().get(0);

    /**
     * Creates a new transport protocol for the configured messaging protocol and copies the
     * topic definition of the given protocol onto it.
     *
     * @param transportProtocol protocol whose topic definition is carried over
     * @return a newly created protocol of the configured type with the given topic definition
     * @throws SpRuntimeException if the configured protocol is none of Kafka, MQTT, NATS or Pulsar
     */
    public TransportProtocol applyOverride(TransportProtocol transportProtocol) {
        TransportProtocol configuredTransportProtocol = getConfiguredTransportProtocol();
        configuredTransportProtocol.setTopicDefinition(transportProtocol.getTopicDefinition());
        return configuredTransportProtocol;
    }

    /** Builds a fresh transport protocol instance for {@link #configuredProtocol}. */
    private TransportProtocol getConfiguredTransportProtocol() {
        return initializeProtocol(this.configuredProtocol);
    }

    /**
     * Maps the given protocol description to a matching transport protocol instance.
     *
     * @param spProtocol protocol description to resolve
     * @return a new transport protocol populated with broker settings from the messaging settings
     * @throws SpRuntimeException if the protocol class matches none of the supported protocols
     */
    private TransportProtocol initializeProtocol(SpProtocol spProtocol) {
        if (isProtocol(spProtocol, KafkaTransportProtocol.class)) {
            return makeKafkaProtocol();
        }
        if (isProtocol(spProtocol, MqttTransportProtocol.class)) {
            return makeMqttProtocol();
        }
        if (isProtocol(spProtocol, NatsTransportProtocol.class)) {
            return makeNatsProtocol();
        }
        if (isProtocol(spProtocol, PulsarTransportProtocol.class)) {
            return makePulsarProtocol();
        }
        throw new SpRuntimeException("Not recognized protocol: " + spProtocol.getName());
    }

    /** Returns whether the protocol class name of {@code spProtocol} equals the canonical name of {@code cls}. */
    private boolean isProtocol(SpProtocol spProtocol, Class<?> cls) {
        return spProtocol.getProtocolClass().equals(cls.getCanonicalName());
    }

    /** Creates a Kafka protocol using the Kafka host and port from the messaging settings. */
    private KafkaTransportProtocol makeKafkaProtocol() {
        KafkaTransportProtocol kafkaTransportProtocol = new KafkaTransportProtocol();
        kafkaTransportProtocol.setBrokerHostname(this.messagingSettings.getKafkaHost());
        kafkaTransportProtocol.setKafkaPort(this.messagingSettings.getKafkaPort());
        return kafkaTransportProtocol;
    }

    /** Creates an MQTT protocol using the MQTT host and port from the messaging settings. */
    private MqttTransportProtocol makeMqttProtocol() {
        MqttTransportProtocol mqttTransportProtocol = new MqttTransportProtocol();
        mqttTransportProtocol.setBrokerHostname(this.messagingSettings.getMqttHost());
        mqttTransportProtocol.setPort(this.messagingSettings.getMqttPort());
        return mqttTransportProtocol;
    }

    /** Creates a NATS protocol using the NATS host and port from the messaging settings. */
    private NatsTransportProtocol makeNatsProtocol() {
        NatsTransportProtocol natsTransportProtocol = new NatsTransportProtocol();
        natsTransportProtocol.setBrokerHostname(this.messagingSettings.getNatsHost());
        natsTransportProtocol.setPort(this.messagingSettings.getNatsPort());
        return natsTransportProtocol;
    }

    /** Creates a Pulsar protocol using the Pulsar URL from the messaging settings. */
    private PulsarTransportProtocol makePulsarProtocol() {
        PulsarTransportProtocol pulsarTransportProtocol = new PulsarTransportProtocol();
        // Pulsar has its own endpoint setting; the NATS host must not be used for it.
        pulsarTransportProtocol.setBrokerHostname(this.messagingSettings.getPulsarUrl());
        return pulsarTransportProtocol;
    }
}
