001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.ArrayList;
020 import java.util.Arrays;
021 import java.util.Collection;
022 import java.util.Collections;
023 import java.util.HashSet;
024 import java.util.LinkedList;
025 import java.util.List;
026 import java.util.Set;
027 import java.util.concurrent.ThreadPoolExecutor;
028
029
030 import javax.xml.bind.annotation.XmlAccessType;
031 import javax.xml.bind.annotation.XmlAccessorType;
032 import javax.xml.bind.annotation.XmlAttribute;
033 import javax.xml.bind.annotation.XmlTransient;
034
035 import org.apache.camel.CamelContext;
036 import org.apache.camel.CamelException;
037 import org.apache.camel.Endpoint;
038 import org.apache.camel.Exchange;
039 import org.apache.camel.ExchangePattern;
040 import org.apache.camel.Expression;
041 import org.apache.camel.Predicate;
042 import org.apache.camel.Processor;
043 import org.apache.camel.Route;
044 import org.apache.camel.builder.DataFormatClause;
045 import org.apache.camel.builder.DeadLetterChannelBuilder;
046 import org.apache.camel.builder.ErrorHandlerBuilder;
047 import org.apache.camel.builder.ErrorHandlerBuilderRef;
048 import org.apache.camel.builder.ExpressionClause;
049 import org.apache.camel.builder.NoErrorHandlerBuilder;
050 import org.apache.camel.builder.ProcessorBuilder;
051 import org.apache.camel.impl.DefaultCamelContext;
052 import org.apache.camel.model.dataformat.DataFormatType;
053 import org.apache.camel.model.language.ConstantExpression;
054 import org.apache.camel.model.language.ExpressionType;
055 import org.apache.camel.model.language.LanguageExpression;
056 import org.apache.camel.processor.ConvertBodyProcessor;
057 import org.apache.camel.processor.DelegateProcessor;
058 import org.apache.camel.processor.Pipeline;
059 import org.apache.camel.processor.aggregate.AggregationCollection;
060 import org.apache.camel.processor.aggregate.AggregationStrategy;
061 import org.apache.camel.processor.idempotent.MessageIdRepository;
062 import org.apache.camel.spi.DataFormat;
063 import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
064 import org.apache.camel.spi.InterceptStrategy;
065 import org.apache.camel.spi.Policy;
066 import org.apache.camel.spi.RouteContext;
067 import org.apache.commons.logging.Log;
068 import org.apache.commons.logging.LogFactory;
069
070 /**
071 * Base class for processor types that most XML types extend.
072 *
073 * @version $Revision: 767791 $
074 */
075 @XmlAccessorType(XmlAccessType.PROPERTY)
076 public abstract class ProcessorType<Type extends ProcessorType> extends OptionalIdentifiedType<Type> implements Block {
077
078 /**
079 * @deprecated will be removed in Camel 2.0
080 */
081 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
082
083 private static final transient Log LOG = LogFactory.getLog(ProcessorType.class);
084 private ErrorHandlerBuilder errorHandlerBuilder;
085 private Boolean inheritErrorHandlerFlag;
086 private NodeFactory nodeFactory;
087 private LinkedList<Block> blocks = new LinkedList<Block>();
088 private ProcessorType<? extends ProcessorType> parent;
089 private List<InterceptorType> interceptors = new ArrayList<InterceptorType>();
090 private String errorHandlerRef;
091
092 // else to use an optional attribute in JAXB2
093 public abstract List<ProcessorType<?>> getOutputs();
094
095
096 public Processor createProcessor(RouteContext routeContext) throws Exception {
097 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
098 }
099
100 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
101 Collection<ProcessorType<?>> outputs = getOutputs();
102 return createOutputsProcessor(routeContext, outputs);
103 }
104
105 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
106 Processor processor = makeProcessor(routeContext);
107 if (!routeContext.isRouteAdded()) {
108 routeContext.addEventDrivenProcessor(processor);
109 }
110 }
111
112 /**
113 * Wraps the child processor in whatever necessary interceptors and error
114 * handlers
115 */
116 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
117 processor = wrapProcessorInInterceptors(routeContext, processor);
118 return wrapInErrorHandler(routeContext, processor);
119 }
120
121 // Fluent API
122 // -------------------------------------------------------------------------
123
124 /**
125 * Sends the exchange to the given endpoint URI
126 */
127 public Type to(String uri) {
128 addOutput(new ToType(uri));
129 return (Type) this;
130 }
131
132
133 /**
134 * Sends the exchange to the given endpoint
135 */
136 public Type to(Endpoint endpoint) {
137 addOutput(new ToType(endpoint));
138 return (Type) this;
139 }
140
141 /**
142 * Sends the exchange with certain exchange pattern to the given endpoint
143 *
144 * @param pattern the pattern to use for the message exchange
145 * @param uri the endpoint to send to
146 * @return the builder
147 */
148 public Type to(ExchangePattern pattern, String uri) {
149 addOutput(new ToType(uri, pattern));
150 return (Type) this;
151 }
152
153
154 /**
155 * Sends the exchange with certain exchange pattern to the given endpoint
156 *
157 * @param pattern the pattern to use for the message exchange
158 * @param endpoint the endpoint to send to
159 * @return the builder
160 */
161 public Type to(ExchangePattern pattern, Endpoint endpoint) {
162 addOutput(new ToType(endpoint, pattern));
163 return (Type) this;
164 }
165
166 /**
167 * Sends the exchange to a list of endpoints
168 */
169 public Type to(String... uris) {
170 for (String uri : uris) {
171 addOutput(new ToType(uri));
172 }
173 return (Type) this;
174 }
175
176
177 /**
178 * Sends the exchange to a list of endpoints
179 */
180 public Type to(Endpoint... endpoints) {
181 for (Endpoint endpoint : endpoints) {
182 addOutput(new ToType(endpoint));
183 }
184 return (Type) this;
185 }
186
187 /**
188 * Sends the exchange to a list of endpoint
189 */
190 public Type to(Iterable<Endpoint> endpoints) {
191 for (Endpoint endpoint : endpoints) {
192 addOutput(new ToType(endpoint));
193 }
194 return (Type) this;
195 }
196
197
198 /**
199 * Sends the exchange to a list of endpoints
200 *
201 * @param pattern the pattern to use for the message exchanges
202 * @param uris list of endpoints to send to
203 * @return the builder
204 */
205 public Type to(ExchangePattern pattern, String... uris) {
206 for (String uri : uris) {
207 addOutput(new ToType(uri, pattern));
208 }
209 return (Type) this;
210 }
211
212 /**
213 * Sends the exchange to a list of endpoints
214 *
215 * @param pattern the pattern to use for the message exchanges
216 * @param endpoints list of endpoints to send to
217 * @return the builder
218 */
219 public Type to(ExchangePattern pattern, Endpoint... endpoints) {
220 for (Endpoint endpoint : endpoints) {
221 addOutput(new ToType(endpoint, pattern));
222 }
223 return (Type) this;
224 }
225
226 /**
227 * Sends the exchange to a list of endpoints
228 *
229 * @param pattern the pattern to use for the message exchanges
230 * @param endpoints list of endpoints to send to
231 * @return the builder
232 */
233 public Type to(ExchangePattern pattern, Iterable<Endpoint> endpoints) {
234 for (Endpoint endpoint : endpoints) {
235 addOutput(new ToType(endpoint, pattern));
236 }
237 return (Type) this;
238 }
239
240
241 /**
242 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">ExchangePattern:</a>
243 * set the ExchangePattern {@link ExchangePattern} into the exchange
244 *
245 * @param exchangePattern instance of {@link ExchangePattern}
246 * @return the builder
247 */
248 public Type setExchangePattern(ExchangePattern exchangePattern) {
249 addOutput(new SetExchangePatternType(exchangePattern));
250 return (Type) this;
251 }
252
253 /**
254 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">ExchangePattern:</a>
255 * set the exchange's ExchangePattern {@link ExchangePattern} to be InOnly
256 *
257 *
258 * @return the builder
259 */
260 public Type inOnly() {
261 return setExchangePattern(ExchangePattern.InOnly);
262 }
263
264 /**
265 * Sends the message to the given endpoint using an
266 * <a href="http://activemq.apache.org/camel/event-message.html">Event Message</a> or
267 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOnly exchange pattern</a>
268 *
269 * @param uri The endpoint uri which is used for sending the exchange
270 * @return the builder
271 */
272 public Type inOnly(String uri) {
273 return to(ExchangePattern.InOnly, uri);
274 }
275
276 /**
277 * Sends the message to the given endpoint using an
278 * <a href="http://activemq.apache.org/camel/event-message.html">Event Message</a> or
279 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOnly exchange pattern</a>
280 *
281 * @param endpoint The endpoint which is used for sending the exchange
282 * @return the builder
283 */
284 public Type inOnly(Endpoint endpoint) {
285 return to(ExchangePattern.InOnly, endpoint);
286 }
287
288
289 /**
290 * Sends the message to the given endpoints using an
291 * <a href="http://activemq.apache.org/camel/event-message.html">Event Message</a> or
292 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOnly exchange pattern</a>
293 *
294 * @param uris list of endpoints to send to
295 * @return the builder
296 */
297 public Type inOnly(String... uris) {
298 return to(ExchangePattern.InOnly, uris);
299 }
300
301
302 /**
303 * Sends the message to the given endpoints using an
304 * <a href="http://activemq.apache.org/camel/event-message.html">Event Message</a> or
305 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOnly exchange pattern</a>
306 *
307 * @param endpoints list of endpoints to send to
308 * @return the builder
309 */
310 public Type inOnly(Endpoint... endpoints) {
311 return to(ExchangePattern.InOnly, endpoints);
312 }
313
314 /**
315 * Sends the message to the given endpoints using an
316 * <a href="http://activemq.apache.org/camel/event-message.html">Event Message</a> or
317 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOnly exchange pattern</a>
318 *
319 * @param endpoints list of endpoints to send to
320 * @return the builder
321 */
322 public Type inOnly(Iterable<Endpoint> endpoints) {
323 return to(ExchangePattern.InOnly, endpoints);
324 }
325
326
327 /**
328 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">ExchangePattern:</a>
329 * set the exchange's ExchangePattern {@link ExchangePattern} to be InOut
330 *
331 *
332 * @return the builder
333 */
334 public Type inOut() {
335 return setExchangePattern(ExchangePattern.InOut);
336 }
337
338 /**
339 * Sends the message to the given endpoint using an
340 * <a href="http://activemq.apache.org/camel/request-reply.html">Request Reply</a> or
341 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOut exchange pattern</a>
342 *
343 * @param uri The endpoint uri which is used for sending the exchange
344 * @return the builder
345 */
346 public Type inOut(String uri) {
347 return to(ExchangePattern.InOut, uri);
348 }
349
350
351 /**
352 * Sends the message to the given endpoint using an
353 * <a href="http://activemq.apache.org/camel/request-reply.html">Request Reply</a> or
354 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOut exchange pattern</a>
355 *
356 * @param endpoint The endpoint which is used for sending the exchange
357 * @return the builder
358 */
359 public Type inOut(Endpoint endpoint) {
360 return to(ExchangePattern.InOut, endpoint);
361 }
362
363 /**
364 * Sends the message to the given endpoints using an
365 * <a href="http://activemq.apache.org/camel/request-reply.html">Request Reply</a> or
366 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOut exchange pattern</a>
367 *
368 * @param uris list of endpoints to send to
369 * @return the builder
370 */
371 public Type inOut(String... uris) {
372 return to(ExchangePattern.InOut, uris);
373 }
374
375
376 /**
377 * Sends the message to the given endpoints using an
378 * <a href="http://activemq.apache.org/camel/request-reply.html">Request Reply</a> or
379 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOut exchange pattern</a>
380 *
381 * @param endpoints list of endpoints to send to
382 * @return the builder
383 */
384 public Type inOut(Endpoint... endpoints) {
385 return to(ExchangePattern.InOut, endpoints);
386 }
387
388 /**
389 * Sends the message to the given endpoints using an
390 * <a href="http://activemq.apache.org/camel/request-reply.html">Request Reply</a> or
391 * <a href="http://activemq.apache.org/camel/exchange-pattern.html">InOut exchange pattern</a>
392 *
393 * @param endpoints list of endpoints to send to
394 * @return the builder
395 */
396 public Type inOut(Iterable<Endpoint> endpoints) {
397 return to(ExchangePattern.InOut, endpoints);
398 }
399
400
401 /**
402 * <a href="http://activemq.apache.org/camel/multicast.html">Multicast EIP:</a>
403 * Multicasts messages to all its child outputs; so that each processor and
404 * destination gets a copy of the original message to avoid the processors
405 * interfering with each other.
406 */
407 public MulticastType multicast() {
408 MulticastType answer = new MulticastType();
409 addOutput(answer);
410 return answer;
411 }
412
413 /**
414 * Multicasts messages to all its child outputs; so that each processor and
415 * destination gets a copy of the original message to avoid the processors
416 * interfering with each other.
417 * @param aggregationStrategy the strategy used to aggregate responses for
418 * every part
419 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
420 * @return the multicast type
421 */
422 public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
423 MulticastType answer = new MulticastType();
424 addOutput(answer);
425 answer.setAggregationStrategy(aggregationStrategy);
426 answer.setParallelProcessing(parallelProcessing);
427 return answer;
428 }
429
430 /**
431 * Multicasts messages to all its child outputs; so that each processor and
432 * destination gets a copy of the original message to avoid the processors
433 * interfering with each other.
434 * @param aggregationStrategy the strategy used to aggregate responses for
435 * every part
436 * @return the multicast type
437 */
438 public MulticastType multicast(AggregationStrategy aggregationStrategy) {
439 MulticastType answer = new MulticastType();
440 addOutput(answer);
441 answer.setAggregationStrategy(aggregationStrategy);
442 return answer;
443 }
444
445 /**
446 * Creates a {@link Pipeline} of the list of endpoints so that the message
447 * will get processed by each endpoint in turn and for request/response the
448 * output of one endpoint will be the input of the next endpoint
449 */
450 public Type pipeline(String... uris) {
451 // TODO pipeline v mulicast
452 return to(uris);
453 }
454
455 /**
456 * Creates a {@link Pipeline} of the list of endpoints so that the message
457 * will get processed by each endpoint in turn and for request/response the
458 * output of one endpoint will be the input of the next endpoint
459 */
460 public Type pipeline(Endpoint... endpoints) {
461 // TODO pipeline v mulicast
462 return to(endpoints);
463 }
464
465 /**
466 * Creates a {@link Pipeline} of the list of endpoints so that the message
467 * will get processed by each endpoint in turn and for request/response the
468 * output of one endpoint will be the input of the next endpoint
469 */
470 public Type pipeline(Collection<Endpoint> endpoints) {
471 // TODO pipeline v mulicast
472 return to(endpoints);
473 }
474
475 /**
476 * Ends the current block
477 */
478 public ProcessorType<? extends ProcessorType> end() {
479 if (blocks.isEmpty()) {
480 if (parent == null) {
481 throw new IllegalArgumentException("Root node with no active block");
482 }
483 return parent;
484 }
485 popBlock();
486 return this;
487 }
488
489 /**
490 * Causes subsequent processors to be called asynchronously
491 *
492 * @param coreSize the number of threads that will be used to process
493 * messages in subsequent processors.
494 * @return a ThreadType builder that can be used to further configure the
495 * the thread pool.
496 */
497 public ThreadType thread(int coreSize) {
498 ThreadType answer = new ThreadType(coreSize);
499 addOutput(answer);
500 return answer;
501 }
502
503 /**
504 * Causes subsequent processors to be called asynchronously
505 *
506 * @param executor the executor that will be used to process
507 * messages in subsequent processors.
508 * @return a ThreadType builder that can be used to further configure the
509 * the thread pool.
510 */
511 public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
512 ThreadType answer = new ThreadType(executor);
513 addOutput(answer);
514 return this;
515 }
516
517 /**
518 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer}
519 * to avoid duplicate messages
520 */
521 public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
522 MessageIdRepository messageIdRepository) {
523 IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
524 addOutput(answer);
525 return answer;
526 }
527
528 /**
529 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer}
530 * to avoid duplicate messages
531 *
532 * @return the builder used to create the expression
533 */
534 public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) {
535 IdempotentConsumerType answer = new IdempotentConsumerType();
536 answer.setMessageIdRepository(messageIdRepository);
537 addOutput(answer);
538 return ExpressionClause.createAndSetExpression(answer);
539 }
540
541 /**
542 * Creates a predicate expression which only if it is true then the
543 * exchange is forwarded to the destination
544 *
545 * @return the clause used to create the filter expression
546 */
547 public ExpressionClause<FilterType> filter() {
548 FilterType filter = new FilterType();
549 addOutput(filter);
550 return ExpressionClause.createAndSetExpression(filter);
551 }
552
553 /**
554 * Creates a predicate which is applied and only if it is true then the
555 * exchange is forwarded to the destination
556 *
557 * @return the builder for a predicate
558 */
559 public FilterType filter(Predicate predicate) {
560 FilterType filter = new FilterType(predicate);
561 addOutput(filter);
562 return filter;
563 }
564
565 public FilterType filter(ExpressionType expression) {
566 FilterType filter = getNodeFactory().createFilter();
567 filter.setExpression(expression);
568 addOutput(filter);
569 return filter;
570 }
571
572 public FilterType filter(String language, String expression) {
573 return filter(new LanguageExpression(language, expression));
574 }
575
576 public LoadBalanceType loadBalance() {
577 LoadBalanceType answer = new LoadBalanceType();
578 addOutput(answer);
579 return answer;
580 }
581
582
583 /**
584 * Creates a choice of one or more predicates with an otherwise clause
585 *
586 * @return the builder for a choice expression
587 */
588 public ChoiceType choice() {
589 ChoiceType answer = new ChoiceType();
590 addOutput(answer);
591 return answer;
592 }
593
594 /**
595 * Creates a try/catch block
596 *
597 * @return the builder for a tryBlock expression
598 */
599 public TryType tryBlock() {
600 TryType answer = new TryType();
601 addOutput(answer);
602 return answer;
603 }
604
605 /**
606 * Creates a dynamic <a
607 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
608 * List</a> pattern.
609 *
610 * @param recipients is the builder of the expression used in the
611 * {@link org.apache.camel.processor.RecipientList}
612 * to decide the destinations
613 */
614 public Type recipientList(Expression recipients) {
615 RecipientListType answer = new RecipientListType(recipients);
616 addOutput(answer);
617 return (Type) this;
618 }
619
620 /**
621 * Creates a dynamic <a
622 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
623 * List</a> pattern.
624 *
625 * @return the expression clause for the expression used in the
626 * {@link org.apache.camel.processor.RecipientList}
627 * to decide the destinations
628 */
629 public ExpressionClause<ProcessorType<Type>> recipientList() {
630 RecipientListType answer = new RecipientListType();
631 addOutput(answer);
632 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
633 answer.setExpression(clause);
634 return clause;
635 }
636
637 /**
638 * Creates a <a
639 * href="http://activemq.apache.org/camel/routing-slip.html">Routing
640 * Slip</a> pattern.
641 *
642 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
643 * class will look in for the list of URIs to route the message to.
644 * @param uriDelimiter is the delimiter that will be used to split up
645 * the list of URIs in the routing slip.
646 */
647 public Type routingSlip(String header, String uriDelimiter) {
648 RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter);
649 addOutput(answer);
650 return (Type) this;
651 }
652
653 /**
654 * Creates a <a
655 * href="http://activemq.apache.org/camel/routing-slip.html">Routing
656 * Slip</a> pattern.
657 *
658 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
659 * class will look in for the list of URIs to route the message to. The list of URIs
660 * will be split based on the default delimiter
661 * {@link RoutingSlipType#DEFAULT_DELIMITER}.
662 */
663 public Type routingSlip(String header) {
664 RoutingSlipType answer = new RoutingSlipType(header);
665 addOutput(answer);
666 return (Type) this;
667 }
668
669 /**
670 * Creates a <a
671 * href="http://activemq.apache.org/camel/routing-slip.html">Routing
672 * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}.
673 * The list of URIs in the header will be split based on the default delimiter
674 * {@link RoutingSlipType#DEFAULT_DELIMITER}.
675 */
676 public Type routingSlip() {
677 RoutingSlipType answer = new RoutingSlipType();
678 addOutput(answer);
679 return (Type) this;
680 }
681
682 /**
683 * Creates the <a
684 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
685 * pattern where an expression is evaluated to iterate through each of the
686 * parts of a message and then each part is then send to some endpoint.
687 * This splitter responds with the latest message returned from destination
688 * endpoint.
689 *
690 * @param recipients the expression on which to split
691 * @return the builder
692 */
693 public SplitterType splitter(Expression recipients) {
694 SplitterType answer = new SplitterType(recipients);
695 addOutput(answer);
696 return answer;
697 }
698
699 /**
700 * Creates the <a
701 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
702 * pattern where an expression is evaluated to iterate through each of the
703 * parts of a message and then each part is then send to some endpoint.
704 * This splitter responds with the latest message returned from destination
705 * endpoint.
706 *
707 * @return the expression clause for the expression on which to split
708 */
709 public ExpressionClause<SplitterType> splitter() {
710 SplitterType answer = new SplitterType();
711 addOutput(answer);
712 return ExpressionClause.createAndSetExpression(answer);
713 }
714
715 /**
716 * Creates the <a
717 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
718 * pattern where an expression is evaluated to iterate through each of the
719 * parts of a message and then each part is then send to some endpoint.
720 * Answer from the splitter is produced using given {@link AggregationStrategy}
721 * @param partsExpression the expression on which to split
722 * @param aggregationStrategy the strategy used to aggregate responses for
723 * every part
724 * @return the builder
725 */
726 public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) {
727 SplitterType answer = new SplitterType(partsExpression);
728 addOutput(answer);
729 answer.setAggregationStrategy(aggregationStrategy);
730 return answer;
731 }
732
733 /**
734 * Creates the <a
735 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
736 * pattern where an expression is evaluated to iterate through each of the
737 * parts of a message and then each part is then send to some endpoint.
738 * Answer from the splitter is produced using given {@link AggregationStrategy}
739 * @param aggregationStrategy the strategy used to aggregate responses for
740 * every part
741 * @return the expression clause for the expression on which to split
742 */
743 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) {
744 SplitterType answer = new SplitterType();
745 addOutput(answer);
746 answer.setAggregationStrategy(aggregationStrategy);
747 return ExpressionClause.createAndSetExpression(answer);
748 }
749
750 /**
751 * Creates the <a
752 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
753 * pattern where an expression is evaluated to iterate through each of the
754 * parts of a message and then each part is then send to some endpoint.
755 * This splitter responds with the latest message returned from destination
756 * endpoint.
757 *
758 * @param recipients the expression on which to split
759 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
760 * @return the builder
761 */
762 public SplitterType splitter(Expression recipients, boolean parallelProcessing) {
763 SplitterType answer = new SplitterType(recipients);
764 addOutput(answer);
765 answer.setParallelProcessing(parallelProcessing);
766 return answer;
767 }
768
769 /**
770 * Creates the <a
771 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
772 * pattern where an expression is evaluated to iterate through each of the
773 * parts of a message and then each part is then send to some endpoint.
774 * This splitter responds with the latest message returned from destination
775 * endpoint.
776 *
777 * @param recipients the expression on which to split
778 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
779 * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
780 * @return the builder
781 */
782 public SplitterType splitter(Expression recipients, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
783 SplitterType answer = new SplitterType(recipients);
784 addOutput(answer);
785 answer.setParallelProcessing(parallelProcessing);
786 answer.setThreadPoolExecutor(threadPoolExecutor);
787 return answer;
788 }
789
790 /**
791 * Creates the <a
792 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
793 * pattern where an expression is evaluated to iterate through each of the
794 * parts of a message and then each part is then send to some endpoint.
795 * This splitter responds with the latest message returned from destination
796 * endpoint.
797 *
798 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
799 * @return the expression clause for the expression on which to split
800 */
801 public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) {
802 SplitterType answer = new SplitterType();
803 addOutput(answer);
804 answer.setParallelProcessing(parallelProcessing);
805 return ExpressionClause.createAndSetExpression(answer);
806 }
807
808 /**
809 * Creates the <a
810 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
811 * pattern where an expression is evaluated to iterate through each of the
812 * parts of a message and then each part is then send to some endpoint.
813 * This splitter responds with the latest message returned from destination
814 * endpoint.
815 *
816 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
817 * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
818 * @return the expression clause for the expression on which to split
819 */
820 public ExpressionClause<SplitterType> splitter(boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
821 SplitterType answer = new SplitterType();
822 addOutput(answer);
823 answer.setParallelProcessing(parallelProcessing);
824 answer.setThreadPoolExecutor(threadPoolExecutor);
825 return ExpressionClause.createAndSetExpression(answer);
826 }
827
828 /**
829 * Creates the <a
830 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
831 * pattern where an expression is evaluated to iterate through each of the
832 * parts of a message and then each part is then send to some endpoint.
833 * Answer from the splitter is produced using given {@link AggregationStrategy}
834 * @param partsExpression the expression on which to split
835 * @param aggregationStrategy the strategy used to aggregate responses for
836 * every part
837 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
838 * @return the builder
839 */
840 public SplitterType splitter(Expression partsExpression,
841 AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
842 SplitterType answer = new SplitterType(partsExpression);
843 addOutput(answer);
844 answer.setAggregationStrategy(aggregationStrategy);
845 answer.setParallelProcessing(parallelProcessing);
846 return answer;
847 }
848
849 /**
850 * Creates the <a
851 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
852 * pattern where an expression is evaluated to iterate through each of the
853 * parts of a message and then each part is then send to some endpoint.
854 * Answer from the splitter is produced using given {@link AggregationStrategy}
855 * @param partsExpression the expression on which to split
856 * @param aggregationStrategy the strategy used to aggregate responses for
857 * every part
858 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
859 * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
860 * @return the builder
861 */
862 public SplitterType splitter(Expression partsExpression,
863 AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
864 SplitterType answer = new SplitterType(partsExpression);
865 addOutput(answer);
866 answer.setAggregationStrategy(aggregationStrategy);
867 answer.setParallelProcessing(parallelProcessing);
868 answer.setThreadPoolExecutor(threadPoolExecutor);
869 return answer;
870 }
871
872 /**
873 * Creates the <a
874 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
875 * pattern where an expression is evaluated to iterate through each of the
876 * parts of a message and then each part is then send to some endpoint.
877 * Answer from the splitter is produced using given {@link AggregationStrategy}
878 * @param aggregationStrategy the strategy used to aggregate responses for
879 * every part
880 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
881 * @return the expression clause for the expression on which to split
882 */
883 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
884 SplitterType answer = new SplitterType();
885 addOutput(answer);
886 answer.setAggregationStrategy(aggregationStrategy);
887 answer.setParallelProcessing(parallelProcessing);
888 return ExpressionClause.createAndSetExpression(answer);
889 }
890
891 /**
892 * Creates the <a
893 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
894 * pattern where an expression is evaluated to iterate through each of the
895 * parts of a message and then each part is then send to some endpoint.
896 * Answer from the splitter is produced using given {@link AggregationStrategy}
897 * @param aggregationStrategy the strategy used to aggregate responses for
898 * every part
899 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
900 * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
901 * @return the expression clause for the expression on which to split
902 */
903 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
904 SplitterType answer = new SplitterType();
905 addOutput(answer);
906 answer.setAggregationStrategy(aggregationStrategy);
907 answer.setParallelProcessing(parallelProcessing);
908 answer.setThreadPoolExecutor(threadPoolExecutor);
909 return ExpressionClause.createAndSetExpression(answer);
910 }
911
912 /**
913 * Creates the <a
914 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
915 * pattern where a list of expressions are evaluated to be able to compare
916 * the message exchanges to reorder them. e.g. you may wish to sort by some
917 * headers
918 *
919 * @return the expression clause for the expressions on which to compare messages in order
920 */
921 public ExpressionClause<ResequencerType> resequencer() {
922 ResequencerType answer = new ResequencerType();
923 addOutput(answer);
924 ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer);
925 answer.expression(clause);
926 return clause;
927 }
928
929 /**
930 * Creates the <a
931 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
932 * pattern where an expression is evaluated to be able to compare the
933 * message exchanges to reorder them. e.g. you may wish to sort by some
934 * header
935 *
936 * @param expression the expression on which to compare messages in order
937 * @return the builder
938 */
939 public ResequencerType resequencer(Expression<Exchange> expression) {
940 return resequencer(Collections.<Expression>singletonList(expression));
941 }
942
943 /**
944 * Creates the <a
945 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
946 * pattern where a list of expressions are evaluated to be able to compare
947 * the message exchanges to reorder them. e.g. you may wish to sort by some
948 * headers
949 *
950 * @param expressions the expressions on which to compare messages in order
951 * @return the builder
952 */
953 public ResequencerType resequencer(List<Expression> expressions) {
954 ResequencerType answer = new ResequencerType(expressions);
955 addOutput(answer);
956 return answer;
957 }
958
959 /**
960 * Creates the <a
961 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
962 * pattern where a list of expressions are evaluated to be able to compare
963 * the message exchanges to reorder them. e.g. you may wish to sort by some
964 * headers
965 *
966 * @param expressions the expressions on which to compare messages in order
967 * @return the builder
968 */
969 public ResequencerType resequencer(Expression... expressions) {
970 List<Expression> list = new ArrayList<Expression>();
971 list.addAll(Arrays.asList(expressions));
972 return resequencer(list);
973 }
974
975 /**
976 * Creates an <a
977 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
978 * pattern where a batch of messages are processed (up to a maximum amount
979 * or until some timeout is reached) and messages for the same correlation
980 * key are combined together using some kind of {@link AggregationStrategy}
981 * (by default the latest message is used) to compress many message exchanges
982 * into a smaller number of exchanges.
983 * <p/>
984 * A good example of this is stock market data; you may be receiving 30,000
985 * messages/second and you may want to throttle it right down so that multiple
986 * messages for the same stock are combined (or just the latest message is used
987 * and older prices are discarded). Another idea is to combine line item messages
988 * together into a single invoice message.
989 */
990 public ExpressionClause<AggregatorType> aggregator() {
991 AggregatorType answer = new AggregatorType();
992 addOutput(answer);
993 return ExpressionClause.createAndSetExpression(answer);
994 }
995
996 /**
997 * Creates an <a
998 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
999 * pattern where a batch of messages are processed (up to a maximum amount
1000 * or until some timeout is reached) and messages for the same correlation
1001 * key are combined together using some kind of {@link AggregationStrategy}
1002 * (by default the latest message is used) to compress many message exchanges
1003 * into a smaller number of exchanges.
1004 * <p/>
1005 * A good example of this is stock market data; you may be receiving 30,000
1006 * messages/second and you may want to throttle it right down so that multiple
1007 * messages for the same stock are combined (or just the latest message is used
1008 * and older prices are discarded). Another idea is to combine line item messages
1009 * together into a single invoice message.
1010 *
1011 * @param aggregationStrategy the strategy used for the aggregation
1012 */
1013 public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) {
1014 AggregatorType answer = new AggregatorType();
1015 answer.setAggregationStrategy(aggregationStrategy);
1016 addOutput(answer);
1017 return ExpressionClause.createAndSetExpression(answer);
1018 }
1019
1020 /**
1021 * Creates an <a
1022 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
1023 * pattern using a custom aggregation collection implementation. The aggregation collection must
1024 * be configued with the strategy and correlation expression that this aggregator should use.
1025 * This avoids duplicating this configuration on both the collection and the aggregator itself.
1026 *
1027 * @param aggregationCollection the collection used to perform the aggregation
1028 */
1029 public AggregatorType aggregator(AggregationCollection aggregationCollection) {
1030 AggregatorType answer = new AggregatorType();
1031 answer.setAggregationCollection(aggregationCollection);
1032 addOutput(answer);
1033 return answer;
1034 }
1035
1036 /**
1037 * Creates an <a
1038 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
1039 * pattern where a batch of messages are processed (up to a maximum amount
1040 * or until some timeout is reached) and messages for the same correlation
1041 * key are combined together using some kind of {@link AggregationStrategy}
1042 * (by default the latest message is used) to compress many message exchanges
1043 * into a smaller number of exchanges.
1044 * <p/>
1045 * A good example of this is stock market data; you may be receiving 30,000
1046 * messages/second and you may want to throttle it right down so that multiple
1047 * messages for the same stock are combined (or just the latest message is used
1048 * and older prices are discarded). Another idea is to combine line item messages
1049 * together into a single invoice message.
1050 *
1051 * @param correlationExpression the expression used to calculate the
1052 * correlation key. For a JMS message this could be the
1053 * expression <code>header("JMSDestination")</code> or
1054 * <code>header("JMSCorrelationID")</code>
1055 */
1056 public AggregatorType aggregator(Expression correlationExpression) {
1057 AggregatorType answer = new AggregatorType(correlationExpression);
1058 addOutput(answer);
1059 return answer;
1060 }
1061
1062 /**
1063 * Creates an <a
1064 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
1065 * pattern where a batch of messages are processed (up to a maximum amount
1066 * or until some timeout is reached) and messages for the same correlation
1067 * key are combined together using some kind of {@link AggregationStrategy}
1068 * (by default the latest message is used) to compress many message exchanges
1069 * into a smaller number of exchanges.
1070 * <p/>
1071 * A good example of this is stock market data; you may be receiving 30,000
1072 * messages/second and you may want to throttle it right down so that multiple
1073 * messages for the same stock are combined (or just the latest message is used
1074 * and older prices are discarded). Another idea is to combine line item messages
1075 * together into a single invoice message.
1076 *
1077 * @param correlationExpression the expression used to calculate the
1078 * correlation key. For a JMS message this could be the
1079 * expression <code>header("JMSDestination")</code> or
1080 * <code>header("JMSCorrelationID")</code>
1081 */
1082 public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
1083 AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
1084 addOutput(answer);
1085 return answer;
1086 }
1087
1088 /**
1089 * Creates the <a
1090 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
1091 * where an expression is used to calculate the time which the message will
1092 * be dispatched on
1093 *
1094 * @param processAtExpression an expression to calculate the time at which
1095 * the messages should be processed
1096 * @return the builder
1097 */
1098 public DelayerType delayer(Expression<Exchange> processAtExpression) {
1099 return delayer(processAtExpression, 0L);
1100 }
1101
1102 /**
1103 * Creates the <a
1104 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
1105 * where an expression is used to calculate the time which the message will
1106 * be dispatched on
1107 *
1108 * @param processAtExpression an expression to calculate the time at which
1109 * the messages should be processed
1110 * @param delay the delay in milliseconds which is added to the
1111 * processAtExpression to determine the time the message
1112 * should be processed
1113 * @return the builder
1114 */
1115 public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
1116 DelayerType answer = new DelayerType(processAtExpression, delay);
1117 addOutput(answer);
1118 return answer;
1119 }
1120
1121 /**
1122 * Creates the <a
1123 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
1124 * where an expression is used to calculate the time which the message will
1125 * be dispatched on
1126 * @return the expression clause to create the expression
1127 */
1128 public ExpressionClause<DelayerType> delayer() {
1129 DelayerType answer = new DelayerType();
1130 addOutput(answer);
1131 return ExpressionClause.createAndSetExpression(answer);
1132 }
1133
1134 /**
1135 * Creates the <a
1136 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
1137 * where a fixed amount of milliseconds are used to delay processing of a
1138 * message exchange
1139 *
1140 * @param delay the default delay in milliseconds
1141 * @return the builder
1142 */
1143 public DelayerType delayer(long delay) {
1144 return delayer(null, delay);
1145 }
1146
1147 /**
1148 * Creates the <a
1149 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
1150 * where an expression is used to calculate the time which the message will
1151 * be dispatched on
1152 *
1153 * @return the builder
1154 */
1155 public ThrottlerType throttler(long maximumRequestCount) {
1156 ThrottlerType answer = new ThrottlerType(maximumRequestCount);
1157 addOutput(answer);
1158 return answer;
1159 }
1160
1161 /**
1162 * Creates a expression which must evaluate to an integer that determines
1163 * how many times the exchange should be sent down the rest of the route.
1164 *
1165 * @return the clause used to create the loop expression
1166 */
1167 public ExpressionClause<LoopType> loop() {
1168 LoopType loop = new LoopType();
1169 addOutput(loop);
1170 return ExpressionClause.createAndSetExpression(loop);
1171 }
1172
1173 public LoopType loop(Expression<?> expression) {
1174 LoopType loop = getNodeFactory().createLoop();
1175 loop.setExpression(expression);
1176 addOutput(loop);
1177 return loop;
1178 }
1179
1180 public LoopType loop(int count) {
1181 LoopType loop = getNodeFactory().createLoop();
1182 loop.setExpression(new ConstantExpression(Integer.toString(count)));
1183 addOutput(loop);
1184 return loop;
1185 }
1186
1187 /**
1188 * @deprecated will be removed in Camel 2.0
1189 */
1190 public Type throwFault(Throwable fault) {
1191 ThrowFaultType answer = new ThrowFaultType();
1192 answer.setFault(fault);
1193 addOutput(answer);
1194 return (Type) this;
1195 }
1196
1197 /**
1198 * @deprecated will be removed in Camel 2.0
1199 */
1200 public Type throwFault(String message) {
1201 return throwFault(new CamelException(message));
1202 }
1203
1204 /**
1205 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1206 *
1207 * @deprecated will be removed in Camel 2.0
1208 */
1209 public Type interceptor(String ref) {
1210 InterceptorRef interceptor = new InterceptorRef(ref);
1211 intercept(interceptor);
1212 return (Type) this;
1213 }
1214
1215 /**
1216 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1217 *
1218 * @deprecated will be removed in Camel 2.0
1219 */
1220 public Type intercept(DelegateProcessor interceptor) {
1221 intercept(new InterceptorRef(interceptor));
1222 //lastInterceptor = interceptor;
1223 return (Type) this;
1224 }
1225
1226 /**
1227 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1228 */
1229 public InterceptType intercept() {
1230 InterceptType answer = new InterceptType();
1231 addOutput(answer);
1232 return answer;
1233 }
1234
1235 /**
1236 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
1237 *
1238 * @deprecated will be removed in Camel 2.0
1239 */
1240 public void intercept(InterceptorType interceptor) {
1241 addOutput(interceptor);
1242 pushBlock(interceptor);
1243 }
1244
1245 /**
1246 * Adds an interceptor around the whole of this nodes processing
1247 *
1248 * @param interceptor
1249 * @deprecated will be removed in Camel 2.0
1250 */
1251 public void addInterceptor(InterceptorType interceptor) {
1252 interceptors.add(interceptor);
1253 }
1254
1255 /**
1256 * Adds an interceptor around the whole of this nodes processing
1257 *
1258 * @param interceptor
1259 * @deprecated will be removed in Camel 2.0
1260 */
1261 public void addInterceptor(DelegateProcessor interceptor) {
1262 addInterceptor(new InterceptorRef(interceptor));
1263 }
1264
1265 public void pushBlock(Block block) {
1266 blocks.add(block);
1267 }
1268
1269 public Block popBlock() {
1270 return blocks.isEmpty() ? null : blocks.removeLast();
1271 }
1272
1273 public Type proceed() {
1274 ProceedType proceed = null;
1275 ProcessorType currentProcessor = this;
1276
1277 if (currentProcessor instanceof InterceptType) {
1278 proceed = ((InterceptType) currentProcessor).getProceed();
1279 LOG.info("proceed() is the implied and hence not needed for an intercept()");
1280 }
1281 if (proceed == null) {
1282 for (ProcessorType node = parent; node != null; node = node.getParent()) {
1283 if (node instanceof InterceptType) {
1284 InterceptType intercept = (InterceptType)node;
1285 proceed = intercept.getProceed();
1286 break;
1287 }
1288 }
1289
1290 if (proceed == null) {
1291 throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
1292 }
1293
1294 }
1295
1296 addOutput(proceed);
1297 return (Type) this;
1298 }
1299
1300 public Type stop() {
1301 ProcessorType currentProcessor = this;
1302
1303 if (currentProcessor instanceof InterceptType) {
1304 ((InterceptType) currentProcessor).stopIntercept();
1305 } else {
1306 ProcessorType node;
1307 for (node = parent; node != null; node = node.getParent()) {
1308 if (node instanceof InterceptType) {
1309 ((InterceptType) node).stopIntercept();
1310 break;
1311 }
1312 }
1313 if (node == null) {
1314 throw new IllegalArgumentException("Cannot use stop() without being within an intercept() block");
1315 }
1316 }
1317
1318 return (Type) this;
1319 }
1320
1321 /**
1322 * Catches an exception type.
1323 *
1324 * @deprecated Please use {@link #onException(Class)} instead. Will be removed in Camel 2.0.
1325 */
1326 public ExceptionType exception(Class exceptionType) {
1327 return onException(exceptionType);
1328 }
1329
1330 /**
1331 * Catches an exception type.
1332 */
1333 public ExceptionType onException(Class exceptionType) {
1334 ExceptionType answer = new ExceptionType(exceptionType);
1335 addOutput(answer);
1336 return answer;
1337 }
1338
1339 /**
1340 * Apply an interceptor route if the predicate is true
1341 */
1342 public ChoiceType intercept(Predicate predicate) {
1343 InterceptType answer = new InterceptType();
1344 addOutput(answer);
1345 return answer.when(predicate);
1346 }
1347
1348 /**
1349 * @deprecated will be removed in Camel 2.0
1350 */
1351 public Type interceptors(String... refs) {
1352 for (String ref : refs) {
1353 interceptor(ref);
1354 }
1355 return (Type) this;
1356 }
1357
1358 /**
1359 * Trace logs the exchange before it goes to the next processing step using
1360 * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
1361 *
1362 * @deprecated Please use <a href="http://activemq.apache.org/camel/tracer.html>Tracer Support</a>
1363 * instead. Will be removed in Camel 2.0.
1364 */
1365 public Type trace() {
1366 return trace(DEFAULT_TRACE_CATEGORY);
1367 }
1368
1369 /**
1370 * Trace logs the exchange before it goes to the next processing step using
1371 * the specified logging category.
1372 *
1373 * @param category the logging category trace messages will sent to.
1374 *
1375 * @deprecated Please use <a href="http://activemq.apache.org/camel/tracer.html>Tracer Support</a>
1376 * instead. Will be removed in Camel 2.0.
1377 */
1378 public Type trace(String category) {
1379 final Log log = LogFactory.getLog(category);
1380 return intercept(new DelegateProcessor() {
1381 @Override
1382 public void process(Exchange exchange) throws Exception {
1383 log.trace(exchange);
1384 processNext(exchange);
1385 }
1386 });
1387 }
1388
1389 public PolicyRef policies() {
1390 PolicyRef answer = new PolicyRef();
1391 addOutput(answer);
1392 return answer;
1393 }
1394
1395 public PolicyRef policy(Policy policy) {
1396 PolicyRef answer = new PolicyRef(policy);
1397 addOutput(answer);
1398 return answer;
1399 }
1400
1401 /**
1402 * Forces handling of faults as exceptions
1403 *
1404 * @return the current builder with the fault handler configured
1405 */
1406 public Type handleFault() {
1407 intercept(new HandleFaultType());
1408 return (Type) this;
1409 }
1410
1411 /**
1412 * Installs the given error handler builder
1413 *
1414 * @param errorHandlerBuilder the error handler to be used by default for
1415 * all child routes
1416 * @return the current builder with the error handler configured
1417 */
1418 public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
1419 setErrorHandlerBuilder(errorHandlerBuilder);
1420 return (Type) this;
1421 }
1422
1423 /**
1424 * Configures whether or not the error handler is inherited by every
1425 * processing node (or just the top most one)
1426 *
1427 * @param condition the flag as to whether error handlers should be
1428 * inherited or not
1429 * @return the current builder
1430 */
1431 public Type inheritErrorHandler(boolean condition) {
1432 setInheritErrorHandlerFlag(condition);
1433 return (Type) this;
1434 }
1435
1436 // Transformers
1437 // -------------------------------------------------------------------------
1438
1439 /**
1440 * Adds the custom processor to this destination which could be a final
1441 * destination, or could be a transformation in a pipeline
1442 */
1443 public Type process(Processor processor) {
1444 ProcessorRef answer = new ProcessorRef(processor);
1445 addOutput(answer);
1446 return (Type) this;
1447 }
1448
1449 /**
1450 * Adds the custom processor reference to this destination which could be a final
1451 * destination, or could be a transformation in a pipeline
1452 */
1453 public Type processRef(String ref) {
1454 ProcessorRef answer = new ProcessorRef();
1455 answer.setRef(ref);
1456 addOutput(answer);
1457 return (Type) this;
1458 }
1459
1460 /**
1461 * Adds a bean which is invoked which could be a final destination, or could
1462 * be a transformation in a pipeline
1463 */
1464 public Type bean(Object bean) {
1465 BeanRef answer = new BeanRef();
1466 answer.setBean(bean);
1467 addOutput(answer);
1468 return (Type) this;
1469 }
1470
1471 /**
1472 * Adds a bean and method which is invoked which could be a final
1473 * destination, or could be a transformation in a pipeline
1474 */
1475 public Type bean(Object bean, String method) {
1476 BeanRef answer = new BeanRef();
1477 answer.setBean(bean);
1478 answer.setMethod(method);
1479 addOutput(answer);
1480 return (Type) this;
1481 }
1482
1483 /**
1484 * Adds a bean by type which is invoked which could be a final destination, or could
1485 * be a transformation in a pipeline
1486 */
1487 public Type bean(Class beanType) {
1488 BeanRef answer = new BeanRef();
1489 answer.setBeanType(beanType);
1490 addOutput(answer);
1491 return (Type) this;
1492 }
1493
1494 /**
1495 * Adds a bean type and method which is invoked which could be a final
1496 * destination, or could be a transformation in a pipeline
1497 */
1498 public Type bean(Class beanType, String method) {
1499 BeanRef answer = new BeanRef();
1500 answer.setBeanType(beanType);
1501 answer.setMethod(method);
1502 addOutput(answer);
1503 return (Type) this;
1504 }
1505
1506 /**
1507 * Adds a bean which is invoked which could be a final destination, or could
1508 * be a transformation in a pipeline
1509 */
1510 public Type beanRef(String ref) {
1511 BeanRef answer = new BeanRef(ref);
1512 addOutput(answer);
1513 return (Type) this;
1514 }
1515
1516 /**
1517 * Adds a bean and method which is invoked which could be a final
1518 * destination, or could be a transformation in a pipeline
1519 */
1520 public Type beanRef(String ref, String method) {
1521 BeanRef answer = new BeanRef(ref, method);
1522 addOutput(answer);
1523 return (Type) this;
1524 }
1525
1526 /**
1527 * Adds a processor which sets the body on the IN message
1528 */
1529 public ExpressionClause<ProcessorType<Type>> setBody() {
1530 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1531 SetBodyType answer = new SetBodyType(clause);
1532 addOutput(answer);
1533 return clause;
1534 }
1535
1536 /**
1537 * Adds a processor which sets the body on the IN message
1538 */
1539 public Type setBody(Expression expression) {
1540 SetBodyType answer = new SetBodyType(expression);
1541 addOutput(answer);
1542 return (Type) this;
1543 }
1544
1545 /**
1546 * Adds a processor which sets the body on the OUT message
1547 *
1548 * @deprecated Please use {@link #transform(Expression)} instead. Will be removed in Camel 2.0.
1549 */
1550 @Deprecated
1551 public Type setOutBody(Expression expression) {
1552 return transform(expression);
1553 }
1554
1555 /**
1556 * Adds a processor which sets the body on the OUT message
1557 *
1558 * @deprecated Please use {@link #transform()} instead. Will be removed in Camel 2.0.
1559 */
1560 @Deprecated
1561 public ExpressionClause<ProcessorType<Type>> setOutBody() {
1562 return transform();
1563 }
1564
1565 /**
1566 * Adds a processor which sets the body on the OUT message
1567 */
1568 public Type transform(Expression expression) {
1569 TransformType answer = new TransformType(expression);
1570 addOutput(answer);
1571 return (Type) this;
1572 }
1573
1574 /**
1575 * Adds a processor which sets the body on the OUT message
1576 */
1577 public ExpressionClause<ProcessorType<Type>> transform() {
1578 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1579 TransformType answer = new TransformType(clause);
1580 addOutput(answer);
1581 return clause;
1582 }
1583
1584 /**
1585 * Adds a processor which sets the body on the FAULT message
1586 */
1587 public Type setFaultBody(Expression expression) {
1588 return process(ProcessorBuilder.setFaultBody(expression));
1589 }
1590
1591 /**
1592 * Adds a processor which sets the header on the IN message
1593 */
1594 public ExpressionClause<ProcessorType<Type>> setHeader(String name) {
1595 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1596 SetHeaderType answer = new SetHeaderType(name, clause);
1597 addOutput(answer);
1598 return clause;
1599 }
1600
1601 /**
1602 * Adds a processor which sets the header on the IN message
1603 */
1604 public Type setHeader(String name, Expression expression) {
1605 SetHeaderType answer = new SetHeaderType(name, expression);
1606 addOutput(answer);
1607 return (Type) this;
1608 }
1609
1610 /**
1611 * Adds a processor which sets the header on the IN message to the given value
1612 * @deprecated Please use {@link #setHeader(String, Expression)} instead. Will be removed in Camel 2.0.
1613 */
1614 public Type setHeader(String name, String value) {
1615 SetHeaderType answer = new SetHeaderType(name, value);
1616 addOutput(answer);
1617 return (Type) this;
1618 }
1619
1620 /**
1621 * Adds a processor which sets the header on the OUT message
1622 */
1623 public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) {
1624 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1625 SetOutHeaderType answer = new SetOutHeaderType(name, clause);
1626 addOutput(answer);
1627 return clause;
1628 }
1629
1630 /**
1631 * Adds a processor which sets the header on the OUT message
1632 */
1633 public Type setOutHeader(String name, Expression expression) {
1634 SetOutHeaderType answer = new SetOutHeaderType(name, expression);
1635 addOutput(answer);
1636 return (Type) this;
1637 }
1638
1639 /**
1640 * Adds a processor which sets the header on the FAULT message
1641 */
1642 public Type setFaultHeader(String name, Expression expression) {
1643 return process(ProcessorBuilder.setFaultHeader(name, expression));
1644 }
1645
1646 /**
1647 * Adds a processor which sets the exchange property
1648 */
1649 public Type setProperty(String name, Expression expression) {
1650 SetPropertyType answer = new SetPropertyType(name, expression);
1651 addOutput(answer);
1652 return (Type) this;
1653 }
1654
1655
1656 /**
1657 * Adds a processor which sets the exchange property
1658 */
1659 public ExpressionClause<ProcessorType<Type>> setProperty(String name) {
1660 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1661 SetPropertyType answer = new SetPropertyType(name, clause);
1662 addOutput(answer);
1663 return clause;
1664 }
1665
1666 /**
1667 * Adds a processor which removes the header on the IN message
1668 */
1669 public Type removeHeader(String name) {
1670 RemoveHeaderType answer = new RemoveHeaderType(name);
1671 addOutput(answer);
1672 return (Type) this;
1673 }
1674
1675 /**
1676 * Adds a processor which removes the header on the FAULT message
1677 */
1678 public Type removeFaultHeader(String name) {
1679 return process(ProcessorBuilder.removeFaultHeader(name));
1680 }
1681
1682 /**
1683 * Adds a processor which removes the exchange property
1684 */
1685 public Type removeProperty(String name) {
1686 RemovePropertyType answer = new RemovePropertyType(name);
1687 addOutput(answer);
1688 return (Type) this;
1689 }
1690
1691 /**
1692 * Converts the IN message body to the specified type
1693 */
1694 public Type convertBodyTo(Class type) {
1695 addOutput(new ConvertBodyType(type));
1696 return (Type) this;
1697 }
1698
1699 /**
1700 * Converts the IN message body to the specified class type
1701 */
1702 public Type convertBodyTo(String typeString) {
1703 addOutput(new ConvertBodyType(typeString));
1704 return (Type) this;
1705 }
1706
1707 /**
1708 * Converts the OUT message body to the specified type
1709 *
1710 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1711 */
1712 @Deprecated
1713 public Type convertOutBodyTo(Class type) {
1714 return process(new ConvertBodyProcessor(type));
1715 }
1716
1717 /**
1718 * Converts the FAULT message body to the specified type
1719 *
1720 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1721 */
1722 @Deprecated
1723 public Type convertFaultBodyTo(Class type) {
1724 return process(new ConvertBodyProcessor(type));
1725 }
1726
1727 // DataFormat support
1728 // -------------------------------------------------------------------------
1729
1730 /**
1731 * Unmarshals the in body using a {@link DataFormat} expression to define
1732 * the format of the input message and the output will be set on the out message body.
1733 *
1734 * @return the expression to create the {@link DataFormat}
1735 */
1736 public DataFormatClause<ProcessorType<Type>> unmarshal() {
1737 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal);
1738 }
1739
1740 /**
1741 * Unmarshals the in body using the specified {@link DataFormat}
1742 * and sets the output on the out message body.
1743 *
1744 * @return this object
1745 */
1746 public Type unmarshal(DataFormatType dataFormatType) {
1747 addOutput(new UnmarshalType(dataFormatType));
1748 return (Type) this;
1749 }
1750
1751 /**
1752 * Unmarshals the in body using the specified {@link DataFormat}
1753 * and sets the output on the out message body.
1754 *
1755 * @return this object
1756 */
1757 public Type unmarshal(DataFormat dataFormat) {
1758 return unmarshal(new DataFormatType(dataFormat));
1759 }
1760
1761 /**
1762 * Unmarshals the in body using the specified {@link DataFormat}
1763 * reference in the {@link org.apache.camel.spi.Registry} and sets
1764 * the output on the out message body.
1765 *
1766 * @return this object
1767 */
1768 public Type unmarshal(String dataTypeRef) {
1769 addOutput(new UnmarshalType(dataTypeRef));
1770 return (Type) this;
1771 }
1772
1773 /**
1774 * Marshals the in body using a {@link DataFormat} expression to define
1775 * the format of the output which will be added to the out body.
1776 *
1777 * @return the expression to create the {@link DataFormat}
1778 */
1779 public DataFormatClause<ProcessorType<Type>> marshal() {
1780 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal);
1781 }
1782
1783 /**
1784 * Marshals the in body using the specified {@link DataFormat}
1785 * and sets the output on the out message body.
1786 *
1787 * @return this object
1788 */
1789 public Type marshal(DataFormatType dataFormatType) {
1790 addOutput(new MarshalType(dataFormatType));
1791 return (Type) this;
1792 }
1793
1794 /**
1795 * Marshals the in body using the specified {@link DataFormat}
1796 * and sets the output on the out message body.
1797 *
1798 * @return this object
1799 */
1800 public Type marshal(DataFormat dataFormat) {
1801 return marshal(new DataFormatType(dataFormat));
1802 }
1803
1804 /**
1805 * Marshals the in body the specified {@link DataFormat}
1806 * reference in the {@link org.apache.camel.spi.Registry} and sets
1807 * the output on the out message body.
1808 *
1809 * @return this object
1810 */
1811 public Type marshal(String dataTypeRef) {
1812 addOutput(new MarshalType(dataTypeRef));
1813 return (Type) this;
1814 }
1815
1816 // Properties
1817 // -------------------------------------------------------------------------
1818 @XmlTransient
1819 public ProcessorType<? extends ProcessorType> getParent() {
1820 return parent;
1821 }
1822
1823 public void setParent(ProcessorType<? extends ProcessorType> parent) {
1824 this.parent = parent;
1825 }
1826
1827 @XmlTransient
1828 public ErrorHandlerBuilder getErrorHandlerBuilder() {
1829 if (errorHandlerBuilder == null) {
1830 errorHandlerBuilder = createErrorHandlerBuilder();
1831 }
1832 return errorHandlerBuilder;
1833 }
1834
1835 /**
1836 * Sets the error handler to use with processors created by this builder
1837 */
1838 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
1839 this.errorHandlerBuilder = errorHandlerBuilder;
1840 }
1841
1842 /**
1843 * Sets the error handler if one is not already set
1844 */
1845 protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) {
1846 if (this.errorHandlerBuilder == null) {
1847 setErrorHandlerBuilder(errorHandlerBuilder);
1848 }
1849 }
1850
1851 public String getErrorHandlerRef() {
1852 return errorHandlerRef;
1853 }
1854
1855 /**
1856 * Sets the bean ref name of the error handler builder to use on this route
1857 */
1858 @XmlAttribute(required = false)
1859 public void setErrorHandlerRef(String errorHandlerRef) {
1860 this.errorHandlerRef = errorHandlerRef;
1861 setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef));
1862 }
1863
1864 @XmlTransient
1865 public boolean isInheritErrorHandler() {
1866 return isInheritErrorHandler(getInheritErrorHandlerFlag());
1867 }
1868
1869 /**
1870 * Lets default the inherit value to be true if there is none specified
1871 */
1872 public static boolean isInheritErrorHandler(Boolean value) {
1873 return value == null || value.booleanValue();
1874 }
1875
1876 @XmlAttribute(name = "inheritErrorHandler", required = false)
1877 public Boolean getInheritErrorHandlerFlag() {
1878 return inheritErrorHandlerFlag;
1879 }
1880
1881 public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
1882 this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
1883 }
1884
1885 @XmlTransient
1886 public NodeFactory getNodeFactory() {
1887 if (nodeFactory == null) {
1888 nodeFactory = new NodeFactory();
1889 }
1890 return nodeFactory;
1891 }
1892
1893 public void setNodeFactory(NodeFactory nodeFactory) {
1894 this.nodeFactory = nodeFactory;
1895 }
1896
1897 /**
1898 * Returns a label to describe this node such as the expression if some kind of expression node
1899 */
1900 public String getLabel() {
1901 return "";
1902 }
1903
1904 // Implementation methods
1905 // -------------------------------------------------------------------------
1906
1907 /**
1908 * Creates the processor and wraps it in any necessary interceptors and
1909 * error handlers
1910 */
1911 protected Processor makeProcessor(RouteContext routeContext) throws Exception {
1912 Processor processor = createProcessor(routeContext);
1913 return wrapProcessor(routeContext, processor);
1914 }
1915
1916 /**
1917 * A strategy method which allows derived classes to wrap the child
1918 * processor in some kind of interceptor
1919 *
1920 * @param routeContext
1921 * @param target the processor which can be wrapped
1922 * @return the original processor or a new wrapped interceptor
1923 */
1924 protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
1925 // The target is required.
1926 if (target == null) {
1927 throw new IllegalArgumentException("target not provided on node: " + this);
1928 }
1929
1930 List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
1931 CamelContext camelContext = routeContext.getCamelContext();
1932 if (camelContext instanceof DefaultCamelContext) {
1933 DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
1934 strategies.addAll(defaultCamelContext.getInterceptStrategies());
1935 }
1936 strategies.addAll(routeContext.getInterceptStrategies());
1937 for (InterceptStrategy strategy : strategies) {
1938 if (strategy != null) {
1939 target = strategy.wrapProcessorInInterceptors(this, target);
1940 }
1941 }
1942
1943 List<InterceptorType> list = routeContext.getRoute().getInterceptors();
1944 if (interceptors != null) {
1945 list.addAll(interceptors);
1946 }
1947 // lets reverse the list so we apply the inner interceptors first
1948 Collections.reverse(list);
1949 Set<Processor> interceptors = new HashSet<Processor>();
1950 interceptors.add(target);
1951 for (InterceptorType interceptorType : list) {
1952 DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext);
1953 if (!interceptors.contains(interceptor)) {
1954 interceptors.add(interceptor);
1955 if (interceptor.getProcessor() != null) {
1956 LOG.warn("Interceptor " + interceptor + " currently wraps target "
1957 + interceptor.getProcessor()
1958 + " is attempting to change target " + target
1959 + " new wrapping has been denied.");
1960 } else {
1961 interceptor.setProcessor(target);
1962 target = interceptor;
1963 }
1964 }
1965 }
1966 return target;
1967 }
1968
1969 /**
1970 * A strategy method to allow newly created processors to be wrapped in an
1971 * error handler.
1972 */
1973 protected Processor wrapInErrorHandler(RouteContext routeContext, Processor target) throws Exception {
1974 // The target is required.
1975 if (target == null) {
1976 throw new IllegalArgumentException("target not provided on node: " + this);
1977 }
1978
1979 ErrorHandlerWrappingStrategy strategy = routeContext.getErrorHandlerWrappingStrategy();
1980
1981 if (strategy != null) {
1982 return strategy.wrapProcessorInErrorHandler(routeContext, this, target);
1983 }
1984
1985 return getErrorHandlerBuilder().createErrorHandler(routeContext, target);
1986 }
1987
1988 protected ErrorHandlerBuilder createErrorHandlerBuilder() {
1989 if (errorHandlerRef != null) {
1990 return new ErrorHandlerBuilderRef(errorHandlerRef);
1991 }
1992 if (isInheritErrorHandler()) {
1993 return new DeadLetterChannelBuilder();
1994 } else {
1995 return new NoErrorHandlerBuilder();
1996 }
1997 }
1998
1999 protected void configureChild(ProcessorType output) {
2000 output.setNodeFactory(getNodeFactory());
2001 }
2002
2003 public void addOutput(ProcessorType processorType) {
2004 processorType.setParent(this);
2005 configureChild(processorType);
2006 if (blocks.isEmpty()) {
2007 getOutputs().add(processorType);
2008 } else {
2009 Block block = blocks.getLast();
2010 block.addOutput(processorType);
2011 }
2012 }
2013
2014 /**
2015 * Creates a new instance of some kind of composite processor which defaults
2016 * to using a {@link Pipeline} but derived classes could change the
2017 * behaviour
2018 */
2019 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
2020 // return new MulticastProcessor(list);
2021 return new Pipeline(list);
2022 }
2023
2024 protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs)
2025 throws Exception {
2026 List<Processor> list = new ArrayList<Processor>();
2027 for (ProcessorType output : outputs) {
2028 Processor processor = output.createProcessor(routeContext);
2029 // if the ProceedType create processor is null we keep on going
2030 if (output instanceof ProceedType && processor == null) {
2031 continue;
2032 }
2033 processor = output.wrapProcessorInInterceptors(routeContext, processor);
2034
2035 ProcessorType currentProcessor = this;
2036 if (!(currentProcessor instanceof ExceptionType || currentProcessor instanceof TryType)) {
2037 processor = output.wrapInErrorHandler(routeContext, processor);
2038 }
2039
2040 list.add(processor);
2041 }
2042 Processor processor = null;
2043 if (!list.isEmpty()) {
2044 if (list.size() == 1) {
2045 processor = list.get(0);
2046 } else {
2047 processor = createCompositeProcessor(routeContext, list);
2048 }
2049 }
2050 return processor;
2051 }
2052
2053 public void clearOutput() {
2054 getOutputs().clear();
2055 blocks.clear();
2056 }
2057 }