package org.apache.streampipes.messaging.mqtt;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/* loaded from: input_file:org/apache/streampipes/messaging/mqtt/MqttConsumer.class */
public class MqttConsumer extends AbstractMqttConnector implements EventConsumer<MqttTransportProtocol> {
    public void connect(MqttTransportProtocol mqttTransportProtocol, InternalEventProcessor<byte[]> internalEventProcessor) throws SpRuntimeException {
        try {
            createBrokerConnection(mqttTransportProtocol);
            this.connection.subscribe(new Topic[]{new Topic(mqttTransportProtocol.getTopicDefinition().getActualTopicName(), QoS.AT_LEAST_ONCE)});
            while (this.connected.booleanValue()) {
                Message receive = this.connection.receive();
                internalEventProcessor.onEvent(receive.getPayload());
                receive.ack();
            }
        } catch (Exception e) {
            throw new SpRuntimeException(e);
        }
    }

    public void disconnect() throws SpRuntimeException {
        try {
            try {
                this.connection.disconnect();
                this.connected = false;
            } catch (Exception e) {
                throw new SpRuntimeException(e);
            }
        } catch (Throwable th) {
            this.connected = false;
            throw th;
        }
    }

    public Boolean isConnected() {
        return this.connected;
    }

    public /* bridge */ /* synthetic */ void connect(TransportProtocol transportProtocol, InternalEventProcessor internalEventProcessor) throws SpRuntimeException {
        connect((MqttTransportProtocol) transportProtocol, (InternalEventProcessor<byte[]>) internalEventProcessor);
    }
}
