package org.apache.streampipes.manager.matching;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.streampipes.manager.util.TopicGenerator;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.configuration.MessagingSettings;
import org.apache.streampipes.model.configuration.SpProtocol;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.NatsTransportProtocol;
import org.apache.streampipes.model.grounding.PulsarTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.storage.management.StorageDispatcher;

/* loaded from: input_file:org/apache/streampipes/manager/matching/ProtocolSelector.class */
public class ProtocolSelector extends GroundingSelector {
    private final String outputTopic;
    private final List<SpProtocol> prioritizedProtocols;
    private final MessagingSettings messagingSettings;

    public ProtocolSelector(NamedStreamPipesEntity namedStreamPipesEntity, Set<InvocableStreamPipesEntity> set) {
        super(namedStreamPipesEntity, set);
        this.outputTopic = TopicGenerator.generateRandomTopic();
        this.messagingSettings = StorageDispatcher.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage().get().getMessagingSettings();
        this.prioritizedProtocols = this.messagingSettings.getPrioritizedProtocols();
    }

    public TransportProtocol getPreferredProtocol() {
        if (this.source instanceof SpDataStream) {
            return this.source.getEventGrounding().getTransportProtocol();
        }
        for (SpProtocol spProtocol : this.prioritizedProtocols) {
            if (spProtocol.getProtocolClass().equals(KafkaTransportProtocol.class.getCanonicalName()) && supportsProtocol(KafkaTransportProtocol.class)) {
                return kafkaTopic();
            }
            if (spProtocol.getProtocolClass().equals(JmsTransportProtocol.class.getCanonicalName()) && supportsProtocol(JmsTransportProtocol.class)) {
                return jmsTopic();
            }
            if (spProtocol.getProtocolClass().equals(MqttTransportProtocol.class.getCanonicalName()) && supportsProtocol(MqttTransportProtocol.class)) {
                return mqttTopic();
            }
            if (spProtocol.getProtocolClass().equals(NatsTransportProtocol.class.getCanonicalName()) && supportsProtocol(NatsTransportProtocol.class)) {
                return natsTopic();
            }
            if (spProtocol.getProtocolClass().equals(PulsarTransportProtocol.class.getCanonicalName()) && supportsProtocol(PulsarTransportProtocol.class)) {
                return new PulsarTransportProtocol(this.messagingSettings.getPulsarUrl(), new SimpleTopicDefinition(this.outputTopic));
            }
        }
        return kafkaTopic();
    }

    private TransportProtocol mqttTopic() {
        return new MqttTransportProtocol(this.messagingSettings.getMqttHost(), this.messagingSettings.getMqttPort(), this.outputTopic);
    }

    private TransportProtocol jmsTopic() {
        return new JmsTransportProtocol(this.messagingSettings.getJmsHost(), this.messagingSettings.getJmsPort(), this.outputTopic);
    }

    private TransportProtocol natsTopic() {
        return new NatsTransportProtocol(this.messagingSettings.getNatsHost(), this.messagingSettings.getNatsPort(), this.outputTopic);
    }

    private TransportProtocol kafkaTopic() {
        return new KafkaTransportProtocol(this.messagingSettings.getKafkaHost(), this.messagingSettings.getKafkaPort(), this.outputTopic, this.messagingSettings.getZookeeperHost(), this.messagingSettings.getZookeeperPort());
    }

    public <T extends TransportProtocol> boolean supportsProtocol(Class<T> cls) {
        return buildInvocables().stream().allMatch(invocableStreamPipesEntity -> {
            Stream stream = invocableStreamPipesEntity.getSupportedGrounding().getTransportProtocols().stream();
            Objects.requireNonNull(cls);
            return stream.anyMatch((v1) -> {
                return r1.isInstance(v1);
            });
        });
    }
}
