package org.apache.storm.mqtt.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.storm.mqtt.MqttMessageMapper;
import org.apache.storm.mqtt.common.MqttOptions;
import org.apache.storm.mqtt.common.MqttUtils;
import org.apache.storm.mqtt.common.SslUtils;
import org.apache.storm.mqtt.ssl.KeyStoreLoader;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/mqtt/spout/MqttSpout.class */
public class MqttSpout implements IRichSpout, Listener {
    private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class);
    protected transient SpoutOutputCollector collector;
    protected transient TopologyContext context;
    protected transient LinkedBlockingQueue<AckableMessage> incoming;
    protected transient HashMap<Long, AckableMessage> pending;
    protected MqttMessageMapper type;
    protected MqttOptions options;
    protected KeyStoreLoader keyStoreLoader;
    private String topologyName;
    private CallbackConnection connection;
    private transient Map<String, Object> conf;
    private boolean mqttConnected;
    private boolean mqttConnectFailed;
    private Long sequence;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/mqtt/spout/MqttSpout$ConnectCallback.class */
    public class ConnectCallback implements Callback<Void> {
        private ConnectCallback() {
        }

        public void onSuccess(Void r4) {
            MqttSpout.LOG.info("MQTT Connected. Subscribing to topic...");
            MqttSpout.this.mqttConnected = true;
        }

        public void onFailure(Throwable th) {
            MqttSpout.LOG.info("MQTT Connection failed.");
            MqttSpout.this.mqttConnectFailed = true;
        }
    }

    /* loaded from: input_file:org/apache/storm/mqtt/spout/MqttSpout$DisconnectCallback.class */
    private class DisconnectCallback implements Callback<Void> {
        private DisconnectCallback() {
        }

        public void onSuccess(Void r4) {
            MqttSpout.LOG.info("MQTT Disconnect successful.");
        }

        public void onFailure(Throwable th) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/mqtt/spout/MqttSpout$SubscribeCallback.class */
    public class SubscribeCallback implements Callback<byte[]> {
        private SubscribeCallback() {
        }

        public void onSuccess(byte[] bArr) {
            MqttSpout.LOG.info("Subscripton sucessful.");
        }

        public void onFailure(Throwable th) {
            MqttSpout.LOG.error("MQTT Subscripton failed.", th);
            throw new RuntimeException("MQTT Subscribe failed.", th);
        }
    }

    protected MqttSpout() {
        this.mqttConnected = false;
        this.mqttConnectFailed = false;
        this.sequence = Long.MIN_VALUE;
    }

    public MqttSpout(MqttMessageMapper mqttMessageMapper, MqttOptions mqttOptions) {
        this(mqttMessageMapper, mqttOptions, null);
    }

    public MqttSpout(MqttMessageMapper mqttMessageMapper, MqttOptions mqttOptions, KeyStoreLoader keyStoreLoader) {
        this.mqttConnected = false;
        this.mqttConnectFailed = false;
        this.sequence = Long.MIN_VALUE;
        this.type = mqttMessageMapper;
        this.options = mqttOptions;
        this.keyStoreLoader = keyStoreLoader;
        SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader);
    }

    private Long nextId() {
        Long l = this.sequence;
        this.sequence = Long.valueOf(this.sequence.longValue() + 1);
        if (this.sequence.longValue() == Long.MAX_VALUE) {
            this.sequence = Long.MIN_VALUE;
        }
        return this.sequence;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.type.outputFields());
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.topologyName = (String) map.get("topology.name");
        this.collector = spoutOutputCollector;
        this.context = topologyContext;
        this.conf = map;
        this.incoming = new LinkedBlockingQueue<>();
        this.pending = new HashMap<>();
        try {
            connectMqtt();
        } catch (Exception e) {
            this.collector.reportError(e);
            throw new RuntimeException("MQTT Connection failed.", e);
        }
    }

    private void connectMqtt() throws Exception {
        this.connection = MqttUtils.configureClient(this.options, this.topologyName + "-" + this.context.getThisComponentId() + "-" + this.context.getThisTaskId(), this.keyStoreLoader).callbackConnection();
        this.connection.listener(this);
        this.connection.connect(new ConnectCallback());
        while (!this.mqttConnected && !this.mqttConnectFailed) {
            LOG.info("Waiting for connection...");
            Thread.sleep(500L);
        }
        if (this.mqttConnected) {
            List<String> topics = this.options.getTopics();
            Topic[] topicArr = new Topic[topics.size()];
            QoS qosFromInt = MqttUtils.qosFromInt(this.options.getQos());
            for (int i = 0; i < topics.size(); i++) {
                topicArr[i] = new Topic(topics.get(i), qosFromInt);
            }
            this.connection.subscribe(topicArr, new SubscribeCallback());
        }
    }

    public void close() {
        this.connection.disconnect(new DisconnectCallback());
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public void nextTuple() {
        AckableMessage poll = this.incoming.poll();
        if (poll == null) {
            Thread.yield();
            return;
        }
        Long nextId = nextId();
        this.collector.emit(this.type.toValues(poll.getMessage()), nextId);
        this.pending.put(nextId, poll);
    }

    public void ack(Object obj) {
        this.connection.getDispatchQueue().execute(this.pending.remove(obj).ack());
    }

    public void fail(Object obj) {
        try {
            this.incoming.put(this.pending.remove(obj));
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while re-queueing message.", e);
        }
    }

    public void onConnected() {
    }

    public void onDisconnected() {
    }

    public void onPublish(UTF8Buffer uTF8Buffer, Buffer buffer, Runnable runnable) {
        LOG.debug("Received message: topic={}, payload={}", uTF8Buffer.toString(), new String(buffer.toByteArray()));
        try {
            this.incoming.put(new AckableMessage(uTF8Buffer.toString(), buffer.toByteArray(), runnable));
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while queueing an MQTT message.");
        }
    }

    public void onFailure(Throwable th) {
        LOG.error("MQTT Connection Failure.", th);
        this.connection.disconnect(new DisconnectCallback());
        throw new RuntimeException("MQTT Connection failure.", th);
    }
}
