package org.apache.activemq.artemis.integration.vertx;

import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ConnectorService;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.eventbus.impl.PingMessage;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.PlatformLocator;
import org.vertx.java.platform.PlatformManager;
import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.class */
public class IncomingVertxEventHandler implements ConnectorService {
    private final String connectorName;
    private final String queueName;
    private final int port;
    private final String host;
    private final int quorumSize;
    private final String haGroup;
    private final String vertxAddress;
    private EventBus eventBus;
    private PlatformManager platformManager;
    private EventHandler handler;
    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private boolean isStarted = false;

    /* loaded from: input_file:org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler$EventHandler.class */
    private class EventHandler implements Handler<Message<?>> {
        private EventHandler() {
        }

        public void handle(Message<?> message) {
            ServerMessageImpl serverMessageImpl = new ServerMessageImpl(IncomingVertxEventHandler.this.storageManager.generateID(), 50);
            serverMessageImpl.setAddress(new SimpleString(IncomingVertxEventHandler.this.queueName));
            serverMessageImpl.setDurable(true);
            serverMessageImpl.encodeMessageIDToBuffer();
            String replyAddress = message.replyAddress();
            if (replyAddress != null) {
                serverMessageImpl.putStringProperty(VertxConstants.VERTX_MESSAGE_REPLYADDRESS, replyAddress);
            }
            int messageType = getMessageType(message);
            serverMessageImpl.putIntProperty(VertxConstants.VERTX_MESSAGE_TYPE, messageType);
            manualEncodeVertxMessageBody(serverMessageImpl.getBodyBuffer(), message.body(), messageType);
            try {
                IncomingVertxEventHandler.this.postOffice.route(serverMessageImpl, (QueueCreator) null, false);
            } catch (Exception e) {
                ActiveMQVertxLogger.LOGGER.error("failed to route msg " + serverMessageImpl, e);
            }
        }

        private void manualEncodeVertxMessageBody(ActiveMQBuffer activeMQBuffer, Object obj, int i) {
            switch (i) {
                case VertxConstants.TYPE_PING /* 0 */:
                case VertxConstants.TYPE_STRING /* 11 */:
                    activeMQBuffer.writeString((String) obj);
                    return;
                case VertxConstants.TYPE_BUFFER /* 1 */:
                    activeMQBuffer.writeInt(((Buffer) obj).length());
                    activeMQBuffer.writeBytes(((Buffer) obj).getBytes());
                    return;
                case VertxConstants.TYPE_BOOLEAN /* 2 */:
                    activeMQBuffer.writeBoolean(((Boolean) obj).booleanValue());
                    return;
                case VertxConstants.TYPE_BYTEARRAY /* 3 */:
                    byte[] bArr = (byte[]) obj;
                    activeMQBuffer.writeInt(bArr.length);
                    activeMQBuffer.writeBytes(bArr);
                    return;
                case VertxConstants.TYPE_BYTE /* 4 */:
                    activeMQBuffer.writeByte(((Byte) obj).byteValue());
                    return;
                case VertxConstants.TYPE_CHARACTER /* 5 */:
                    activeMQBuffer.writeChar(((Character) obj).charValue());
                    return;
                case VertxConstants.TYPE_DOUBLE /* 6 */:
                    activeMQBuffer.writeDouble(((Double) obj).doubleValue());
                    return;
                case VertxConstants.TYPE_FLOAT /* 7 */:
                    activeMQBuffer.writeFloat(((Float) obj).floatValue());
                    return;
                case VertxConstants.TYPE_INT /* 8 */:
                    activeMQBuffer.writeInt(((Integer) obj).intValue());
                    return;
                case VertxConstants.TYPE_LONG /* 9 */:
                    activeMQBuffer.writeLong(((Long) obj).longValue());
                    return;
                case VertxConstants.TYPE_SHORT /* 10 */:
                    activeMQBuffer.writeShort(((Short) obj).shortValue());
                    return;
                case VertxConstants.TYPE_JSON_OBJECT /* 12 */:
                    activeMQBuffer.writeString(((JsonObject) obj).encode());
                    return;
                case VertxConstants.TYPE_JSON_ARRAY /* 13 */:
                    activeMQBuffer.writeString(((JsonArray) obj).encode());
                    return;
                case VertxConstants.TYPE_REPLY_FAILURE /* 100 */:
                    ReplyException replyException = (ReplyException) obj;
                    activeMQBuffer.writeInt(replyException.failureType().toInt());
                    activeMQBuffer.writeInt(replyException.failureCode());
                    activeMQBuffer.writeString(replyException.getMessage());
                    return;
                default:
                    throw new IllegalArgumentException("Invalid body type: " + i);
            }
        }

        private int getMessageType(Message<?> message) {
            Object body = message.body();
            if (message instanceof PingMessage) {
                return 0;
            }
            if (body instanceof Buffer) {
                return 1;
            }
            if (body instanceof Boolean) {
                return 2;
            }
            if (body instanceof byte[]) {
                return 3;
            }
            if (body instanceof Byte) {
                return 4;
            }
            if (body instanceof Character) {
                return 5;
            }
            if (body instanceof Double) {
                return 6;
            }
            if (body instanceof Float) {
                return 7;
            }
            if (body instanceof Integer) {
                return 8;
            }
            if (body instanceof Long) {
                return 9;
            }
            if (body instanceof Short) {
                return 10;
            }
            if (body instanceof String) {
                return 11;
            }
            if (body instanceof JsonArray) {
                return 13;
            }
            if (body instanceof JsonObject) {
                return 12;
            }
            if (body instanceof ReplyException) {
                return 100;
            }
            throw new IllegalArgumentException("Type not supported: " + message);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IncomingVertxEventHandler(String str, Map<String, Object> map, StorageManager storageManager, PostOffice postOffice) {
        this.connectorName = str;
        this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, (String) null, map);
        this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, map);
        this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost", map);
        this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE, -1, map);
        this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP, "activemq", map);
        this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS, "org.apache.activemq", map);
        this.storageManager = storageManager;
        this.postOffice = postOffice;
    }

    public void start() throws Exception {
        if (this.isStarted) {
            return;
        }
        System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
        if (this.quorumSize != -1) {
            this.platformManager = PlatformLocator.factory.createPlatformManager(this.port, this.host, this.quorumSize, this.haGroup);
        } else {
            this.platformManager = PlatformLocator.factory.createPlatformManager(this.port, this.host);
        }
        this.eventBus = this.platformManager.vertx().eventBus();
        if (this.postOffice.getBinding(new SimpleString(this.queueName)) == null) {
            throw new Exception(this.connectorName + ": queue " + this.queueName + " not found");
        }
        this.handler = new EventHandler();
        this.eventBus.registerHandler(this.vertxAddress, this.handler);
        this.isStarted = true;
        ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": started");
    }

    public void stop() throws Exception {
        if (this.isStarted) {
            this.eventBus.unregisterHandler(this.vertxAddress, this.handler);
            this.platformManager.stop();
            System.clearProperty("vertx.clusterManagerFactory");
            this.isStarted = false;
            ActiveMQVertxLogger.LOGGER.debug(this.connectorName + ": stopped");
        }
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    public String getName() {
        return this.connectorName;
    }

    public String toString() {
        return "[IncomingVertxEventHandler(" + this.connectorName + "), queueName: " + this.queueName + " host: " + this.host + " port: " + this.port + " vertxAddress: " + this.vertxAddress + "]";
    }
}
