001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.interceptor;
018
019 import java.util.Date;
020 import java.util.HashMap;
021 import java.util.Map;
022
023 import org.apache.camel.Endpoint;
024 import org.apache.camel.Exchange;
025 import org.apache.camel.Expression;
026 import org.apache.camel.Processor;
027 import org.apache.camel.Producer;
028 import org.apache.camel.impl.DefaultExchange;
029 import org.apache.camel.impl.DefaultRouteNode;
030 import org.apache.camel.model.InterceptDefinition;
031 import org.apache.camel.model.OnCompletionDefinition;
032 import org.apache.camel.model.OnExceptionDefinition;
033 import org.apache.camel.model.ProcessorDefinition;
034 import org.apache.camel.processor.DelegateProcessor;
035 import org.apache.camel.processor.Logger;
036 import org.apache.camel.spi.ExchangeFormatter;
037 import org.apache.camel.spi.InterceptStrategy;
038 import org.apache.camel.spi.TraceableUnitOfWork;
039 import org.apache.camel.util.IntrospectionSupport;
040 import org.apache.camel.util.ObjectHelper;
041 import org.apache.camel.util.ServiceHelper;
042 import org.apache.commons.logging.Log;
043 import org.apache.commons.logging.LogFactory;
044
045 /**
046 * An interceptor for debugging and tracing routes
047 *
048 * @version $Revision: 795369 $
049 */
050 public class TraceInterceptor extends DelegateProcessor implements ExchangeFormatter {
051 private static final transient Log LOG = LogFactory.getLog(TraceInterceptor.class);
052 private static final String JPA_TRACE_EVENT_MESSAGE = "org.apache.camel.processor.interceptor.JpaTraceEventMessage";
053 private Logger logger;
054 private Producer traceEventProducer;
055 private final ProcessorDefinition node;
056 private final Tracer tracer;
057 private TraceFormatter formatter;
058 private Class jpaTraceEventMessageClass;
059
060 public TraceInterceptor(ProcessorDefinition node, Processor target, TraceFormatter formatter, Tracer tracer) {
061 super(target);
062 this.tracer = tracer;
063 this.node = node;
064 this.formatter = formatter;
065
066 // set logger to use
067 if (tracer.getLogName() != null) {
068 logger = new Logger(LogFactory.getLog(tracer.getLogName()), this);
069 } else {
070 // use default logger
071 logger = new Logger(LogFactory.getLog(TraceInterceptor.class), this);
072 }
073
074 // set logging level if provided
075 if (tracer.getLogLevel() != null) {
076 logger.setLevel(tracer.getLogLevel());
077 }
078
079 if (tracer.getFormatter() != null) {
080 this.formatter = tracer.getFormatter();
081 }
082 }
083
084 public TraceInterceptor(ProcessorDefinition node, Processor target, Tracer tracer) {
085 this(node, target, null, tracer);
086 }
087
088 @Override
089 public String toString() {
090 return "TraceInterceptor[" + node + "]";
091 }
092
093 public void process(final Exchange exchange) throws Exception {
094 // interceptor will also trace routes supposed only for TraceEvents so we need to skip
095 // logging TraceEvents to avoid infinite looping
096 if (exchange.getProperty(Exchange.TRACE_EVENT, Boolean.class) != null) {
097 // but we must still process to allow routing of TraceEvents to eg a JPA endpoint
098 super.process(exchange);
099 return;
100 }
101
102 boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
103
104 // whether we should trace it or not, some nodes should be skipped as they are abstract
105 // intermedidate steps for instance related to on completion
106 boolean trace = true;
107
108 // okay this is a regular exchange being routed we might need to log and trace
109 try {
110 // before
111 if (shouldLog) {
112
113 // register route path taken if TraceableUnitOfWork unit of work
114 if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
115 TraceableUnitOfWork tuow = (TraceableUnitOfWork) exchange.getUnitOfWork();
116
117 if (node instanceof OnExceptionDefinition) {
118 // special for on exception so we can see it in the trace logs
119 trace = beforeOnException((OnExceptionDefinition) node, tuow, exchange);
120 } else if (node instanceof OnCompletionDefinition) {
121 // special for on completion so we can see it in the trace logs
122 trace = beforeOnCompletion((OnCompletionDefinition) node, tuow, exchange);
123 } else {
124 // regular so just add it
125 tuow.addTraced(new DefaultRouteNode(node, super.getProcessor()));
126 }
127 }
128 }
129
130 // log and trace the processor
131 if (trace) {
132 logExchange(exchange);
133 traceExchange(exchange);
134 }
135
136 // some nodes need extra work to trace it
137 if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
138 TraceableUnitOfWork tuow = (TraceableUnitOfWork) exchange.getUnitOfWork();
139
140 if (node instanceof InterceptDefinition) {
141 // special for intercept() as we would like to trace the processor that was intercepted
142 // as well, otherwise we only see the intercepted route, but we need the both to be logged/traced
143 afterIntercept((InterceptDefinition) node, tuow, exchange);
144 }
145 }
146
147 // process the exchange
148 super.proceed(exchange);
149
150 // after (trace out)
151 if (shouldLog && tracer.isTraceOutExchanges()) {
152 logExchange(exchange);
153 traceExchange(exchange);
154 }
155 } catch (Exception e) {
156 if (shouldLogException(exchange)) {
157 logException(exchange, e);
158 }
159 throw e;
160 }
161 }
162
163 public Object format(Exchange exchange) {
164 return formatter.format(this, this.getNode(), exchange);
165 }
166
167 // Properties
168 //-------------------------------------------------------------------------
169 public ProcessorDefinition getNode() {
170 return node;
171 }
172
173 public Logger getLogger() {
174 return logger;
175 }
176
177 public TraceFormatter getFormatter() {
178 return formatter;
179 }
180
181
182 // Implementation methods
183 //-------------------------------------------------------------------------
184 protected boolean beforeOnException(OnExceptionDefinition onException, TraceableUnitOfWork tuow, Exchange exchange) throws Exception {
185 // lets see if this is the first time for this exception
186 int index = tuow.getAndIncrement(node);
187 if (index == 0) {
188 class OnExceptionExpression implements Expression {
189 @SuppressWarnings("unchecked")
190 public Object evaluate(Exchange exchange, Class type) {
191 String label = "OnException";
192 if (exchange.getProperty(Exchange.EXCEPTION_CAUGHT) != null) {
193 label += "[" + exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class).getClass().getSimpleName() + "]";
194 }
195 return exchange.getContext().getTypeConverter().convertTo(type, label);
196 }
197
198 }
199 // yes its first time then do some special to log and trace the
200 // start of onException
201 Expression exp = new OnExceptionExpression();
202
203 // add our pesudo node
204 tuow.addTraced(new DefaultRouteNode(node, exp));
205
206 // log and trace the processor that was onException so we can see immediately
207 logExchange(exchange);
208 traceExchange(exchange);
209 }
210
211 // add the processor that is invoked for this onException
212 tuow.addTraced(new DefaultRouteNode(node, super.getProcessor()));
213 return true;
214 }
215
216
217 protected boolean beforeOnCompletion(OnCompletionDefinition onCompletion, TraceableUnitOfWork tuow, Exchange exchange) throws Exception {
218 // we should only trace when we do the actual onCompletion
219 // the problem is that onCompletion is added at the very beginning of a route to be able to
220 // add synchronization hoos on unit of work so it knows to invoke the onCompletion when the
221 // exchange is done. But in the trace log we want to defer the onCompletion being logged
222 // unitl the exchange is actually completed and is doing the onCompletion routing
223 // so if the last node is null then we have just started and thus should not trace this node
224 boolean answer = tuow.getLastNode() != null;
225
226 if (exchange.getProperty(Exchange.ON_COMPLETION) != null) {
227 // if ON_COMPLETION is not null then we are actually doing the onCompletion routing
228
229 // we should trace the onCompletion route and we want a start log of the onCompletion
230 // step so get the index and see if its 0 then we can add our speical log
231 int index = tuow.getAndIncrement(node);
232 if (index == 0) {
233 class OnCompletionExpression implements Expression {
234 @SuppressWarnings("unchecked")
235 public Object evaluate(Exchange exchange, Class type) {
236 String label = "OnCompletion[" + exchange.getProperty(Exchange.CORRELATION_ID) + "]";
237 return exchange.getContext().getTypeConverter().convertTo(type, label);
238 }
239 }
240 // yes its first time then do some special to log and trace the start of onCompletion
241 Expression exp = new OnCompletionExpression();
242 // add the onCompletion and then the processor that is invoked nest
243 tuow.addTraced(new DefaultRouteNode(node, exp));
244 tuow.addTraced(new DefaultRouteNode(node, super.getProcessor()));
245
246 // log and trace so we get the onCompletion -> processor in the log
247 logExchange(exchange);
248 traceExchange(exchange);
249 } else {
250 // we are doing the onCompletion but this is after the start so just
251 // add the processor and do no special start message
252 tuow.addTraced(new DefaultRouteNode(node, super.getProcessor()));
253 }
254
255 }
256
257 return answer;
258 }
259
260 protected boolean afterIntercept(InterceptDefinition interceptr, TraceableUnitOfWork tuow, Exchange exchange) throws Exception {
261 // get the intercepted processor from the definition
262 // we need to use the UoW to have its own index of how far we got into the list
263 // of intercepted processors the intercept definition holds as the intercept
264 // definition is a single object that is shared by concurrent thread being routed
265 // so each exchange has its own private counter
266 InterceptDefinition intercept = (InterceptDefinition) node;
267 Processor last = intercept.getInterceptedProcessor(tuow.getAndIncrement(intercept));
268 if (last != null) {
269 tuow.addTraced(new DefaultRouteNode(node, last));
270
271 // log and trace the processor that was intercepted so we can see it
272 logExchange(exchange);
273 traceExchange(exchange);
274 }
275
276 return true;
277 }
278
279 protected void logExchange(Exchange exchange) {
280 // process the exchange that formats and logs it
281 logger.process(exchange);
282 }
283
284 @SuppressWarnings("unchecked")
285 protected void traceExchange(Exchange exchange) throws Exception {
286 // should we send a trace event to an optional destination?
287 if (tracer.getDestination() != null || tracer.getDestinationUri() != null) {
288
289 // create event exchange and add event information
290 Date timestamp = new Date();
291 Exchange event = new DefaultExchange(exchange);
292 event.setProperty(Exchange.TRACE_EVENT_NODE_ID, node.getId());
293 event.setProperty(Exchange.TRACE_EVENT_TIMESTAMP, timestamp);
294 // keep a reference to the original exchange in case its needed
295 event.setProperty(Exchange.TRACE_EVENT_EXCHANGE, exchange);
296
297 // create event message to sent as in body containing event information such as
298 // from node, to node, etc.
299 TraceEventMessage msg = new DefaultTraceEventMessage(timestamp, node, exchange);
300
301 // should we use ordinary or jpa objects
302 if (tracer.isUseJpa()) {
303 LOG.trace("Using class: " + JPA_TRACE_EVENT_MESSAGE + " for tracing event messages");
304
305 // load the jpa event message class
306 loadJpaTraceEventMessageClass(exchange);
307 // create a new instance of the event message class
308 Object jpa = ObjectHelper.newInstance(jpaTraceEventMessageClass);
309
310 // copy options from event to jpa
311 Map options = new HashMap();
312 IntrospectionSupport.getProperties(msg, options, null);
313 IntrospectionSupport.setProperties(jpa, options);
314 // and set the timestamp as its not a String type
315 IntrospectionSupport.setProperty(jpa, "timestamp", msg.getTimestamp());
316
317 event.getIn().setBody(jpa);
318 } else {
319 event.getIn().setBody(msg);
320 }
321
322 // marker property to indicate its a tracing event being routed in case
323 // new Exchange instances is created during trace routing so we can check
324 // for this marker when interceptor also kickins in during routing of trace events
325 event.setProperty(Exchange.TRACE_EVENT, Boolean.TRUE);
326 try {
327 // process the trace route
328 getTraceEventProducer(exchange).process(event);
329 } catch (Exception e) {
330 // log and ignore this as the original Exchange should be allowed to continue
331 LOG.error("Error processing trace event (original Exchange will continue): " + event, e);
332 }
333 }
334 }
335
336 private synchronized void loadJpaTraceEventMessageClass(Exchange exchange) {
337 if (jpaTraceEventMessageClass == null) {
338 jpaTraceEventMessageClass = exchange.getContext().getClassResolver().resolveClass(JPA_TRACE_EVENT_MESSAGE);
339 if (jpaTraceEventMessageClass == null) {
340 throw new IllegalArgumentException("Cannot find class: " + JPA_TRACE_EVENT_MESSAGE
341 + ". Make sure camel-jpa.jar is in the classpath.");
342 }
343 }
344 }
345
346 protected void logException(Exchange exchange, Throwable throwable) {
347 if (tracer.isTraceExceptions()) {
348 if (tracer.isLogStackTrace()) {
349 logger.process(exchange, throwable);
350 } else {
351 logger.process(exchange, ", Exception: " + throwable.toString());
352 }
353 }
354 }
355
356 /**
357 * Returns true if the given exchange should be logged in the trace list
358 */
359 protected boolean shouldLogExchange(Exchange exchange) {
360 return tracer.isEnabled() && (tracer.getTraceFilter() == null || tracer.getTraceFilter().matches(exchange));
361 }
362
363 /**
364 * Returns true if the given exchange should be logged when an exception was thrown
365 */
366 protected boolean shouldLogException(Exchange exchange) {
367 return tracer.isTraceExceptions();
368 }
369
370 /**
371 * Returns whether exchanges coming out of processors should be traced
372 */
373 public boolean shouldTraceOutExchanges() {
374 return tracer.isTraceOutExchanges();
375 }
376
377 /**
378 * Returns true if the given node should be logged in the trace list
379 */
380 protected boolean shouldLogNode(ProcessorDefinition node) {
381 if (node == null) {
382 return false;
383 }
384 if (!tracer.isTraceInterceptors() && (node instanceof InterceptStrategy)) {
385 return false;
386 }
387 return true;
388 }
389
390 private synchronized Producer getTraceEventProducer(Exchange exchange) throws Exception {
391 if (traceEventProducer == null) {
392 // create producer when we have access the the camel context (we dont in doStart)
393 Endpoint endpoint = tracer.getDestination() != null ? tracer.getDestination() : exchange.getContext().getEndpoint(tracer.getDestinationUri());
394 traceEventProducer = endpoint.createProducer();
395 ServiceHelper.startService(traceEventProducer);
396 }
397 return traceEventProducer;
398 }
399
400 @Override
401 protected void doStart() throws Exception {
402 super.doStart();
403 traceEventProducer = null;
404 }
405
406 @Override
407 protected void doStop() throws Exception {
408 super.doStop();
409 if (traceEventProducer != null) {
410 ServiceHelper.stopService(traceEventProducer);
411 }
412 }
413
414 }