package org.apache.servicemix.jbi.nmr.flow.jms;

import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.executors.Executor;
import org.apache.servicemix.jbi.event.ComponentAdapter;
import org.apache.servicemix.jbi.event.ComponentEvent;
import org.apache.servicemix.jbi.event.ComponentListener;
import org.apache.servicemix.jbi.event.EndpointAdapter;
import org.apache.servicemix.jbi.event.EndpointEvent;
import org.apache.servicemix.jbi.event.EndpointListener;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.Broker;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;

/* loaded from: input_file:WEB-INF/lib/servicemix-core-3.3.1.jar:org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.class */
public abstract class AbstractJMSFlow extends AbstractFlow implements MessageListener {
    /** Prefix prepended to every queue name this flow listens on or sends to. */
    private static final String INBOUND_PREFIX = "org.apache.servicemix.jms.";
    /** Factory used by {@code init} to open the connection; built from {@code jmsURL} when left unset. */
    protected ConnectionFactory connectionFactory;
    /** Connection opened in {@code init} and closed in {@code shutDown}. */
    protected Connection connection;
    /**
     * Consumer closed by {@code stopConsumerMonitor}. It is not assigned anywhere in this class;
     * presumably a subclass sets it in {@code startConsumerMonitor} (to be confirmed).
     */
    protected MessageConsumer monitorMessageConsumer;
    /** Optional user name; when non-null the connection is created with user name and password. */
    private String userName;
    /** Password passed along with {@code userName} when creating the connection. */
    private String password;
    /** Topic consumer created in {@code start} that receives endpoint events broadcast by other containers. */
    private MessageConsumer broadcastConsumer;
    /** Container listener, registered in {@code init}, forwarding internal endpoint (un)registrations to this flow. */
    private EndpointListener endpointListener;
    /** Container listener, registered in {@code init}, forwarding component start/stop events to this flow. */
    private ComponentListener componentListener;
    /** Executor obtained in {@code init}; runs the routing of exchanges received in {@code onMessage}. */
    private Executor executor;
    /** {@code true} between a successful {@code start} transition and the next {@code stop}. */
    protected AtomicBoolean started = new AtomicBoolean(false);
    /** Names added by {@code addClusterNode} and removed by {@code removeClusterNode}. */
    protected Set<String> subscriberSet = new CopyOnWriteArraySet<String>();
    /** Name of the topic used to broadcast and receive endpoint events. */
    private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
    /** Queue consumers created per endpoint key or component name, keyed by that name. */
    private Map<String, MessageConsumerSession> consumerMap = new ConcurrentHashMap<String, MessageConsumerSession>();
    /** URL handed to {@code createConnectionFactoryFromUrl} when no connection factory has been injected. */
    private String jmsURL = "peer://org.apache.servicemix?persistent=false";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/servicemix-core-3.3.1.jar:org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow$MessageConsumerSession.class */
    public final class MessageConsumerSession {
        private Session session;
        private MessageConsumer consumer;

        private MessageConsumerSession(String str, MessageListener messageListener) throws JMSException {
            this.session = AbstractJMSFlow.this.connection.createSession(false, 1);
            this.consumer = this.session.createConsumer(this.session.createQueue(AbstractJMSFlow.INBOUND_PREFIX + str));
            this.consumer.setMessageListener(messageListener);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() throws JMSException {
            if (this.consumer != null) {
                this.consumer.close();
            }
            if (this.session != null) {
                this.session.close();
            }
        }
    }

    /**
     * Returns the fixed description string of this flow.
     *
     * @return the literal {@code "jms"}
     */
    @Override // org.apache.servicemix.jbi.management.MBeanInfoProvider
    public String getDescription() {
        return "jms";
    }

    /**
     * @return the password used together with the user name when the connection is created
     */
    public String getPassword() {
        return this.password;
    }

    /**
     * Sets the password passed to the connection factory; it is only read by {@code init}
     * and only used there when a user name is set.
     *
     * @param str the password
     */
    public void setPassword(String str) {
        this.password = str;
    }

    /**
     * @return the user name used when creating the connection, or {@code null} if none is set
     */
    public String getUserName() {
        return this.userName;
    }

    /**
     * Sets the user name; when non-null, {@code init} creates the connection with this
     * user name and the configured password.
     *
     * @param str the user name, or {@code null} to connect without credentials
     */
    public void setUserName(String str) {
        this.userName = str;
    }

    /**
     * @return the connection factory in use; may be {@code null} before {@code init} if none was injected
     */
    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Injects the connection factory. When left unset, {@code init} builds one from the JMS URL.
     *
     * @param connectionFactory the factory to use
     */
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * @return the name of the topic used for endpoint event broadcasts
     */
    public String getBroadcastDestinationName() {
        return this.broadcastDestinationName;
    }

    /**
     * Sets the name of the topic used for endpoint event broadcasts.
     *
     * @param str the topic name
     */
    public void setBroadcastDestinationName(String str) {
        this.broadcastDestinationName = str;
    }

    /**
     * Tells whether this flow accepts the given exchange.
     *
     * @param messageExchange the exchange to check
     * @return {@code true} unless {@code isTransacted} reports the exchange as transacted
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.Flow
    public boolean canHandle(MessageExchange messageExchange) {
        return !isTransacted(messageExchange);
    }

    /**
     * Initialises the flow: obtains the executor, registers endpoint and component listeners
     * on the container, opens and starts the JMS connection, and starts listening on the
     * queue named after the container.
     *
     * @param broker the broker this flow belongs to
     * @throws JBIException if the JMS connection or the container queue consumer cannot be
     *         set up; a connection opened by this call is closed again before the exception
     *         is thrown
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.AbstractFlow, org.apache.servicemix.jbi.nmr.flow.Flow
    public void init(Broker broker) throws JBIException {
        this.log.debug(broker.getContainer().getName() + ": Initializing jms flow");
        super.init(broker);
        this.executor = broker.getContainer().getExecutorFactory().createExecutor("flow.jms");
        // Forward local endpoint (un)registrations, asking for them to be broadcast to the other containers.
        this.endpointListener = new EndpointAdapter() {
            @Override
            public void internalEndpointRegistered(EndpointEvent endpointEvent) {
                AbstractJMSFlow.this.onInternalEndpointRegistered(endpointEvent, true);
            }

            @Override
            public void internalEndpointUnregistered(EndpointEvent endpointEvent) {
                AbstractJMSFlow.this.onInternalEndpointUnregistered(endpointEvent, true);
            }
        };
        broker.getContainer().addListener(this.endpointListener);
        // Forward component start/stop so that a per-component queue consumer is created/closed.
        this.componentListener = new ComponentAdapter() {
            @Override
            public void componentStarted(ComponentEvent componentEvent) {
                AbstractJMSFlow.this.onComponentStarted(componentEvent);
            }

            @Override
            public void componentStopped(ComponentEvent componentEvent) {
                AbstractJMSFlow.this.onComponentStopped(componentEvent);
            }
        };
        broker.getContainer().addListener(this.componentListener);
        // Tracks the connection opened by THIS call so that only it is closed on failure.
        Connection opened = null;
        try {
            if (this.connectionFactory == null) {
                this.connectionFactory = createConnectionFactoryFromUrl(this.jmsURL);
            }
            if (this.userName != null) {
                opened = this.connectionFactory.createConnection(this.userName, this.password);
            } else {
                opened = this.connectionFactory.createConnection();
            }
            this.connection = opened;
            this.connection.setClientID(broker.getContainer().getName());
            this.connection.start();
            // The session is not retained; it is released when the connection is closed in shutDown().
            Session inboundSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue containerQueue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
            inboundSession.createConsumer(containerQueue).setMessageListener(this);
        } catch (JMSException e) {
            this.log.error("Failed to initialize JMSFlow", e);
            if (opened != null) {
                // Do not leak a half-initialised connection.
                try {
                    opened.close();
                } catch (JMSException closeError) {
                    this.log.debug("Error closing JMS Connection after failed initialization", closeError);
                }
            }
            throw new JBIException(e);
        }
    }

    /**
     * Builds a connection factory for the given URL. Called by {@code init} with the
     * configured JMS URL when no connection factory has been injected.
     *
     * @param str the JMS URL
     * @return the connection factory to use
     */
    protected abstract ConnectionFactory createConnectionFactoryFromUrl(String str);

    /**
     * Callback for a consumer-monitor message. It is not invoked from this class;
     * presumably the subclass wires it to the monitor consumer it creates (to be confirmed).
     *
     * @param message the monitor message
     */
    protected abstract void onConsumerMonitorMessage(Message message);

    /**
     * Starts the consumer monitor. Called as the last step of {@code start}.
     * Presumably implementations assign {@code monitorMessageConsumer}, which
     * {@code stopConsumerMonitor} later closes (to be confirmed against subclasses).
     *
     * @throws JMSException if the monitor cannot be started
     */
    public abstract void startConsumerMonitor() throws JMSException;

    /**
     * Closes the monitor consumer, if one has been created. Does nothing when the
     * monitor was never started (e.g. {@code start} failed before reaching it).
     *
     * @throws JMSException if closing the consumer fails
     */
    public void stopConsumerMonitor() throws JMSException {
        if (this.monitorMessageConsumer != null) {
            this.monitorMessageConsumer.close();
        }
    }

    /**
     * Starts the flow if it is not already started: subscribes to the broadcast topic,
     * creates queue consumers for the already-started components and the local internal
     * endpoints, and starts the consumer monitor.
     *
     * @throws JBIException if any JMS operation fails; the {@link JMSException} is kept as the cause
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.AbstractFlow, org.apache.servicemix.jbi.management.BaseLifeCycle, javax.jbi.management.LifeCycleMBean, javax.jbi.component.ComponentLifeCycle
    public void start() throws JBIException {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        this.log.debug(this.broker.getContainer().getName() + ": Starting jms flow");
        super.start();
        try {
            Session broadcastSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // noLocal = true: do not receive the events published through this same connection.
            this.broadcastConsumer = broadcastSession.createConsumer(broadcastSession.createTopic(this.broadcastDestinationName), null, true);
            this.broadcastConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        Serializable object = ((ObjectMessage) message).getObject();
                        if (object instanceof EndpointEvent) {
                            EndpointEvent endpointEvent = (EndpointEvent) object;
                            String localContainer = AbstractJMSFlow.this.getBroker().getContainer().getName();
                            String originContainer = ((InternalEndpoint) endpointEvent.getEndpoint()).getComponentNameSpace().getContainerName();
                            // Only events that originate from another container are applied.
                            if (!localContainer.equals(originContainer)) {
                                if (endpointEvent.getEventType() == 0) {
                                    AbstractJMSFlow.this.onRemoteEndpointRegistered(endpointEvent);
                                } else if (endpointEvent.getEventType() == 1) {
                                    AbstractJMSFlow.this.onRemoteEndpointUnregistered(endpointEvent);
                                }
                            }
                        }
                    } catch (Exception e) {
                        AbstractJMSFlow.this.log.error("Error processing incoming broadcast message", e);
                    }
                }
            });
            // Components started before this flow did not trigger the listener; replay them now.
            for (ComponentMBeanImpl componentMBeanImpl : this.broker.getContainer().getRegistry().getComponents()) {
                if (componentMBeanImpl.isStarted()) {
                    onComponentStarted(new ComponentEvent(componentMBeanImpl, 2));
                }
            }
            // Same for already-registered local endpoints; they are not broadcast here (flag is false).
            ServiceEndpoint[] endpointsForInterface = this.broker.getContainer().getRegistry().getEndpointsForInterface(null);
            for (int i = 0; i < endpointsForInterface.length; i++) {
                if ((endpointsForInterface[i] instanceof InternalEndpoint) && ((InternalEndpoint) endpointsForInterface[i]).isLocal()) {
                    onInternalEndpointRegistered(new EndpointEvent(endpointsForInterface[i], 0), false);
                }
            }
            startConsumerMonitor();
        } catch (JMSException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the flow if it is started: drops the packets of every known subscriber, clears
     * the subscriber set, stops the consumer monitor and closes the broadcast consumer.
     * The two JMS shutdown steps are independent: a failure in one does not prevent the other.
     *
     * @throws JBIException if the superclass stop fails
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.AbstractFlow, org.apache.servicemix.jbi.management.BaseLifeCycle, javax.jbi.management.LifeCycleMBean, javax.jbi.component.ComponentLifeCycle
    public void stop() throws JBIException {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.log.debug(this.broker.getContainer().getName() + ": Stopping jms flow");
        super.stop();
        for (String subscriber : this.subscriberSet) {
            removeAllPackets(subscriber);
        }
        this.subscriberSet.clear();
        try {
            stopConsumerMonitor();
        } catch (JMSException e) {
            this.log.debug("JMSException caught in stop", e);
        }
        // broadcastConsumer is null when start() failed before the subscription was created.
        if (this.broadcastConsumer != null) {
            try {
                this.broadcastConsumer.close();
            } catch (JMSException e) {
                this.log.debug("JMSException caught in stop", e);
            }
        }
    }

    /**
     * Shuts the flow down: stops it, unregisters the container listeners added in
     * {@code init} and closes the JMS connection. Listener removal and connection close
     * are performed even when {@code stop()} throws.
     *
     * @throws JBIException if the superclass shutdown or {@code stop()} fails
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.AbstractFlow, org.apache.servicemix.jbi.management.BaseLifeCycle, javax.jbi.management.LifeCycleMBean, javax.jbi.component.ComponentLifeCycle
    public void shutDown() throws JBIException {
        super.shutDown();
        try {
            stop();
        } finally {
            this.broker.getContainer().removeListener(this.endpointListener);
            this.broker.getContainer().removeListener(this.componentListener);
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (JMSException e) {
                    this.log.warn("Error closing JMS Connection", e);
                }
            }
        }
    }

    /**
     * @return the current number of entries in the subscriber set
     */
    public int numberInNetwork() {
        return this.subscriberSet.size();
    }

    /**
     * Reacts to the registration of an internal endpoint. Ignored while the flow is not
     * started. Otherwise makes sure a queue consumer exists for the endpoint key and,
     * when requested, broadcasts the event. Any failure is logged, not propagated.
     *
     * @param endpointEvent the registration event
     * @param z {@code true} to broadcast the event after the consumer is in place
     */
    public void onInternalEndpointRegistered(EndpointEvent endpointEvent, boolean z) {
        if (!started.get()) {
            return;
        }
        try {
            final String endpointKey = EndpointSupport.getKey(endpointEvent.getEndpoint());
            final boolean alreadyConsuming = consumerMap.containsKey(endpointKey);
            if (!alreadyConsuming) {
                MessageConsumerSession consumerSession = new MessageConsumerSession(endpointKey, this);
                consumerMap.put(endpointKey, consumerSession);
            }
            if (z) {
                broadcast(endpointEvent);
            }
        } catch (Exception failure) {
            log.error("Cannot create consumer for " + endpointEvent.getEndpoint(), failure);
        }
    }

    /**
     * Reacts to the unregistration of an internal endpoint: removes and closes the queue
     * consumer kept for the endpoint key, if any, and broadcasts the event when requested.
     * Any failure is logged, not propagated.
     *
     * @param endpointEvent the unregistration event
     * @param z {@code true} to broadcast the event after the consumer has been closed
     */
    public void onInternalEndpointUnregistered(EndpointEvent endpointEvent, boolean z) {
        try {
            final String endpointKey = EndpointSupport.getKey(endpointEvent.getEndpoint());
            final MessageConsumerSession consumerSession = consumerMap.remove(endpointKey);
            if (consumerSession != null) {
                consumerSession.close();
            }
            if (z) {
                broadcast(endpointEvent);
            }
        } catch (Exception failure) {
            log.error("Cannot destroy consumer for " + endpointEvent, failure);
        }
    }

    /**
     * Publishes the endpoint event as an object message on the broadcast topic, using a
     * short-lived session that is always closed afterwards.
     *
     * @param endpointEvent the event to publish
     * @throws Exception if creating the session, building the message or sending it fails
     */
    protected void broadcast(EndpointEvent endpointEvent) throws Exception {
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.broker.getContainer().getName() + ": broadcasting info for " + endpointEvent);
        }
        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            ObjectMessage eventMessage = session.createObjectMessage(endpointEvent);
            MessageProducer producer = session.createProducer(session.createTopic(this.broadcastDestinationName));
            // 1 is the value of javax.jms.DeliveryMode.NON_PERSISTENT.
            producer.setDeliveryMode(1);
            producer.send(eventMessage);
        } finally {
            session.close();
        }
    }

    /**
     * Reacts to a component start. Ignored while the flow is not started; otherwise makes
     * sure a queue consumer exists for the component name. Failures are logged, not propagated.
     *
     * @param componentEvent the component event
     */
    public void onComponentStarted(ComponentEvent componentEvent) {
        if (!started.get()) {
            return;
        }
        try {
            final String componentName = componentEvent.getComponent().getName();
            final boolean alreadyConsuming = consumerMap.containsKey(componentName);
            if (!alreadyConsuming) {
                MessageConsumerSession consumerSession = new MessageConsumerSession(componentName, this);
                consumerMap.put(componentName, consumerSession);
            }
        } catch (Exception failure) {
            log.error("Cannot create consumer for component " + componentEvent.getComponent().getName(), failure);
        }
    }

    /**
     * Reacts to a component stop: removes and closes the queue consumer kept for the
     * component name, if any. Failures are logged, not propagated.
     *
     * @param componentEvent the component event
     */
    public void onComponentStopped(ComponentEvent componentEvent) {
        try {
            final String componentName = componentEvent.getComponent().getName();
            final MessageConsumerSession consumerSession = consumerMap.remove(componentName);
            if (consumerSession != null) {
                consumerSession.close();
            }
        } catch (Exception failure) {
            log.error("Cannot destroy consumer for component " + componentEvent.getComponent().getName(), failure);
        }
    }

    /**
     * Registers the endpoint carried by the event as a remote endpoint in the container's registry.
     *
     * @param endpointEvent the event describing the endpoint
     */
    public void onRemoteEndpointRegistered(EndpointEvent endpointEvent) {
        // Guarded like broadcast() so the message is not built when debug logging is off.
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.broker.getContainer().getName() + ": adding remote endpoint: " + endpointEvent.getEndpoint());
        }
        this.broker.getContainer().getRegistry().registerRemoteEndpoint(endpointEvent.getEndpoint());
    }

    /**
     * Removes the endpoint carried by the event from the remote endpoints in the container's registry.
     *
     * @param endpointEvent the event describing the endpoint
     */
    public void onRemoteEndpointUnregistered(EndpointEvent endpointEvent) {
        // Guarded like broadcast() so the message is not built when debug logging is off.
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.broker.getContainer().getName() + ": removing remote endpoint: " + endpointEvent.getEndpoint());
        }
        this.broker.getContainer().getRegistry().unregisterRemoteEndpoint(endpointEvent.getEndpoint());
    }

    /**
     * Sends the exchange by delegating directly to {@code doRouting}.
     *
     * @param messageExchangeImpl the exchange to send
     * @throws MessagingException if routing fails
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.AbstractFlow
    protected void doSend(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        doRouting(messageExchangeImpl);
    }

    /**
     * Sends the exchange as an object message to a queue chosen from the exchange's role
     * and addressing information.
     *
     * <p>Queue name, always prefixed with {@code INBOUND_PREFIX}:
     * <ul>
     * <li>provider role, no destination id: the key of the exchange's endpoint;</li>
     * <li>provider role, {@code STATELESS_PROVIDER} set and exchange not synchronous:
     *     the destination's name; otherwise the destination's container name;</li>
     * <li>other role, {@code STATELESS_CONSUMER} set and exchange not synchronous:
     *     the {@code SENDER_ENDPOINT} property if present, else the source's name;
     *     otherwise the source's container name.</li>
     * </ul>
     *
     * @param messageExchangeImpl the exchange to send
     * @throws MessagingException if any JMS operation fails
     * @throws IllegalStateException if the exchange is not in provider role and has no source id
     */
    @Override // org.apache.servicemix.jbi.nmr.flow.AbstractFlow
    public void doRouting(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        try {
            String destinationName;
            if (messageExchangeImpl.getRole() == MessageExchange.Role.PROVIDER) {
                if (messageExchangeImpl.getDestinationId() == null) {
                    destinationName = INBOUND_PREFIX + EndpointSupport.getKey(messageExchangeImpl.getEndpoint());
                } else if (Boolean.TRUE.equals(messageExchangeImpl.getProperty(JbiConstants.STATELESS_PROVIDER))
                        && !isSynchronous(messageExchangeImpl)) {
                    destinationName = INBOUND_PREFIX + messageExchangeImpl.getDestinationId().getName();
                } else {
                    destinationName = INBOUND_PREFIX + messageExchangeImpl.getDestinationId().getContainerName();
                }
            } else {
                if (messageExchangeImpl.getSourceId() == null) {
                    throw new IllegalStateException("No sourceId set on the exchange");
                }
                if (Boolean.TRUE.equals(messageExchangeImpl.getProperty(JbiConstants.STATELESS_CONSUMER))
                        && !isSynchronous(messageExchangeImpl)) {
                    Object senderEndpoint = messageExchangeImpl.getProperty(JbiConstants.SENDER_ENDPOINT);
                    if (senderEndpoint != null) {
                        destinationName = INBOUND_PREFIX + senderEndpoint;
                    } else {
                        destinationName = INBOUND_PREFIX + messageExchangeImpl.getSourceId().getName();
                    }
                } else {
                    destinationName = INBOUND_PREFIX + messageExchangeImpl.getSourceId().getContainerName();
                }
            }
            // A short-lived session per send; always closed, whatever happens while sending.
            Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                Queue queue = session.createQueue(destinationName);
                ObjectMessage exchangeMessage = session.createObjectMessage(messageExchangeImpl);
                Integer priority = (Integer) messageExchangeImpl.getProperty(JbiConstants.MESSAGE_PRIORITY);
                if (priority != null) {
                    exchangeMessage.setJMSPriority(priority.intValue());
                }
                session.createProducer(queue).send(exchangeMessage);
            } finally {
                session.close();
            }
        } catch (JMSException e) {
            this.log.error("Failed to send exchange: " + messageExchangeImpl + " internal JMS Network", e);
            throw new MessagingException(e);
        }
    }

    /**
     * Receives an exchange from one of this flow's queues and hands it to the executor,
     * which routes it through the superclass. Messages that arrive while the flow is not
     * started are ignored. Messages that are not object messages carrying a
     * {@code MessageExchangeImpl} are logged and ignored instead of raising a
     * {@link ClassCastException} out of the listener callback.
     *
     * @param message the received JMS message; {@code null} is ignored
     */
    @Override // javax.jms.MessageListener
    public void onMessage(Message message) {
        if (message == null) {
            return;
        }
        try {
            if (!this.started.get()) {
                return;
            }
            if (!(message instanceof ObjectMessage)) {
                this.log.warn("Ignoring JMS message that is not an ObjectMessage: " + message.getClass().getName());
                return;
            }
            Serializable payload = ((ObjectMessage) message).getObject();
            if (!(payload instanceof MessageExchangeImpl)) {
                this.log.warn("Ignoring JMS message whose payload is not a MessageExchangeImpl: "
                        + (payload == null ? "null" : payload.getClass().getName()));
                return;
            }
            final MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) payload;
            this.executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (messageExchangeImpl.getDestinationId() == null) {
                            // No destination yet: resolve the local internal endpoint and address the exchange to it.
                            ServiceEndpoint endpoint = messageExchangeImpl.getEndpoint();
                            ServiceEndpoint internalEndpoint = AbstractJMSFlow.this.broker.getContainer().getRegistry().getInternalEndpoint(endpoint.getServiceName(), endpoint.getEndpointName());
                            messageExchangeImpl.setEndpoint(internalEndpoint);
                            messageExchangeImpl.setDestinationId(((InternalEndpoint) internalEndpoint).getComponentNameSpace());
                        }
                        // Superclass routing, not this class's doRouting, which would send over JMS again.
                        AbstractJMSFlow.super.doRouting(messageExchangeImpl);
                    } catch (Throwable th) {
                        AbstractJMSFlow.this.log.error("Caught an exception routing ExchangePacket: ", th);
                    }
                }
            });
        } catch (JMSException e) {
            this.log.error("Caught an exception unpacking JMS Message: ", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a new cluster node and re-announces every local internal endpoint, with
     * broadcasting enabled.
     *
     * @param str the name of the node to add to the subscriber set
     */
    public void addClusterNode(String str) {
        subscriberSet.add(str);
        final ServiceEndpoint[] knownEndpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
        for (ServiceEndpoint candidate : knownEndpoints) {
            if (!(candidate instanceof InternalEndpoint)) {
                continue;
            }
            if (!((InternalEndpoint) candidate).isLocal()) {
                continue;
            }
            EndpointEvent registration = new EndpointEvent(candidate, 0);
            onInternalEndpointRegistered(registration, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Removes a cluster node from the subscriber set and then calls
     * {@code removeAllPackets} for it.
     *
     * @param str the name of the node to remove
     */
    public void removeClusterNode(String str) {
        this.subscriberSet.remove(str);
        removeAllPackets(str);
    }

    /**
     * Hook called for a node when it is removed from the cluster and for every known
     * subscriber when the flow stops. The implementation in this class does nothing.
     *
     * @param str the name of the node
     */
    protected void removeAllPackets(String str) {
    }

    /**
     * @return the URL used to build a connection factory when none has been injected
     */
    public String getJmsURL() {
        return this.jmsURL;
    }

    /**
     * Sets the URL used by {@code init} to build a connection factory when none has been injected.
     *
     * @param str the JMS URL
     */
    public void setJmsURL(String str) {
        this.jmsURL = str;
    }
}
