package org.apache.streampipes.client.live;

import java.util.Optional;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.SpDataFormatDefinition;
import org.apache.streampipes.dataformat.SpDataFormatFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.runtime.EventFactory;

/* loaded from: input_file:BOOT-INF/lib/streampipes-client-0.91.0.jar:org/apache/streampipes/client/live/SubscriptionManager.class */
/**
 * Subscribes a client-side {@link EventProcessor} to the Kafka topic described by an
 * {@link EventGrounding}.
 *
 * <p>The data format used to decode incoming messages is selected from the data format
 * factories registered in the {@link StreamPipesClientConfig}. Optionally, the broker host and
 * port taken from the grounding can be replaced by the values of a {@link KafkaConfig}.
 */
public class SubscriptionManager {
    private final EventGrounding grounding;
    private final EventProcessor callback;
    private final StreamPipesClientConfig clientConfig;
    private KafkaConfig kafkaConfig;
    private boolean overrideKafkaSettings;

    /**
     * Creates a manager that connects to the broker exactly as described by the grounding.
     *
     * @param streamPipesClientConfig client configuration holding the registered data formats
     * @param eventGrounding grounding that describes transport format, protocol and topic
     * @param eventProcessor callback invoked for every decoded event
     */
    public SubscriptionManager(StreamPipesClientConfig streamPipesClientConfig, EventGrounding eventGrounding, EventProcessor eventProcessor) {
        this.overrideKafkaSettings = false;
        this.grounding = eventGrounding;
        this.callback = eventProcessor;
        this.clientConfig = streamPipesClientConfig;
    }

    /**
     * Creates a manager that replaces the broker host and port of the grounding with the values
     * from {@code kafkaConfig} when subscribing.
     *
     * @param streamPipesClientConfig client configuration holding the registered data formats
     * @param kafkaConfig broker host and port to use instead of the grounding's settings
     * @param eventGrounding grounding that describes transport format, protocol and topic
     * @param eventProcessor callback invoked for every decoded event
     */
    public SubscriptionManager(StreamPipesClientConfig streamPipesClientConfig, KafkaConfig kafkaConfig, EventGrounding eventGrounding, EventProcessor eventProcessor) {
        this(streamPipesClientConfig, eventGrounding, eventProcessor);
        this.kafkaConfig = kafkaConfig;
        this.overrideKafkaSettings = true;
    }

    /**
     * Creates a Kafka consumer for the grounding's topic and starts it on a new thread.
     *
     * @return the started consumer
     * @throws SpRuntimeException if the grounding declares no transport format, if no registered
     *     data format factory matches the grounding's first transport format, or if the
     *     grounding's transport protocol is not a Kafka protocol
     */
    public SpKafkaConsumer subscribe() {
        SpDataFormatDefinition dataFormat = findDataFormatFactory()
                .orElseThrow(() -> new SpRuntimeException("No converter found for data format - did you add a format factory (client.registerDataFormat)?"))
                .createInstance();

        KafkaTransportProtocol protocol = getKafkaProtocol();
        if (this.overrideKafkaSettings) {
            protocol = overrideHostname(protocol);
        }

        SpKafkaConsumer consumer = new SpKafkaConsumer(protocol, getOutputTopic(), payload -> {
            try {
                this.callback.onEvent(EventFactory.fromMap(dataFormat.toMap(payload)));
            } catch (SpRuntimeException e) {
                // The failure is only printed and not rethrown, so an error while decoding or
                // handling a single event does not escape this callback.
                e.printStackTrace();
            }
        });
        new Thread(consumer).start();
        return consumer;
    }

    /**
     * Looks up the first registered data format factory whose transport format URI equals one of
     * the RDF types of the grounding's first transport format.
     *
     * @return the matching factory, or an empty {@link Optional} if none is registered
     * @throws SpRuntimeException if the grounding declares no transport format at all
     */
    private Optional<SpDataFormatFactory> findDataFormatFactory() {
        // Fail with a descriptive error instead of an IndexOutOfBoundsException from get(0).
        if (this.grounding.getTransportFormats() == null || this.grounding.getTransportFormats().isEmpty()) {
            throw new SpRuntimeException("Event grounding does not declare any transport format");
        }
        return this.clientConfig.getRegisteredDataFormats().stream()
                .filter(factory -> this.grounding.getTransportFormats().get(0).getRdfType().stream()
                        .anyMatch(uri -> uri.toString().equals(factory.getTransportFormatRdfUri())))
                .findFirst();
    }

    /**
     * Applies the broker host and port from {@code kafkaConfig} to the given protocol.
     *
     * <p>Note: the passed instance is modified in place and returned; since it originates from
     * the grounding, the grounding's protocol settings are changed as well.
     */
    private KafkaTransportProtocol overrideHostname(KafkaTransportProtocol kafkaTransportProtocol) {
        kafkaTransportProtocol.setBrokerHostname(this.kafkaConfig.getKafkaHost());
        kafkaTransportProtocol.setKafkaPort(this.kafkaConfig.getKafkaPort().intValue());
        return kafkaTransportProtocol;
    }

    /**
     * Returns the grounding's transport protocol as a Kafka protocol.
     *
     * @throws SpRuntimeException if the grounding has no transport protocol or it is not a
     *     {@link KafkaTransportProtocol}
     */
    private KafkaTransportProtocol getKafkaProtocol() {
        Object protocol = this.grounding.getTransportProtocol();
        if (!(protocol instanceof KafkaTransportProtocol)) {
            String actual = protocol == null ? "none" : protocol.getClass().getName();
            throw new SpRuntimeException("Live subscription requires a Kafka transport protocol, but the event grounding uses: " + actual);
        }
        return (KafkaTransportProtocol) protocol;
    }

    /** Returns the actual topic name from the topic definition of the grounding's protocol. */
    private String getOutputTopic() {
        return this.grounding.getTransportProtocol().getTopicDefinition().getActualTopicName();
    }
}
