package org.apache.streampipes.extensions.connectors.mqtt.shared;

import org.apache.streampipes.messaging.InternalEventProcessor;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

/* loaded from: input_file:org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.class */
public class MqttConsumer implements Runnable {
    private final InternalEventProcessor<byte[]> consumer;
    private boolean running;
    private int maxElementsToReceive;
    private int messageCount;
    private final MqttConfig mqttConfig;

    public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> internalEventProcessor) {
        this.maxElementsToReceive = -1;
        this.messageCount = 0;
        this.mqttConfig = mqttConfig;
        this.consumer = internalEventProcessor;
    }

    public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> internalEventProcessor, int i) {
        this(mqttConfig, internalEventProcessor);
        this.maxElementsToReceive = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        MQTT mqtt = new MQTT();
        try {
            mqtt.setHost(this.mqttConfig.getUrl());
            mqtt.setConnectAttemptsMax(1L);
            if (this.mqttConfig.getAuthenticated().booleanValue()) {
                mqtt.setUserName(this.mqttConfig.getUsername());
                mqtt.setPassword(this.mqttConfig.getPassword());
            }
            BlockingConnection blockingConnection = mqtt.blockingConnection();
            blockingConnection.connect();
            blockingConnection.subscribe(new Topic[]{new Topic(this.mqttConfig.getTopic(), QoS.AT_LEAST_ONCE)});
            while (this.running && (this.maxElementsToReceive == -1 || this.messageCount <= this.maxElementsToReceive)) {
                Message receive = blockingConnection.receive();
                this.consumer.onEvent(receive.getPayload());
                receive.ack();
                this.messageCount++;
            }
            blockingConnection.disconnect();
        } catch (Exception e) {
            throw new RuntimeException("Error when receiving data from MQTT", e);
        }
    }

    public void close() {
        this.running = false;
    }

    public Integer getMessageCount() {
        return Integer.valueOf(this.messageCount);
    }
}
