package org.apache.activemq.transport.mqtt;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:activemq-core-5.7.0.jar:org/apache/activemq/transport/mqtt/MQTTProtocolConverter.class */
public class MQTTProtocolConverter {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    /** Pre-encoded PINGRESP frame; the same instance is sent for every ping reply. */
    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
    /** Name of the message property used to carry the publisher's QoS ordinal on converted messages. */
    private static final String QOS_PROPERTY_NAME = "QoSPropertyName";

    private final MQTTTransport mqttTransport;
    /** Last command id handed out; guarded by {@link #commnadIdMutex}. */
    private int lastCommandId;
    /** The CONNECT frame received from the client, or {@code null} before it arrives. */
    private CONNECT connect;
    /** Lazily cached client id taken from {@link #connect}. */
    private String clientId;

    // Identifiers used for everything this converter sends to the broker.
    // Declaration order matters: each id is built from the one before it.
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(this.connectionId, -1);
    private final ProducerId producerId = new ProducerId(this.sessionId, 1);
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();

    /** Handlers waiting for a broker response, keyed by the command id of the request. */
    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<>();

    // The LRU caches below are not concurrent collections; every access in this
    // class is wrapped in a synchronized block on the map itself.
    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<>();
    private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<>();
    /** Broker acks held back until the client confirms the PUBLISH with the same message id. */
    private final Map<Short, MessageAck> consumerAcks = new LRUCache<>();
    /** PUBREC frames sent to the client and not yet answered with a PUBREL. */
    private final Map<Short, PUBREC> publisherRecs = new LRUCache<>();

    private final Object commnadIdMutex = new Object();
    /** Set once the broker has accepted the connection. */
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final ConnectionInfo connectionInfo = new ConnectionInfo();

    /**
     * Creates a converter bound to the given transport.
     *
     * @param mQTTTransport transport used to talk to both the MQTT client and the broker
     * @param brokerContext accepted for API compatibility; not used by this constructor
     */
    public MQTTProtocolConverter(MQTTTransport mQTTTransport, BrokerContext brokerContext) {
        mqttTransport = mQTTTransport;
    }

    /**
     * Hands out the next command id for a command sent to the broker.
     *
     * @return the current counter value; the counter is incremented afterwards
     */
    int generateCommandId() {
        synchronized (this.commnadIdMutex) {
            // Post-increment: return the current value, then advance the counter.
            return this.lastCommandId++;
        }
    }

    /**
     * Assigns a fresh command id to {@code command} and forwards it to the broker.
     *
     * @param command         the command to send
     * @param responseHandler callback for the broker's response, or {@code null}
     *                        when no response is needed
     */
    void sendToActiveMQ(Command command, ResponseHandler responseHandler) {
        command.setCommandId(generateCommandId());
        final boolean wantsResponse = responseHandler != null;
        if (wantsResponse) {
            command.setResponseRequired(true);
            // Keyed by command id so onActiveMQCommand can match the response
            // via its correlation id.
            final Integer key = Integer.valueOf(command.getCommandId());
            this.resposeHandlers.put(key, responseHandler);
        }
        this.mqttTransport.sendToActiveMQ(command);
    }

    /**
     * Sends a frame to the MQTT client on a best-effort basis: an I/O failure is
     * logged as a warning and not propagated.
     *
     * @param mQTTFrame the encoded frame to send
     */
    void sendToMQTT(MQTTFrame mQTTFrame) {
        try {
            mqttTransport.sendToMQTT(mQTTFrame);
        } catch (IOException sendFailure) {
            LOG.warn("Failed to send frame " + mQTTFrame, (Throwable) sendFailure);
        }
    }

    /**
     * Entry point for frames received from the MQTT client. The frame is decoded
     * according to its numeric message type and passed to the matching handler.
     * Any type without a handler here is treated as a protocol error and handed
     * to {@link #handleException(Throwable, MQTTFrame)}.
     *
     * @param mQTTFrame the raw frame received from the client
     * @throws IOException  if replying to the client fails
     * @throws JMSException if converting a published message fails
     */
    public void onMQTTCommand(MQTTFrame mQTTFrame) throws IOException, JMSException {
        final int type = mQTTFrame.messageType();
        switch (type) {
            case 1: // CONNECT
                onMQTTConnect(new CONNECT().decode(mQTTFrame));
                LOG.debug("MQTT Client " + getClientId() + " connected.");
                break;
            case 3: // PUBLISH
                onMQTTPublish(new PUBLISH().decode(mQTTFrame));
                break;
            case 4: // PUBACK
                onMQTTPubAck(new PUBACK().decode(mQTTFrame));
                break;
            case 5: // PUBREC
                onMQTTPubRec(new PUBREC().decode(mQTTFrame));
                break;
            case 6: // PUBREL
                onMQTTPubRel(new PUBREL().decode(mQTTFrame));
                break;
            case 7: // PUBCOMP
                onMQTTPubComp(new PUBCOMP().decode(mQTTFrame));
                break;
            case 8: // SUBSCRIBE
                onSubscribe(new SUBSCRIBE().decode(mQTTFrame));
                break;
            case 10: // UNSUBSCRIBE
                onUnSubscribe(new UNSUBSCRIBE().decode(mQTTFrame));
                break;
            case 12: // ping request: answered with the shared PINGRESP frame
                this.mqttTransport.sendToMQTT(PING_RESP_FRAME);
                LOG.debug("Sent Ping Response to " + getClientId());
                break;
            case 14: // client-initiated disconnect
                LOG.debug("MQTT Client " + getClientId() + " disconnecting");
                stopTransport();
                break;
            default:
                // Covers 2, 9, 11, 13 and any other value without a handler above.
                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + type, true), mQTTFrame);
                break;
        }
    }

    /**
     * Handles an MQTT CONNECT. Registers the connection, then the session and
     * producer, with the broker. The client gets a CONNACK with
     * {@code CONNECTION_ACCEPTED} only after both the connection and the producer
     * registration have succeeded. If either fails, a refusal CONNACK is sent and
     * the failure is reported to the transport.
     *
     * @param connect the decoded CONNECT frame
     * @throws MQTTProtocolException if this converter is already connected
     */
    void onMQTTConnect(CONNECT connect) throws MQTTProtocolException {
        if (this.connected.get()) {
            throw new MQTTProtocolException("All ready connected.");
        }
        this.connect = connect;
        // Missing fields are normalised to "" so none of these locals is ever null.
        String requestedClientId = connect.clientId() != null ? connect.clientId().toString() : "";
        String userName = connect.userName() != null ? connect.userName().toString() : "";
        String password = connect.password() != null ? connect.password().toString() : "";
        configureInactivityMonitor(connect.keepAlive());
        this.connectionInfo.setConnectionId(this.connectionId);
        if (requestedClientId.isEmpty()) {
            // No client id supplied: fall back to the generated connection id.
            this.connectionInfo.setClientId("" + this.connectionInfo.getConnectionId().toString());
        } else {
            this.connectionInfo.setClientId(requestedClientId);
        }
        this.connectionInfo.setResponseRequired(true);
        this.connectionInfo.setUserName(userName);
        this.connectionInfo.setPassword(password);
        this.connectionInfo.setTransportContext(this.mqttTransport.getPeerCertificates());
        sendToActiveMQ(this.connectionInfo, new ResponseHandler() {
            @Override
            public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                if (response.isException()) {
                    // The broker rejected the connection: tell the client, then fail the transport.
                    Throwable cause = ((ExceptionResponse) response).getException();
                    CONNACK refusal = new CONNACK();
                    refusal.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                    MQTTProtocolConverter.this.getMQTTTransport().sendToMQTT(refusal.encode());
                    MQTTProtocolConverter.this.getMQTTTransport().onException(IOExceptionSupport.create(cause));
                    return;
                }
                MQTTProtocolConverter.this.sendToActiveMQ(new SessionInfo(MQTTProtocolConverter.this.sessionId), null);
                MQTTProtocolConverter.this.sendToActiveMQ(new ProducerInfo(MQTTProtocolConverter.this.producerId), new ResponseHandler() {
                    @Override
                    public void onResponse(MQTTProtocolConverter mQTTProtocolConverter2, Response response2) throws IOException {
                        if (response2.isException()) {
                            Throwable cause = ((ExceptionResponse) response2).getException();
                            CONNACK refusal = new CONNACK();
                            refusal.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                            MQTTProtocolConverter.this.getMQTTTransport().sendToMQTT(refusal.encode());
                            MQTTProtocolConverter.this.getMQTTTransport().onException(IOExceptionSupport.create(cause));
                            // Stop here: a refused client must not also be marked
                            // connected and sent CONNECTION_ACCEPTED.
                            return;
                        }
                        CONNACK accepted = new CONNACK();
                        accepted.code(CONNACK.Code.CONNECTION_ACCEPTED);
                        MQTTProtocolConverter.this.connected.set(true);
                        MQTTProtocolConverter.this.getMQTTTransport().sendToMQTT(accepted.encode());
                    }
                });
            }
        });
    }

    /**
     * Handles an MQTT SUBSCRIBE: subscribes to every requested topic and replies
     * with a SUBACK carrying the granted QoS for each topic, in request order.
     * If the frame carries no topic array, a warning is logged and no SUBACK is
     * sent.
     *
     * @param subscribe the decoded SUBSCRIBE frame
     * @throws MQTTProtocolException if the client is not connected or a topic is invalid
     */
    void onSubscribe(SUBSCRIBE subscribe) throws MQTTProtocolException {
        checkConnected();
        Topic[] topics = subscribe.topics();
        if (topics == null) {
            LOG.warn("No topics defined for Subscription " + subscribe);
            return;
        }
        byte[] grantedQos = new byte[topics.length];
        for (int i = 0; i < topics.length; i++) {
            // The granted-QoS byte sent back is the ordinal of the QoS enum value.
            grantedQos[i] = (byte) onSubscribe(subscribe, topics[i]).ordinal();
        }
        SUBACK suback = new SUBACK();
        suback.messageId(subscribe.messageId());
        suback.grantedQos(grantedQos);
        try {
            getMQTTTransport().sendToMQTT(suback.encode());
        } catch (IOException e) {
            LOG.warn("Couldn't send SUBACK for " + subscribe, (Throwable) e);
        }
    }

    /**
     * Creates a broker consumer for a single MQTT topic subscription.
     * <p>
     * If the client is already subscribed to the same topic name, the earlier
     * subscription is removed first. Otherwise the new entry would overwrite it in
     * the topic map and the older broker consumer could never be unsubscribed.
     *
     * @param subscribe the SUBSCRIBE frame the topic belongs to
     * @param topic     the topic and requested QoS
     * @return the QoS granted, which is the QoS requested
     * @throws MQTTProtocolException if the topic or its name is missing
     */
    QoS onSubscribe(SUBSCRIBE subscribe, Topic topic) throws MQTTProtocolException {
        if (topic == null || topic.name() == null) {
            throw new MQTTProtocolException("Invalid Destination.");
        }
        // No-op when the topic has no current subscription.
        onUnSubscribe(topic.name());

        ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
        ConsumerId consumerId = new ConsumerId(this.sessionId, this.consumerIdGenerator.getNextSequenceId());
        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
        consumerInfo.setDestination(destination);
        consumerInfo.setPrefetchSize(1000);
        consumerInfo.setDispatchAsync(true);
        if (!this.connect.cleanSession() && this.connect.clientId() != null) {
            // Non-clean session: name the subscription after the client id.
            consumerInfo.setSubscriptionName(this.connect.clientId().toString());
        }
        MQTTSubscription subscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
        this.subscriptionsByConsumerId.put(consumerId, subscription);
        this.mqttSubscriptionByTopic.put(topic.name(), subscription);
        sendToActiveMQ(consumerInfo, null);
        return topic.qos();
    }

    /**
     * Handles an MQTT UNSUBSCRIBE: drops each listed topic subscription and
     * replies with an UNSUBACK carrying the request's message id. The UNSUBACK is
     * sent even when the frame lists no topics.
     *
     * @param unsubscribe the decoded UNSUBSCRIBE frame
     */
    void onUnSubscribe(UNSUBSCRIBE unsubscribe) {
        final UTF8Buffer[] topicNames = unsubscribe.topics();
        final int count = topicNames == null ? 0 : topicNames.length;
        for (int i = 0; i < count; i++) {
            onUnSubscribe(topicNames[i]);
        }
        final UNSUBACK ack = new UNSUBACK();
        ack.messageId(unsubscribe.messageId());
        sendToMQTT(ack.encode());
    }

    /**
     * Removes the subscription registered under the given MQTT topic name, if
     * there is one, and asks the broker to remove the matching consumer.
     *
     * @param uTF8Buffer the MQTT topic name as it was used when subscribing
     */
    void onUnSubscribe(UTF8Buffer uTF8Buffer) {
        MQTTSubscription removed = this.mqttSubscriptionByTopic.remove(uTF8Buffer);
        if (removed == null) {
            return;
        }
        ConsumerInfo consumerInfo = removed.getConsumerInfo();
        if (consumerInfo == null) {
            // Without a consumer there is nothing to remove on the broker.
            return;
        }
        this.subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
        sendToActiveMQ(consumerInfo.createRemoveCommand(), null);
    }

    /**
     * Entry point for commands arriving from the broker.
     * <ul>
     *   <li>Responses go to the handler registered for their correlation id. An
     *       exception response with no handler goes to {@link #handleException}.</li>
     *   <li>Message dispatches are converted to PUBLISH frames and sent to the client.</li>
     *   <li>Commands with data structure type 16 are cast to {@link ConnectionError}
     *       and handled as exceptions.</li>
     *   <li>Broker info is ignored; anything else is logged at debug level.</li>
     * </ul>
     *
     * @param command the command received from the broker
     * @throws Exception if a response handler or the message conversion fails
     */
    public void onActiveMQCommand(Command command) throws Exception {
        if (command.isResponse()) {
            final Response response = (Response) command;
            final ResponseHandler handler = this.resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
            if (handler != null) {
                handler.onResponse(this, response);
            } else if (response.isException()) {
                handleException(((ExceptionResponse) response).getException(), null);
            }
        } else if (command.isMessageDispatch()) {
            final MessageDispatch dispatch = (MessageDispatch) command;
            final MQTTSubscription subscription = this.subscriptionsByConsumerId.get(dispatch.getConsumerId());
            if (subscription == null) {
                // No subscription for this consumer id: drop the dispatch.
                return;
            }
            final MessageAck ack = subscription.createMessageAck(dispatch);
            final PUBLISH outbound = subscription.createPublish((ActiveMQMessage) dispatch.getMessage());
            if (ack != null && subscription.expectAck()) {
                // Hold the broker ack until the client confirms this message id.
                synchronized (this.consumerAcks) {
                    this.consumerAcks.put(Short.valueOf(outbound.messageId()), ack);
                }
            }
            getMQTTTransport().sendToMQTT(outbound.encode());
            if (ack != null && !subscription.expectAck()) {
                // No client confirmation expected: ack to the broker right away.
                getMQTTTransport().sendToActiveMQ(ack);
            }
        } else if (command.getDataStructureType() == 16) {
            handleException(((ConnectionError) command).getException(), null);
        } else if (!command.isBrokerInfo()) {
            LOG.debug("Do not know how to process ActiveMQ Command " + command);
        }
    }

    /**
     * Handles a PUBLISH from the client: converts it to an ActiveMQ message and
     * sends it to the broker with a response handler chosen by the publish QoS.
     *
     * @param publish the decoded PUBLISH frame
     * @throws IOException  declared for callers; see {@link #checkConnected()} for the not-connected case
     * @throws JMSException if the message conversion fails
     */
    void onMQTTPublish(PUBLISH publish) throws IOException, JMSException {
        checkConnected();
        final ActiveMQMessage brokerMessage = convertMessage(publish);
        brokerMessage.setProducerId(this.producerId);
        brokerMessage.onSend();
        final ResponseHandler onBrokerResponse = createResponseHandler(publish);
        sendToActiveMQ(brokerMessage, onBrokerResponse);
    }

    /**
     * Handles a PUBACK from the client: if a broker ack is pending for that
     * message id it is released to the broker; otherwise the frame is ignored.
     *
     * @param puback the decoded PUBACK frame
     */
    void onMQTTPubAck(PUBACK puback) {
        final Short key = Short.valueOf(puback.messageId());
        final MessageAck pendingAck;
        synchronized (this.consumerAcks) {
            pendingAck = this.consumerAcks.remove(key);
        }
        if (pendingAck == null) {
            return;
        }
        getMQTTTransport().sendToActiveMQ(pendingAck);
    }

    /**
     * Handles a PUBREC from the client by replying with a PUBREL that carries
     * the same message id.
     *
     * @param pubrec the decoded PUBREC frame
     */
    void onMQTTPubRec(PUBREC pubrec) {
        final short messageId = pubrec.messageId();
        final PUBREL release = new PUBREL();
        release.messageId(messageId);
        sendToMQTT(release.encode());
    }

    /**
     * Handles a PUBREL from the client: forgets the PUBREC recorded for that
     * message id and replies with a PUBCOMP. An unknown message id is logged as
     * a warning, but the PUBCOMP is still sent.
     *
     * @param pubrel the decoded PUBREL frame
     */
    void onMQTTPubRel(PUBREL pubrel) {
        final PUBREC recorded;
        synchronized (this.publisherRecs) {
            recorded = this.publisherRecs.remove(Short.valueOf(pubrel.messageId()));
        }
        final boolean known = recorded != null;
        if (!known) {
            LOG.warn("Unknown PUBREL: " + ((int) pubrel.messageId()) + " received");
        }
        final PUBCOMP complete = new PUBCOMP();
        complete.messageId(pubrel.messageId());
        sendToMQTT(complete.encode());
    }

    /**
     * Handles a PUBCOMP from the client: if a broker ack is pending for that
     * message id it is released to the broker; otherwise the frame is ignored.
     *
     * @param pubcomp the decoded PUBCOMP frame
     */
    void onMQTTPubComp(PUBCOMP pubcomp) {
        final Short key = Short.valueOf(pubcomp.messageId());
        final MessageAck pendingAck;
        synchronized (this.consumerAcks) {
            pendingAck = this.consumerAcks.remove(key);
        }
        if (pendingAck == null) {
            return;
        }
        getMQTTTransport().sendToActiveMQ(pendingAck);
    }

    /**
     * Converts an MQTT PUBLISH into an ActiveMQ bytes message addressed to the
     * matching ActiveMQ topic.
     * <p>
     * The message is persistent unless the publish QoS is {@code AT_MOST_ONCE}.
     * The QoS ordinal is stored in the {@code QoSPropertyName} property so it can
     * be recovered when the message is converted back to a PUBLISH.
     *
     * @param publish the PUBLISH frame to convert
     * @return a new bytes message carrying the publish payload
     * @throws JMSException if setting a property or writing the body fails
     */
    ActiveMQMessage convertMessage(PUBLISH publish) throws JMSException {
        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
        message.setProducerId(this.producerId);
        message.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
        message.setTimestamp(System.currentTimeMillis());
        message.setPriority((byte) 4);
        message.setPersistent(publish.qos() != QoS.AT_MOST_ONCE);
        message.setIntProperty(QOS_PROPERTY_NAME, publish.qos().ordinal());

        UTF8Buffer mqttTopic = publish.topicName();
        ActiveMQTopic destination;
        synchronized (this.activeMQTopicMap) {
            destination = this.activeMQTopicMap.get(mqttTopic);
            if (destination == null) {
                // Only the level separator is translated here ('/' to '.').
                // A plain character replace avoids compiling a regex per cache miss.
                destination = new ActiveMQTopic(mqttTopic.toString().replace('/', '.'));
                this.activeMQTopicMap.put(mqttTopic, destination);
            }
        }
        message.setJMSDestination(destination);

        Buffer payload = publish.payload();
        message.writeBytes(payload.data, payload.offset, payload.length);
        return message;
    }

    /**
     * Converts an ActiveMQ message into an MQTT PUBLISH.
     * <p>
     * The message id is the producer sequence id truncated to 16 bits. The topic
     * name is the destination's physical name with {@code '.'} replaced by
     * {@code '/'}. The payload depends on the message's data structure type:
     * text messages are sent as UTF-8, bytes messages verbatim, map messages as
     * the UTF-8 of the content map's {@code toString()}, and anything else as its
     * raw content, inflated first if the message is marked compressed.
     *
     * @param activeMQMessage the broker message to convert
     * @return the PUBLISH frame; it has no payload when the message body is empty
     * @throws IOException         if encoding the body fails
     * @throws JMSException        if reading the message fails
     * @throws DataFormatException if a compressed body cannot be inflated
     */
    public PUBLISH convertMessage(ActiveMQMessage activeMQMessage) throws IOException, JMSException, DataFormatException {
        PUBLISH publish = new PUBLISH();
        publish.messageId((short) activeMQMessage.getMessageId().getProducerSequenceId());
        publish.qos(resolveQoS(activeMQMessage));

        UTF8Buffer topicName;
        synchronized (this.mqttTopicMap) {
            topicName = this.mqttTopicMap.get(activeMQMessage.getJMSDestination());
            if (topicName == null) {
                topicName = new UTF8Buffer(activeMQMessage.getDestination().getPhysicalName().replace('.', '/'));
                this.mqttTopicMap.put(activeMQMessage.getJMSDestination(), topicName);
            }
        }
        publish.topicName(topicName);

        // Each typed branch works on a copy so the original message is left untouched.
        byte type = activeMQMessage.getDataStructureType();
        if (type == 28) { // handled as a text message
            ActiveMQTextMessage textMessage = (ActiveMQTextMessage) activeMQMessage.copy();
            textMessage.setReadOnlyBody(true);
            String text = textMessage.getText();
            if (text != null) {
                publish.payload(new Buffer(text.getBytes("UTF-8")));
            }
        } else if (type == 24) { // handled as a bytes message
            ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) activeMQMessage.copy();
            bytesMessage.setReadOnlyBody(true);
            byte[] body = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(body);
            publish.payload(new Buffer(body));
        } else if (type == 25) { // handled as a map message
            ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) activeMQMessage.copy();
            mapMessage.setReadOnlyBody(true);
            Map<String, Object> contentMap = mapMessage.getContentMap();
            if (contentMap != null) {
                publish.payload(new Buffer(contentMap.toString().getBytes("UTF-8")));
            }
        } else {
            ByteSequence content = activeMQMessage.getContent();
            if (content != null && content.getLength() > 0) {
                if (activeMQMessage.isCompressed()) {
                    content = inflate(content);
                }
                publish.payload(new Buffer(content.data, content.offset, content.length));
            }
        }
        return publish;
    }

    /**
     * Picks the QoS for an outgoing PUBLISH. A valid ordinal in the
     * {@code QoSPropertyName} property wins; otherwise the QoS is derived from
     * the message's persistence flag.
     */
    private QoS resolveQoS(ActiveMQMessage message) throws JMSException {
        if (message.propertyExists(QOS_PROPERTY_NAME)) {
            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
            QoS[] levels = QoS.values();
            if (ordinal >= 0 && ordinal < levels.length) {
                return levels[ordinal];
            }
            // Any producer can set this property; an out-of-range value must not
            // abort delivery with an ArrayIndexOutOfBoundsException.
            LOG.warn("Ignoring invalid " + QOS_PROPERTY_NAME + " value " + ordinal);
        }
        // NOTE(review): this default looks inverted relative to convertMessage(PUBLISH),
        // which marks every QoS above AT_MOST_ONCE as persistent. It is kept as-is
        // because changing it alters the ack flow for messages without the property.
        return message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
    }

    /** Inflates a compressed message body, always releasing the inflater's resources. */
    private static ByteSequence inflate(ByteSequence compressed) throws IOException, DataFormatException {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed.data, compressed.offset, compressed.length);
            byte[] chunk = new byte[4096];
            ByteArrayOutputStream inflated = new ByteArrayOutputStream();
            int read;
            // inflate() returning 0 ends the loop, whether the stream finished or ran out of input.
            while ((read = inflater.inflate(chunk)) != 0) {
                inflated.write(chunk, 0, read);
            }
            return inflated.toByteSequence();
        } finally {
            inflater.end();
        }
    }

    /**
     * @return the transport this converter was created with
     */
    public MQTTTransport getMQTTTransport() {
        return mqttTransport;
    }

    /**
     * Called when the transport fails. If the client's CONNECT frame carried both
     * a will topic and a will message, the will message is published to the
     * broker on the client's behalf. A failure to publish is logged, with its
     * cause, and not propagated.
     */
    public void onTransportError() {
        if (this.connect == null || this.connect.willTopic() == null || this.connect.willMessage() == null) {
            return;
        }
        try {
            PUBLISH will = new PUBLISH();
            will.topicName(this.connect.willTopic());
            will.qos(this.connect.willQos());
            will.payload(this.connect.willMessage());
            ActiveMQMessage message = convertMessage(will);
            message.setProducerId(this.producerId);
            message.onSend();
            sendToActiveMQ(message, null);
        } catch (Exception e) {
            // Best effort: the transport is already failing, so only log, but keep the cause.
            LOG.warn("Failed to publish Will Message " + this.connect.willMessage(), e);
        }
    }

    /**
     * Configures and starts the transport's inactivity monitor from the client's
     * keep-alive interval. A failure to start the monitor is logged and not
     * propagated.
     *
     * @param s keep-alive in seconds as carried in the CONNECT frame. It is read
     *          as an unsigned 16-bit value, so a negative {@code short} means an
     *          interval above 32767 seconds.
     */
    void configureInactivityMonitor(short s) {
        // Mask to the unsigned range; otherwise intervals above 32767 s would
        // produce a negative timeout.
        final int keepAliveSecs = s & 0xFFFF;
        try {
            int keepAliveMillis = keepAliveSecs * 1000;
            MQTTInactivityMonitor inactivityMonitor = getMQTTTransport().getInactivityMonitor();
            inactivityMonitor.setProtocolConverter(this);
            inactivityMonitor.setReadCheckTime(keepAliveMillis);
            inactivityMonitor.setInitialDelayTime(keepAliveMillis);
            inactivityMonitor.startMonitorThread();
        } catch (Exception e) {
            LOG.warn("Failed to start MQTT InactivityMonitor ", (Throwable) e);
        }
        LOG.debug(getClientId() + " MQTT Connection using heart beat of  " + keepAliveSecs + " secs");
    }

    /**
     * Logs a processing failure and stops the transport. A failure while
     * stopping is logged as an error and not propagated.
     *
     * @param th        the failure being handled
     * @param mQTTFrame the frame being processed when it occurred; may be {@code null}
     */
    void handleException(Throwable th, MQTTFrame mQTTFrame) {
        final String summary = "Exception occurred processing: \n" + mQTTFrame + ": " + th.toString();
        LOG.warn(summary);
        if (LOG.isDebugEnabled()) {
            // The stack trace is only emitted at debug level.
            LOG.debug("Exception detail", th);
        }
        try {
            getMQTTTransport().stop();
        } catch (Throwable stopFailure) {
            LOG.error("Failed to stop MQTTT Transport ", stopFailure);
        }
    }

    /**
     * Guard for operations that need an accepted connection.
     *
     * @throws MQTTProtocolException if the connected flag has not been set
     */
    void checkConnected() throws MQTTProtocolException {
        if (this.connected.get()) {
            return;
        }
        throw new MQTTProtocolException("Not connected.");
    }

    /**
     * Returns the client id from the CONNECT frame, caching it once known.
     *
     * @return the client id, or an empty string while no CONNECT frame with a
     *         client id has been received. The empty value is not cached, so a
     *         later CONNECT is still picked up.
     */
    private String getClientId() {
        if (this.clientId == null) {
            if (this.connect == null || this.connect.clientId() == null) {
                return "";
            }
            this.clientId = this.connect.clientId().toString();
        }
        return this.clientId;
    }

    /**
     * Stops the transport. Any failure is logged at debug level and not propagated.
     */
    private void stopTransport() {
        try {
            getMQTTTransport().stop();
        } catch (Throwable stopFailure) {
            LOG.debug("Failed to stop MQTT transport ", stopFailure);
        }
    }

    /**
     * Builds the handler that acknowledges a client PUBLISH once the broker has
     * answered the send.
     * <ul>
     *   <li>{@code AT_LEAST_ONCE}: replies with a PUBACK.</li>
     *   <li>{@code EXACTLY_ONCE}: records and replies with a PUBREC.</li>
     *   <li>{@code AT_MOST_ONCE} or any other value: no handler.</li>
     * </ul>
     * If the broker answers with an exception, the failure is logged together
     * with the PUBLISH and nothing is sent to the client.
     *
     * @param publish the PUBLISH being forwarded; may be {@code null}
     * @return the handler, or {@code null} when no acknowledgement is needed
     */
    ResponseHandler createResponseHandler(final PUBLISH publish) {
        if (publish == null) {
            return null;
        }
        switch (publish.qos()) {
            case AT_LEAST_ONCE:
                return new ResponseHandler() {
                    @Override
                    public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                        if (response.isException()) {
                            // Concatenate the frame into the message: the old three-argument
                            // call had no placeholder, so the PUBLISH was never logged.
                            MQTTProtocolConverter.LOG.warn("Failed to send MQTT Publish: " + publish, ((ExceptionResponse) response).getException());
                            return;
                        }
                        PUBACK puback = new PUBACK();
                        puback.messageId(publish.messageId());
                        mQTTProtocolConverter.getMQTTTransport().sendToMQTT(puback.encode());
                    }
                };
            case EXACTLY_ONCE:
                return new ResponseHandler() {
                    @Override
                    public void onResponse(MQTTProtocolConverter mQTTProtocolConverter, Response response) throws IOException {
                        if (response.isException()) {
                            MQTTProtocolConverter.LOG.warn("Failed to send MQTT Publish: " + publish, ((ExceptionResponse) response).getException());
                            return;
                        }
                        PUBREC pubrec = new PUBREC();
                        pubrec.messageId(publish.messageId());
                        // Recorded so the client's later PUBREL can be matched in onMQTTPubRel.
                        synchronized (MQTTProtocolConverter.this.publisherRecs) {
                            MQTTProtocolConverter.this.publisherRecs.put(Short.valueOf(publish.messageId()), pubrec);
                        }
                        mQTTProtocolConverter.getMQTTTransport().sendToMQTT(pubrec.encode());
                    }
                };
            case AT_MOST_ONCE:
            default:
                return null;
        }
    }

    /**
     * Translates an MQTT topic filter into ActiveMQ destination syntax by
     * replacing {@code '#'} with {@code '>'}, {@code '+'} with {@code '*'} and
     * {@code '/'} with {@code '.'}. All other characters are kept.
     *
     * @param str the MQTT topic filter
     * @return the translated destination name
     */
    private String convertMQTTToActiveMQ(String str) {
        final char[] chars = str.toCharArray();
        for (int i = 0; i < chars.length; i++) {
            switch (chars[i]) {
                case '#':
                    chars[i] = '>';
                    break;
                case '+':
                    chars[i] = '*';
                    break;
                case '/':
                    chars[i] = '.';
                    break;
                default:
                    break;
            }
        }
        return new String(chars);
    }
}
