package org.apache.streampipes.client.live;

import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.streampipes.client.api.live.EventProcessor;
import org.apache.streampipes.client.api.live.IBrokerConfigOverride;
import org.apache.streampipes.client.api.live.ISubscription;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.SpDataFormatManager;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.SpProtocolDefinition;
import org.apache.streampipes.messaging.SpProtocolManager;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.runtime.EventFactory;

/* loaded from: input_file:BOOT-INF/lib/streampipes-client-0.93.0.jar:org/apache/streampipes/client/live/SubscriptionManager.class */
/**
 * Connects an {@link EventProcessor} callback to the messaging transport described by an
 * {@link EventGrounding}.
 *
 * <p>{@link #subscribe()} resolves the data format and the messaging protocol of the grounding,
 * optionally applies broker settings from an {@link IBrokerConfigOverride}, and forwards every
 * received message to the callback as a runtime event.
 */
public class SubscriptionManager {
    private final EventGrounding grounding;
    private final EventProcessor callback;
    private IBrokerConfigOverride brokerConfigOverride;
    private boolean overrideSettings;

    /**
     * Creates a manager that uses the broker settings of the grounding unchanged.
     *
     * @param eventGrounding grounding that provides the transport protocol and transport formats
     * @param eventProcessor callback invoked for every received event
     */
    public SubscriptionManager(EventGrounding eventGrounding, EventProcessor eventProcessor) {
        this.overrideSettings = false;
        this.grounding = eventGrounding;
        this.callback = eventProcessor;
    }

    /**
     * Creates a manager that applies the given broker override before connecting.
     *
     * @param iBrokerConfigOverride override applied to the transport protocol in {@link #subscribe()}
     * @param eventGrounding        grounding that provides the transport protocol and transport formats
     * @param eventProcessor        callback invoked for every received event
     */
    public SubscriptionManager(IBrokerConfigOverride iBrokerConfigOverride, EventGrounding eventGrounding, EventProcessor eventProcessor) {
        this(eventGrounding, eventProcessor);
        this.brokerConfigOverride = iBrokerConfigOverride;
        this.overrideSettings = true;
    }

    /**
     * Opens a consumer for the grounding's transport protocol and routes incoming messages to the
     * callback.
     *
     * <p>The first transport format of the grounding selects the data format used to decode
     * messages. A missing protocol implementation is reported before a missing data format.
     *
     * @return a subscription wrapping the connected consumer
     * @throws SpRuntimeException if no protocol implementation or no data format converter is
     *                            registered for the grounding
     */
    public ISubscription subscribe() {
        Optional<SpDataFormatDefinition> formatLookup =
                SpDataFormatManager.INSTANCE.findDefinition(this.grounding.getTransportFormats().get(0));

        TransportProtocol transportProtocol = getTransportProtocol();
        SpProtocolDefinition<TransportProtocol> protocolDefinition = findProtocol(transportProtocol);

        SpDataFormatDefinition dataFormat = formatLookup.orElseThrow(() ->
                new SpRuntimeException("No converter found for data format - did you add a format factory (client.registerDataFormat)?"));

        if (this.overrideSettings) {
            applyBrokerOverride(transportProtocol);
        }

        EventConsumer consumer = protocolDefinition.getConsumer(transportProtocol);
        consumer.connect(payload -> {
            try {
                this.callback.onEvent(EventFactory.fromMap(dataFormat.toMap(payload)));
            } catch (SpRuntimeException e) {
                // A failure on a single event is printed and not rethrown to the consumer.
                e.printStackTrace();
            }
        });
        return new Subscription(consumer);
    }

    /**
     * Applies the configured broker override to the given protocol: the Kafka-specific hostname
     * override first (only for Kafka protocols), then the generic hostname and port overrides.
     */
    private void applyBrokerOverride(TransportProtocol transportProtocol) {
        if (transportProtocol instanceof KafkaTransportProtocol) {
            this.brokerConfigOverride.overrideKafkaHostname((KafkaTransportProtocol) transportProtocol);
        }
        this.brokerConfigOverride.overrideHostname(transportProtocol);
        this.brokerConfigOverride.overridePort(transportProtocol);
    }

    /**
     * Looks up the registered protocol definition for the given transport protocol.
     *
     * <p>Only this lookup is translated into the "missing messaging module" error, so that a
     * {@link NoSuchElementException} raised elsewhere is not misreported as a missing protocol.
     *
     * @throws SpRuntimeException if no definition is registered; the original exception is kept as cause
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private SpProtocolDefinition<TransportProtocol> findProtocol(TransportProtocol transportProtocol) {
        try {
            // Raw cast kept from the original lookup; the manager's generic signature is not visible here.
            return (SpProtocolDefinition) SpProtocolManager.INSTANCE.findDefinition(transportProtocol).orElseThrow();
        } catch (NoSuchElementException e) {
            throw new SpRuntimeException("Could not find an implementation for messaging protocol " + this.grounding.getTransportProtocol().getClass().getCanonicalName() + "- please add the corresponding module (streampipes-messaging-*) to your project dependencies.", e);
        }
    }

    /** Returns the transport protocol configured on the grounding. */
    private TransportProtocol getTransportProtocol() {
        return this.grounding.getTransportProtocol();
    }
}
