package org.apache.servicemix.jbi.nmr.flow.jms;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.executors.Executor;
import org.apache.servicemix.jbi.event.ComponentAdapter;
import org.apache.servicemix.jbi.event.ComponentEvent;
import org.apache.servicemix.jbi.event.ComponentListener;
import org.apache.servicemix.jbi.event.EndpointAdapter;
import org.apache.servicemix.jbi.event.EndpointEvent;
import org.apache.servicemix.jbi.event.EndpointListener;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.Broker;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;

/* loaded from: input_file:WEB-INF/lib/servicemix-core-3.1.2.jar:org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.class */
public class JMSFlow extends AbstractFlow implements MessageListener {
    // Prefix of every JMS queue name this flow consumes from or sends to.
    private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
    // Optional credentials; init() only passes them to the factory when userName is non-null.
    private String userName;
    private String password;
    // May be injected through setConnectionFactory(); otherwise init() builds one from jmsURL.
    private ActiveMQConnectionFactory connectionFactory;
    // Connection backing both sessions; opened in init() and closed in shutDown().
    private ActiveMQConnection connection;
    // Producer created with a null destination; doRouting() names the target queue on each send.
    private MessageProducer queueProducer;
    // Producer bound to broadcastTopic; publishes endpoint events to the other containers.
    private MessageProducer topicProducer;
    private Topic broadcastTopic;
    // Session owning broadcastTopic, topicProducer and the broadcast/advisory consumers.
    private Session broadcastSession;
    // Created in start(), closed in stop().
    private MessageConsumer broadcastConsumer;
    // Session owning the inbound queue consumers and queueProducer.
    private Session inboundSession;
    // Created in start(), closed in stop().
    private MessageConsumer advisoryConsumer;
    // Container listeners registered in init() and removed in shutDown().
    private EndpointListener endpointListener;
    private ComponentListener componentListener;
    // Executes the routing task that onMessage() submits for each received exchange.
    private Executor executor;
    private String jmsURL = "peer://org.apache.servicemix?persistent=false";
    private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
    // Connection ids of the consumers reported through advisory messages (see onAdvisoryMessage()).
    private Set subscriberSet = new CopyOnWriteArraySet();
    // MessageConsumer instances keyed by endpoint key or by component name.
    private Map consumerMap = new ConcurrentHashMap();
    // Flipped to true by start() and back to false by stop(); guards the event handlers.
    private AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Describes this flow implementation.
     *
     * @return the constant string {@code "jms"}
     */
    @Override
    public String getDescription() {
        return "jms";
    }

    /**
     * @return the URL used by {@code init} to build the connection factory when none was injected
     */
    public String getJmsURL() {
        return jmsURL;
    }

    /**
     * Sets the URL used by {@code init} to build the connection factory when none was injected.
     *
     * @param str the broker URL; may be {@code null}, in which case {@code init} uses the
     *            factory's no-argument constructor
     */
    public void setJmsURL(String str) {
        jmsURL = str;
    }

    /**
     * @return the password passed to the connection factory together with the user name
     */
    public String getPassword() {
        return password;
    }

    /**
     * Sets the password passed to the connection factory together with the user name.
     *
     * @param str the password; only used by {@code init} when a user name is set
     */
    public void setPassword(String str) {
        password = str;
    }

    /**
     * @return the user name used to open the JMS connection, or {@code null} if none is set
     */
    public String getUserName() {
        return userName;
    }

    /**
     * Sets the user name used to open the JMS connection.
     *
     * @param str the user name; when {@code null}, {@code init} opens the connection
     *            without credentials
     */
    public void setUserName(String str) {
        userName = str;
    }

    /**
     * @return the connection factory in use, or {@code null} if none has been injected or
     *         created yet
     */
    public ActiveMQConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    /**
     * Injects the connection factory to use instead of one built from the JMS URL.
     *
     * @param activeMQConnectionFactory the factory {@code init} should use
     */
    public void setConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        connectionFactory = activeMQConnectionFactory;
    }

    /**
     * @return the name of the topic on which endpoint events are broadcast
     */
    public String getBroadcastDestinationName() {
        return broadcastDestinationName;
    }

    /**
     * Sets the name of the topic on which endpoint events are broadcast.
     *
     * @param str the topic name that {@code init} passes to the broadcast session
     */
    public void setBroadcastDestinationName(String str) {
        broadcastDestinationName = str;
    }

    /**
     * Tells whether this flow accepts the given exchange.
     *
     * @param messageExchange the exchange to inspect
     * @return {@code false} when the exchange is transacted, {@code true} otherwise
     */
    @Override
    public boolean canHandle(MessageExchange messageExchange) {
        if (isTransacted(messageExchange)) {
            return false;
        }
        return true;
    }

    /**
     * Initializes the flow against the given broker.
     *
     * <p>Creates the executor, registers endpoint and component listeners on the container,
     * then opens the JMS connection. On that connection it creates the inbound session
     * (with a consumer on the container's own queue and an unbound queue producer) and the
     * broadcast session (with the broadcast topic and its non-persistent producer).
     *
     * @param broker the broker this flow belongs to
     * @throws JBIException if the parent initialization fails or any JMS resource cannot be
     *                      created; the JMS failure is attached as the cause
     */
    @Override
    public void init(Broker broker) throws JBIException {
        final String containerName = broker.getContainer().getName();
        this.log.debug(containerName + ": Initializing jms flow");
        super.init(broker);
        this.executor = broker.getContainer().getExecutorFactory().createExecutor("flow.jms");

        // Anonymous classes capture the enclosing JMSFlow implicitly; no explicit
        // outer-instance field or constructor argument is needed.
        this.endpointListener = new EndpointAdapter() {
            @Override
            public void internalEndpointRegistered(EndpointEvent endpointEvent) {
                JMSFlow.this.onInternalEndpointRegistered(endpointEvent, true);
            }

            @Override
            public void internalEndpointUnregistered(EndpointEvent endpointEvent) {
                JMSFlow.this.onInternalEndpointUnregistered(endpointEvent, true);
            }
        };
        broker.getContainer().addListener(this.endpointListener);

        this.componentListener = new ComponentAdapter() {
            @Override
            public void componentStarted(ComponentEvent componentEvent) {
                JMSFlow.this.onComponentStarted(componentEvent);
            }

            @Override
            public void componentStopped(ComponentEvent componentEvent) {
                JMSFlow.this.onComponentStopped(componentEvent);
            }
        };
        broker.getContainer().addListener(this.componentListener);

        try {
            if (this.connectionFactory == null) {
                this.connectionFactory = this.jmsURL != null
                        ? new ActiveMQConnectionFactory(this.jmsURL)
                        : new ActiveMQConnectionFactory();
            }
            if (this.userName != null) {
                this.connection = (ActiveMQConnection) this.connectionFactory.createConnection(this.userName, this.password);
            } else {
                this.connection = (ActiveMQConnection) this.connectionFactory.createConnection();
            }
            this.connection.setClientID(containerName);
            this.connection.start();

            // Inbound side: one queue named after the container, plus an unbound producer
            // that doRouting() uses with an explicit destination per send.
            this.inboundSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue containerQueue = this.inboundSession.createQueue(INBOUND_PREFIX + containerName);
            this.inboundSession.createConsumer(containerQueue).setMessageListener(this);
            this.queueProducer = this.inboundSession.createProducer(null);

            // Broadcast side: topic used to publish endpoint events.
            this.broadcastSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.broadcastTopic = this.broadcastSession.createTopic(this.broadcastDestinationName);
            this.topicProducer = this.broadcastSession.createProducer(this.broadcastTopic);
            this.topicProducer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
        } catch (JMSException e) {
            this.log.error("Failed to initialize JMSFlow", e);
            // Do not leak a half-initialized connection (and its sessions) on failure.
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (JMSException closeError) {
                    this.log.debug("Error closing JMS Connection after failed initialization", closeError);
                }
                this.connection = null;
            }
            throw new JBIException(e);
        }
    }

    /**
     * Starts the flow; does nothing if it is already started.
     *
     * <p>Subscribes to the broadcast topic (ignoring messages published on this same
     * connection) and to the consumer advisory topic of the broadcast topic. It then replays
     * the current local state: a "started" event for every started component and a
     * non-broadcast registration for every local internal endpoint.
     *
     * @throws JBIException if the parent start fails or a JMS consumer cannot be created;
     *                      the JMS failure is attached as the cause
     */
    @Override
    public void start() throws JBIException {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        this.log.debug(this.broker.getContainer().getName() + ": Starting jms flow");
        super.start();
        try {
            // Third argument is noLocal: events published by this connection are not delivered back.
            this.broadcastConsumer = this.broadcastSession.createConsumer(this.broadcastTopic, null, true);
            this.broadcastConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        Serializable payload = ((ObjectMessage) message).getObject();
                        if (!(payload instanceof EndpointEvent)) {
                            return;
                        }
                        EndpointEvent endpointEvent = (EndpointEvent) payload;
                        String localContainer = JMSFlow.this.getBroker().getContainer().getName();
                        // Only react to events that originate from another container.
                        if (localContainer.equals(((InternalEndpoint) endpointEvent.getEndpoint()).getComponentNameSpace().getContainerName())) {
                            return;
                        }
                        // NOTE(review): 0 / 1 are presumably the EndpointEvent codes for internal
                        // endpoint registered / unregistered - confirm against EndpointEvent.
                        if (endpointEvent.getEventType() == 0) {
                            JMSFlow.this.onRemoteEndpointRegistered(endpointEvent);
                        } else if (endpointEvent.getEventType() == 1) {
                            JMSFlow.this.onRemoteEndpointUnregistered(endpointEvent);
                        }
                    } catch (Exception e) {
                        JMSFlow.this.log.error("Error processing incoming broadcast message", e);
                    }
                }
            });

            this.advisoryConsumer = this.broadcastSession.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) this.broadcastTopic));
            this.advisoryConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (JMSFlow.this.started.get()) {
                        JMSFlow.this.onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
                    }
                }
            });

            // Explicit iterator + cast compiles whether getComponents() is raw or generic.
            for (Iterator it = this.broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
                ComponentMBeanImpl component = (ComponentMBeanImpl) it.next();
                if (component.isStarted()) {
                    // NOTE(review): 2 is presumably the "component started" event code - confirm.
                    onComponentStarted(new ComponentEvent(component, 2));
                }
            }

            ServiceEndpoint[] endpoints = this.broker.getContainer().getRegistry().getEndpointsForInterface(null);
            for (int i = 0; i < endpoints.length; i++) {
                if ((endpoints[i] instanceof InternalEndpoint) && ((InternalEndpoint) endpoints[i]).isLocal()) {
                    // false: create the local consumer only, do not broadcast the event.
                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i], 0), false);
                }
            }
        } catch (JMSException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the flow; does nothing if it is not started.
     *
     * <p>Calls {@code removeAllPackets} for every known subscriber, clears the subscriber set
     * and closes the advisory and broadcast consumers. Each consumer is closed independently,
     * so a failure (or a consumer that was never created) does not prevent the other one from
     * being closed.
     *
     * @throws JBIException if the parent stop fails
     */
    @Override
    public void stop() throws JBIException {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.log.debug(this.broker.getContainer().getName() + ": Stopping jms flow");
        super.stop();
        for (Iterator it = this.subscriberSet.iterator(); it.hasNext();) {
            removeAllPackets((String) it.next());
        }
        this.subscriberSet.clear();
        closeConsumerQuietly(this.advisoryConsumer);
        closeConsumerQuietly(this.broadcastConsumer);
    }

    /** Closes the given consumer if it exists, logging (not propagating) any JMS failure. */
    private void closeConsumerQuietly(MessageConsumer consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        } catch (JMSException e) {
            this.log.debug("JMSException caught in stop", e);
        }
    }

    /**
     * Shuts the flow down: runs the parent shutdown, stops the flow, removes the container
     * listeners, shuts the executor down and finally closes the JMS connection if there is one.
     *
     * @throws JBIException if the parent shutdown or the stop fails
     */
    @Override
    public void shutDown() throws JBIException {
        super.shutDown();
        stop();
        broker.getContainer().removeListener(endpointListener);
        broker.getContainer().removeListener(componentListener);
        executor.shutdown();
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException closeFailure) {
            // A failed close is only reported; shutdown still completes.
            log.warn("Error closing JMS Connection", closeFailure);
        }
    }

    /**
     * @return the number of subscriber connection ids currently tracked by this flow
     */
    public int numberInNetwork() {
        final int subscribers = subscriberSet.size();
        return subscribers;
    }

    /**
     * Handles the registration of a local endpoint; ignored while the flow is not started.
     *
     * <p>Creates a queue consumer for the endpoint key unless one is already tracked, then
     * optionally publishes the event on the broadcast topic. Any failure is logged and not
     * propagated.
     *
     * @param endpointEvent the registration event
     * @param z             {@code true} to broadcast the event to the other containers
     */
    public void onInternalEndpointRegistered(EndpointEvent endpointEvent, boolean z) {
        if (!started.get()) {
            return;
        }
        try {
            String endpointKey = EndpointSupport.getKey(endpointEvent.getEndpoint());
            if (!consumerMap.containsKey(endpointKey)) {
                Queue endpointQueue = inboundSession.createQueue(INBOUND_PREFIX + endpointKey);
                MessageConsumer endpointConsumer = inboundSession.createConsumer(endpointQueue);
                endpointConsumer.setMessageListener(this);
                consumerMap.put(endpointKey, endpointConsumer);
            }
            if (!z) {
                return;
            }
            log.debug(broker.getContainer().getName() + ": broadcasting info for " + endpointEvent);
            ObjectMessage announcement = broadcastSession.createObjectMessage(endpointEvent);
            synchronized (topicProducer) {
                topicProducer.send(announcement);
            }
        } catch (Exception failure) {
            log.error("Cannot create consumer for " + endpointEvent.getEndpoint(), failure);
        }
    }

    /**
     * Handles the unregistration of a local endpoint.
     *
     * <p>Removes and closes the queue consumer tracked for the endpoint key, if any, then
     * optionally publishes the event on the broadcast topic. Any failure is logged and not
     * propagated.
     *
     * @param endpointEvent the unregistration event
     * @param z             {@code true} to broadcast the event to the other containers
     */
    public void onInternalEndpointUnregistered(EndpointEvent endpointEvent, boolean z) {
        try {
            String endpointKey = EndpointSupport.getKey(endpointEvent.getEndpoint());
            MessageConsumer endpointConsumer = (MessageConsumer) consumerMap.remove(endpointKey);
            if (endpointConsumer != null) {
                endpointConsumer.close();
            }
            if (!z) {
                return;
            }
            ObjectMessage announcement = broadcastSession.createObjectMessage(endpointEvent);
            log.debug(broker.getContainer().getName() + ": broadcasting info for " + endpointEvent);
            synchronized (topicProducer) {
                topicProducer.send(announcement);
            }
        } catch (Exception failure) {
            log.error("Cannot destroy consumer for " + endpointEvent, failure);
        }
    }

    /**
     * Handles a component start; ignored while the flow is not started.
     *
     * <p>Creates a queue consumer named after the component unless one is already tracked.
     * Any failure is logged and not propagated.
     *
     * @param componentEvent the event describing the started component
     */
    public void onComponentStarted(ComponentEvent componentEvent) {
        if (!started.get()) {
            return;
        }
        try {
            String componentName = componentEvent.getComponent().getName();
            if (consumerMap.containsKey(componentName)) {
                return;
            }
            Queue componentQueue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
            MessageConsumer componentConsumer = inboundSession.createConsumer(componentQueue);
            componentConsumer.setMessageListener(this);
            consumerMap.put(componentName, componentConsumer);
        } catch (Exception failure) {
            log.error("Cannot create consumer for component " + componentEvent.getComponent().getName(), failure);
        }
    }

    /**
     * Handles a component stop by removing and closing the queue consumer tracked under the
     * component name, if any. Any failure is logged and not propagated.
     *
     * @param componentEvent the event describing the stopped component
     */
    public void onComponentStopped(ComponentEvent componentEvent) {
        try {
            String componentName = componentEvent.getComponent().getName();
            MessageConsumer componentConsumer = (MessageConsumer) consumerMap.remove(componentName);
            if (componentConsumer == null) {
                return;
            }
            componentConsumer.close();
        } catch (Exception failure) {
            log.error("Cannot destroy consumer for component " + componentEvent.getComponent().getName(), failure);
        }
    }

    /**
     * Registers, in the local registry, an endpoint announced by another container.
     *
     * @param endpointEvent the broadcast event carrying the remote endpoint
     */
    public void onRemoteEndpointRegistered(EndpointEvent endpointEvent) {
        log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + endpointEvent.getEndpoint());
        broker.getContainer().getRegistry().registerRemoteEndpoint(endpointEvent.getEndpoint());
    }

    /**
     * Removes, from the local registry, an endpoint withdrawn by another container.
     *
     * @param endpointEvent the broadcast event carrying the remote endpoint
     */
    public void onRemoteEndpointUnregistered(EndpointEvent endpointEvent) {
        log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + endpointEvent.getEndpoint());
        broker.getContainer().getRegistry().unregisterRemoteEndpoint(endpointEvent.getEndpoint());
    }

    /**
     * Sends the exchange by delegating to {@link #doRouting(MessageExchangeImpl)}.
     *
     * @param messageExchangeImpl the exchange to send
     * @throws MessagingException if routing fails
     */
    @Override
    protected void doSend(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        this.doRouting(messageExchangeImpl);
    }

    /**
     * Serializes the exchange into a JMS object message and sends it to the queue of its target.
     *
     * <p>Queue selection (always prefixed with {@code INBOUND_PREFIX}):
     * <ul>
     *   <li>Provider role, no destination id: the endpoint key.</li>
     *   <li>Provider role, stateless-provider property set and exchange not synchronous: the
     *       destination component name; otherwise the destination container name.</li>
     *   <li>Other role, stateless-consumer property set and exchange not synchronous: the
     *       sender-endpoint property if present, else the source component name; otherwise the
     *       source container name.</li>
     * </ul>
     *
     * @param messageExchangeImpl the exchange to route
     * @throws MessagingException    if the JMS send fails
     * @throws IllegalStateException if a non-provider exchange has no source id
     */
    @Override
    public void doRouting(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        try {
            String queueName;
            if (messageExchangeImpl.getRole() == MessageExchange.Role.PROVIDER) {
                if (messageExchangeImpl.getDestinationId() == null) {
                    queueName = INBOUND_PREFIX + EndpointSupport.getKey(messageExchangeImpl.getEndpoint());
                } else if (Boolean.TRUE.equals(messageExchangeImpl.getProperty(JbiConstants.STATELESS_PROVIDER))
                        && !isSynchronous(messageExchangeImpl)) {
                    queueName = INBOUND_PREFIX + messageExchangeImpl.getDestinationId().getName();
                } else {
                    queueName = INBOUND_PREFIX + messageExchangeImpl.getDestinationId().getContainerName();
                }
            } else {
                if (messageExchangeImpl.getSourceId() == null) {
                    throw new IllegalStateException("No sourceId set on the exchange");
                }
                if (Boolean.TRUE.equals(messageExchangeImpl.getProperty(JbiConstants.STATELESS_CONSUMER))
                        && !isSynchronous(messageExchangeImpl)) {
                    Object senderEndpoint = messageExchangeImpl.getProperty(JbiConstants.SENDER_ENDPOINT);
                    if (senderEndpoint != null) {
                        queueName = INBOUND_PREFIX + senderEndpoint;
                    } else {
                        queueName = INBOUND_PREFIX + messageExchangeImpl.getSourceId().getName();
                    }
                } else {
                    queueName = INBOUND_PREFIX + messageExchangeImpl.getSourceId().getContainerName();
                }
            }
            Queue target = inboundSession.createQueue(queueName);
            ObjectMessage payload = inboundSession.createObjectMessage(messageExchangeImpl);
            synchronized (queueProducer) {
                queueProducer.send(target, payload);
            }
        } catch (JMSException jmsFailure) {
            log.error("Failed to send exchange: " + messageExchangeImpl + " internal JMS Network", jmsFailure);
            throw new MessagingException(jmsFailure);
        }
    }

    /**
     * Receives an exchange from one of the inbound queues and hands its routing to the executor.
     *
     * <p>Messages are ignored when {@code null} or while the flow is not started. A message
     * that is not an {@link ObjectMessage} carrying a {@link MessageExchangeImpl} is logged and
     * dropped instead of raising a {@code ClassCastException} in the JMS delivery thread.
     *
     * <p>Before routing, an exchange without a destination id is bound to the local internal
     * endpoint matching its service and endpoint names. Routing failures are logged and not
     * propagated.
     *
     * @param message the received JMS message
     */
    @Override
    public void onMessage(Message message) {
        if (message == null || !this.started.get()) {
            return;
        }
        final MessageExchangeImpl exchange;
        try {
            if (!(message instanceof ObjectMessage)) {
                this.log.error("Ignoring JMS message that is not an ObjectMessage: " + message);
                return;
            }
            Object payload = ((ObjectMessage) message).getObject();
            if (!(payload instanceof MessageExchangeImpl)) {
                this.log.error("Ignoring JMS message that does not carry a message exchange: " + message);
                return;
            }
            exchange = (MessageExchangeImpl) payload;
        } catch (JMSException e) {
            this.log.error("Caught an exception unpacking JMS Message: ", e);
            return;
        }
        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    if (exchange.getDestinationId() == null) {
                        // Resolve the serialized endpoint to the locally registered instance.
                        ServiceEndpoint requested = exchange.getEndpoint();
                        ServiceEndpoint resolved = JMSFlow.this.broker.getContainer().getRegistry().getInternalEndpoint(requested.getServiceName(), requested.getEndpointName());
                        exchange.setEndpoint(resolved);
                        exchange.setDestinationId(((InternalEndpoint) resolved).getComponentNameSpace());
                    }
                    // Deliver through the parent flow's routing, not this class's JMS send.
                    JMSFlow.super.doRouting(exchange);
                } catch (Throwable th) {
                    JMSFlow.this.log.error("Caught an exception routing ExchangePacket: ", th);
                }
            }
        });
    }

    /**
     * Reacts to a consumer advisory received for the broadcast topic.
     *
     * <p>A {@link ConsumerInfo} records the new subscriber's connection id and re-broadcasts
     * every local internal endpoint. A {@link RemoveInfo} forgets the subscriber's connection
     * id and calls {@code removeAllPackets} for it. Any other object is ignored.
     *
     * @param obj the data structure carried by the advisory message
     */
    protected void onAdvisoryMessage(Object obj) {
        if (obj instanceof ConsumerInfo) {
            subscriberSet.add(((ConsumerInfo) obj).getConsumerId().getConnectionId());
            ServiceEndpoint[] localCandidates = broker.getContainer().getRegistry().getEndpointsForInterface(null);
            for (int idx = 0; idx < localCandidates.length; idx++) {
                ServiceEndpoint candidate = localCandidates[idx];
                if (!(candidate instanceof InternalEndpoint)) {
                    continue;
                }
                if (((InternalEndpoint) candidate).isLocal()) {
                    onInternalEndpointRegistered(new EndpointEvent(candidate, 0), true);
                }
            }
        } else if (obj instanceof RemoveInfo) {
            ConsumerId departed = (ConsumerId) ((RemoveInfo) obj).getObjectId();
            subscriberSet.remove(departed.getConnectionId());
            removeAllPackets(departed.getConnectionId());
        }
    }

    // Intentionally empty in this version. stop() and onAdvisoryMessage() call it with a
    // subscriber's connection id, but nothing is stored per subscriber that needs cleaning up.
    private void removeAllPackets(String str) {
    }
}
