001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.management;
018
019 import java.lang.reflect.Method;
020 import java.util.Collection;
021 import java.util.HashMap;
022 import java.util.Map;
023 import javax.management.JMException;
024 import javax.management.MalformedObjectNameException;
025 import javax.management.ObjectName;
026
027 import org.apache.camel.CamelContext;
028 import org.apache.camel.Consumer;
029 import org.apache.camel.Endpoint;
030 import org.apache.camel.Processor;
031 import org.apache.camel.Route;
032 import org.apache.camel.Service;
033 import org.apache.camel.impl.DefaultCamelContext;
034 import org.apache.camel.impl.ServiceSupport;
035 import org.apache.camel.model.ProcessorDefinition;
036 import org.apache.camel.model.RouteDefinition;
037 import org.apache.camel.spi.ClassResolver;
038 import org.apache.camel.spi.InstrumentationAgent;
039 import org.apache.camel.spi.InterceptStrategy;
040 import org.apache.camel.spi.LifecycleStrategy;
041 import org.apache.camel.spi.RouteContext;
042 import org.apache.camel.util.ObjectHelper;
043 import org.apache.commons.logging.Log;
044 import org.apache.commons.logging.LogFactory;
045
046 /**
047 * JMX agent that registeres Camel lifecycle events in JMX.
048 *
049 * @version $Revision: 784038 $
050 */
051 public class InstrumentationLifecycleStrategy implements LifecycleStrategy {
052 private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
053
054 private static final String MANAGED_RESOURCE_CLASSNAME = "org.springframework.jmx.export.annotation.ManagedResource";
055 private InstrumentationAgent agent;
056 private CamelNamingStrategy namingStrategy;
057 private boolean initialized;
058 private final Map<Endpoint, InstrumentationProcessor> registeredRoutes = new HashMap<Endpoint, InstrumentationProcessor>();
059
060 public InstrumentationLifecycleStrategy() {
061 this(new DefaultInstrumentationAgent());
062 }
063
064 public InstrumentationLifecycleStrategy(InstrumentationAgent agent) {
065 this.agent = agent;
066 }
067 /**
068 * Constructor for camel context that has been started.
069 *
070 * @param agent the agent
071 * @param context the camel context
072 */
073 public InstrumentationLifecycleStrategy(InstrumentationAgent agent, CamelContext context) {
074 this.agent = agent;
075 onContextStart(context);
076 }
077
078 public void onContextStart(CamelContext context) {
079 // register camel context
080 if (context instanceof DefaultCamelContext) {
081 try {
082 initialized = true;
083 DefaultCamelContext dc = (DefaultCamelContext)context;
084 // call addService so that context will start and stop the agent
085 dc.addService(agent);
086 namingStrategy = new CamelNamingStrategy(agent.getMBeanObjectDomainName());
087 ManagedService ms = new ManagedService(dc);
088 agent.register(ms, getNamingStrategy().getObjectName(dc));
089 } catch (Exception e) {
090 // must rethrow to allow CamelContext fallback to non JMX agent to allow
091 // Camel to continue to run
092 throw ObjectHelper.wrapRuntimeCamelException(e);
093 }
094 }
095 }
096
097 /**
098 * If the endpoint is an instance of ManagedResource then register it with the
099 * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
100 * register that with the mbean server.
101 * @param endpoint the Endpoint attempted to be added
102 */
103 @SuppressWarnings("unchecked")
104 public void onEndpointAdd(Endpoint endpoint) {
105 // the agent hasn't been started
106 if (!initialized) {
107 return;
108 }
109
110 // see if the spring-jmx is on the classpath
111 Class annotationClass = resolveManagedAnnotation(endpoint);
112 if (annotationClass == null) {
113 // no its not so register the endpoint as a new managed endpoint
114 registerEndpointAsManagedEndpoint(endpoint);
115 return;
116 }
117
118 // see if the endpoint have been annotation with a spring JMX annotation
119 Object annotation = endpoint.getClass().getAnnotation(annotationClass);
120 if (annotation == null) {
121 // no its not so register the endpoint as a new managed endpoint
122 registerEndpointAsManagedEndpoint(endpoint);
123 } else {
124 // there is already a spring JMX annotation so attempt to register it
125 attemptToRegisterManagedResource(endpoint, annotation);
126 }
127 }
128
129 private Class resolveManagedAnnotation(Endpoint endpoint) {
130 CamelContext context = endpoint.getCamelContext();
131
132 ClassResolver resolver = context.getClassResolver();
133 return resolver.resolveClass(MANAGED_RESOURCE_CLASSNAME);
134 }
135
136 private void attemptToRegisterManagedResource(Endpoint endpoint, Object annotation) {
137 try {
138 Method m = annotation.getClass().getMethod("objectName");
139
140 String objectNameStr = (String) m.invoke(annotation);
141
142 ObjectName objectName = new ObjectName(objectNameStr);
143 agent.register(endpoint, objectName);
144 } catch (Exception e) {
145 LOG.debug("objectName method not present, wrapping endpoint in ManagedEndpoint instead");
146 registerEndpointAsManagedEndpoint(endpoint);
147 }
148 }
149
150 private void registerEndpointAsManagedEndpoint(Endpoint endpoint) {
151 try {
152 ManagedEndpoint me = new ManagedEndpoint(endpoint);
153 agent.register(me, getNamingStrategy().getObjectName(me));
154 } catch (JMException e) {
155 LOG.warn("Could not register Endpoint MBean for uri: " + endpoint.getEndpointUri(), e);
156 }
157 }
158
159 @SuppressWarnings("unchecked")
160 public void onRoutesAdd(Collection<Route> routes) {
161 // the agent hasn't been started
162 if (!initialized) {
163 return;
164 }
165
166 for (Route route : routes) {
167 try {
168 ManagedRoute mr = new ManagedRoute(route);
169 // retrieve the per-route intercept for this route
170 InstrumentationProcessor processor = registeredRoutes.get(route.getEndpoint());
171 if (processor == null) {
172 LOG.warn("Route has not been instrumented for endpoint: " + route.getEndpoint());
173 } else {
174 // let the instrumentation use our route counter
175 processor.setCounter(mr);
176 }
177 agent.register(mr, getNamingStrategy().getObjectName(mr));
178 } catch (JMException e) {
179 LOG.warn("Could not register Route MBean", e);
180 }
181 }
182 }
183
184 public void onServiceAdd(CamelContext context, Service service) {
185 // the agent hasn't been started
186 if (!initialized) {
187 return;
188 }
189
190 // register consumer
191 if (service instanceof ServiceSupport && service instanceof Consumer) {
192 // TODO: add support for non-consumer services?
193 try {
194 ManagedService ms = new ManagedService((ServiceSupport)service);
195 agent.register(ms, getNamingStrategy().getObjectName(context, ms));
196 } catch (JMException e) {
197 LOG.warn("Could not register Service MBean", e);
198 }
199 }
200 }
201
202 public void onRouteContextCreate(RouteContext routeContext) {
203 // the agent hasn't been started
204 if (!initialized) {
205 return;
206 }
207
208 // Create a map (ProcessorType -> PerformanceCounter)
209 // to be passed to InstrumentationInterceptStrategy.
210 Map<ProcessorDefinition, PerformanceCounter> registeredCounters =
211 new HashMap<ProcessorDefinition, PerformanceCounter>();
212
213 // Each processor in a route will have its own performance counter
214 // The performance counter are MBeans that we register with MBeanServer.
215 // These performance counter will be embedded
216 // to InstrumentationProcessor and wrap the appropriate processor
217 // by InstrumentationInterceptStrategy.
218 RouteDefinition route = routeContext.getRoute();
219
220 // TODO: This only registers counters for the first outputs in the route
221 // all the chidren of the outputs is not registered
222 // we should leverge the Channel for this to ensure we register all processors
223 // in the entire route graph
224
225 // register all processors
226 for (ProcessorDefinition processor : route.getOutputs()) {
227 // skip processors that should not be registered
228 if (!registerProcessor(processor)) {
229 continue;
230 }
231
232 ObjectName name = null;
233 try {
234 // get the mbean name
235 name = getNamingStrategy().getObjectName(routeContext, processor);
236
237 // register mbean wrapped in the performance counter mbean
238 PerformanceCounter pc = new PerformanceCounter();
239 agent.register(pc, name);
240
241 // add to map now that it has been registered
242 registeredCounters.put(processor, pc);
243 } catch (MalformedObjectNameException e) {
244 LOG.warn("Could not create MBean name: " + name, e);
245 } catch (JMException e) {
246 LOG.warn("Could not register PerformanceCounter MBean: " + name, e);
247 }
248 }
249
250 // add intercept strategy that executes the JMX instrumentation for performance metrics
251 routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters));
252
253 // instrument the route endpoint
254 final Endpoint endpoint = routeContext.getEndpoint();
255
256 // only needed to register on the first output as all rotues will pass through this one
257 ProcessorDefinition out = routeContext.getRoute().getOutputs().get(0);
258
259 // add an intercept strategy that counts when the route sends to any of its outputs
260 out.addInterceptStrategy(new InterceptStrategy() {
261 public Processor wrapProcessorInInterceptors(ProcessorDefinition processorDefinition, Processor target, Processor nextTarget) throws Exception {
262 if (registeredRoutes.containsKey(endpoint)) {
263 // do not double wrap
264 return target;
265 }
266 InstrumentationProcessor wrapper = new InstrumentationProcessor(null);
267 wrapper.setType(processorDefinition.getShortName());
268 wrapper.setProcessor(target);
269
270 // register our wrapper
271 registeredRoutes.put(endpoint, wrapper);
272
273 return wrapper;
274 }
275
276 public String toString() {
277 return "Instrument";
278 }
279 });
280
281 }
282
283 /**
284 * Should the given processor be registered.
285 */
286 protected boolean registerProcessor(ProcessorDefinition processor) {
287 if (agent instanceof DefaultInstrumentationAgent) {
288 DefaultInstrumentationAgent dia = (DefaultInstrumentationAgent) agent;
289 if (dia.getOnlyRegisterProcessorWithCustomId() != null && dia.getOnlyRegisterProcessorWithCustomId()) {
290 // only register if the processor have an explicy id assigned
291 return processor.hasCustomIdAssigned();
292 }
293 }
294
295 // fallback to always register it
296 return true;
297 }
298
299 public CamelNamingStrategy getNamingStrategy() {
300 return namingStrategy;
301 }
302
303 public void setNamingStrategy(CamelNamingStrategy strategy) {
304 this.namingStrategy = strategy;
305 }
306
307 public void setAgent(InstrumentationAgent agent) {
308 this.agent = agent;
309 }
310
311 }