package com.datatorrent.contrib.mqtt;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/mqtt/AbstractMqttInputOperator.class */
/**
 * Base input operator that subscribes to MQTT topics and emits the received messages.
 *
 * <p>On {@link #activate} a background thread is started that blocks on
 * {@code connection.receive()} and places every received {@link Message} into
 * {@link #holdingBuffer}. {@link #emitTuples()} drains up to {@code tupleBlast} messages
 * from that buffer per call and hands each one to {@link #emitTuple(Message)}, which
 * concrete subclasses implement.
 */
public abstract class AbstractMqttInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractMqttInputOperator.class);
    private static final int DEFAULT_BLAST_SIZE = 1000;
    private static final int DEFAULT_BUFFER_SIZE = 1048576;

    /** Upper bound on the number of messages emitted by one {@link #emitTuples()} call. */
    private int tupleBlast = DEFAULT_BLAST_SIZE;
    /** Capacity (in messages) of {@link #holdingBuffer}; read once in {@link #setup}. */
    private int bufferSize = DEFAULT_BUFFER_SIZE;
    /** Topics to subscribe to when the connection is established, with their QoS. */
    protected Map<String, QoS> topicMap = new HashMap<>();
    protected MqttClientConfig mqttClientConfig;
    protected transient MQTT client;
    /** Hand-off queue between the receiver thread (producer) and {@link #emitTuples()} (consumer). */
    protected transient ArrayBlockingQueue<Message> holdingBuffer;
    protected transient BlockingConnection connection;
    /** Background receiver thread started in {@link #activate} and stopped in {@link #deactivate}. */
    protected transient Thread thread;

    /**
     * Emits a single received MQTT message; called once per message by {@link #emitTuples()}.
     *
     * @param message the message taken from the holding buffer
     */
    public abstract void emitTuple(Message message);

    /**
     * @return the client configuration used to build the MQTT client in {@link #activate}
     */
    public MqttClientConfig getMqttClientConfig() {
        return this.mqttClientConfig;
    }

    /**
     * @param mqttClientConfig the client configuration to use in {@link #activate}
     */
    public void setMqttClientConfig(MqttClientConfig mqttClientConfig) {
        this.mqttClientConfig = mqttClientConfig;
    }

    /**
     * @return the maximum number of messages emitted per {@link #emitTuples()} call
     */
    public int getTupleBlast() {
        return this.tupleBlast;
    }

    /**
     * @param i the maximum number of messages emitted per {@link #emitTuples()} call
     */
    public void setTupleBlast(int i) {
        this.tupleBlast = i;
    }

    /**
     * @return the capacity used for the holding buffer created in {@link #setup}
     */
    public int getBufferSize() {
        return this.bufferSize;
    }

    /**
     * Sets the holding buffer capacity. Only takes effect if called before {@link #setup},
     * because that is where the buffer is allocated.
     *
     * @param bufferSize the holding buffer capacity in messages
     */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /**
     * Registers a topic to subscribe to when the connection is initialized.
     *
     * @param str the topic name
     * @param qoS the quality of service for the subscription
     */
    public void addSubscribeTopic(String str, QoS qoS) {
        this.topicMap.put(str, qoS);
    }

    /**
     * Removes a topic from the set subscribed to when the connection is initialized.
     *
     * @param str the topic name
     */
    public void removeSubscribeTopic(String str) {
        this.topicMap.remove(str);
    }

    /**
     * Drains at most {@code tupleBlast} messages (and no more than were buffered when the
     * call started) from the holding buffer, passing each to {@link #emitTuple(Message)}.
     */
    public void emitTuples() {
        int limit = Math.min(this.tupleBlast, this.holdingBuffer.size());
        for (int emitted = 0; emitted < limit; emitted++) {
            Message message = this.holdingBuffer.poll();
            if (message == null) {
                return;
            }
            emitTuple(message);
        }
    }

    public void beginWindow(long j) {
    }

    public void endWindow() {
    }

    /**
     * Allocates the holding buffer with the configured {@code bufferSize}.
     *
     * @param operatorContext the operator context (unused here)
     */
    public void setup(Context.OperatorContext operatorContext) {
        this.holdingBuffer = new ArrayBlockingQueue<>(this.bufferSize);
    }

    public void teardown() {
    }

    /**
     * Opens the blocking connection and subscribes to every topic in {@link #topicMap}.
     * No subscribe call is made when the topic map is empty.
     *
     * @throws Exception if connecting or subscribing fails
     */
    private void initializeConnection() throws Exception {
        this.connection = this.client.blockingConnection();
        this.connection.connect();
        if (this.topicMap.isEmpty()) {
            return;
        }
        Topic[] topics = new Topic[this.topicMap.size()];
        int index = 0;
        for (Map.Entry<String, QoS> entry : this.topicMap.entrySet()) {
            topics[index++] = new Topic(entry.getKey(), entry.getValue());
        }
        this.connection.subscribe(topics);
    }

    /**
     * Body of the receiver thread: moves messages from the connection into the holding
     * buffer until the thread is interrupted.
     */
    private void receiveLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // put() blocks while the buffer is full instead of throwing and dropping the message.
                this.holdingBuffer.put(this.connection.receive());
            } catch (InterruptedException e) {
                // deactivate() interrupts this thread to stop it; restore the flag and exit.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                LOG.error("Trouble receiving", e);
            }
        }
    }

    /**
     * Builds the MQTT client from {@link #mqttClientConfig}, connects, subscribes and starts
     * the receiver thread.
     *
     * @param operatorContext the operator context (unused here)
     * @throws IllegalStateException if no client configuration has been set
     * @throws RuntimeException wrapping any exception raised while configuring or connecting
     */
    public void activate(Context.OperatorContext operatorContext) {
        if (this.mqttClientConfig == null) {
            throw new IllegalStateException("mqttClientConfig must be set before activation");
        }
        try {
            this.client = new MQTT();
            if (this.mqttClientConfig.getClientId() != null) {
                this.client.setClientId(this.mqttClientConfig.getClientId());
            }
            this.client.setCleanSession(this.mqttClientConfig.isCleanSession());
            this.client.setConnectAttemptsMax(this.mqttClientConfig.getConnectAttemptsMax());
            this.client.setHost(this.mqttClientConfig.getHost(), this.mqttClientConfig.getPort());
            this.client.setKeepAlive(this.mqttClientConfig.getKeepAliveInterval());
            if (this.mqttClientConfig.getPassword() != null) {
                this.client.setPassword(this.mqttClientConfig.getPassword());
            }
            if (this.mqttClientConfig.getUserName() != null) {
                this.client.setUserName(this.mqttClientConfig.getUserName());
            }
            if (this.mqttClientConfig.getWillMessage() != null) {
                this.client.setWillMessage(this.mqttClientConfig.getWillMessage());
                this.client.setWillQos(this.mqttClientConfig.getWillQos());
                this.client.setWillRetain(this.mqttClientConfig.isWillRetain());
            }
            if (this.mqttClientConfig.getWillTopic() != null) {
                this.client.setWillTopic(this.mqttClientConfig.getWillTopic());
            }
            initializeConnection();
            this.thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    receiveLoop();
                }
            });
            this.thread.start();
        } catch (Exception e) {
            LOG.error("Caught exception during activation: ", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the receiver thread and disconnects from the broker. The disconnect is always
     * attempted, even if waiting for the receiver thread is interrupted.
     */
    public void deactivate() {
        boolean interrupted = false;
        try {
            if (this.thread != null) {
                this.thread.interrupt();
                this.thread.join();
            }
        } catch (InterruptedException e) {
            LOG.error("interrupted");
            interrupted = true;
        } finally {
            disconnectQuietly();
            if (interrupted) {
                // Restore the caller's interrupt status only after the blocking disconnect,
                // so the disconnect itself is not aborted by the pending interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Disconnects the current connection, if any, logging rather than propagating failures. */
    private void disconnectQuietly() {
        if (this.connection == null) {
            return;
        }
        try {
            this.connection.disconnect();
        } catch (Exception e) {
            LOG.error("Caught exception during disconnect", e);
        }
    }
}
