001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.ArrayList;
020 import java.util.Arrays;
021 import java.util.Collection;
022 import java.util.Collections;
023 import java.util.HashSet;
024 import java.util.LinkedList;
025 import java.util.List;
026 import java.util.Set;
027 import java.util.concurrent.ThreadPoolExecutor;
028
029 import javax.xml.bind.annotation.XmlAccessType;
030 import javax.xml.bind.annotation.XmlAccessorType;
031 import javax.xml.bind.annotation.XmlAttribute;
032 import javax.xml.bind.annotation.XmlTransient;
033
034 import org.apache.camel.CamelContext;
035 import org.apache.camel.CamelException;
036 import org.apache.camel.Endpoint;
037 import org.apache.camel.Exchange;
038 import org.apache.camel.Expression;
039 import org.apache.camel.Predicate;
040 import org.apache.camel.Processor;
041 import org.apache.camel.Route;
042 import org.apache.camel.RuntimeCamelException;
043 import org.apache.camel.builder.DataFormatClause;
044 import org.apache.camel.builder.DeadLetterChannelBuilder;
045 import org.apache.camel.builder.ErrorHandlerBuilder;
046 import org.apache.camel.builder.ErrorHandlerBuilderRef;
047 import org.apache.camel.builder.ExpressionClause;
048 import org.apache.camel.builder.NoErrorHandlerBuilder;
049 import org.apache.camel.builder.ProcessorBuilder;
050 import org.apache.camel.impl.DefaultCamelContext;
051 import org.apache.camel.model.dataformat.DataFormatType;
052 import org.apache.camel.model.language.ExpressionType;
053 import org.apache.camel.model.language.LanguageExpression;
054 import org.apache.camel.processor.ConvertBodyProcessor;
055 import org.apache.camel.processor.DelegateProcessor;
056 import org.apache.camel.processor.Pipeline;
057 import org.apache.camel.processor.aggregate.AggregationCollection;
058 import org.apache.camel.processor.aggregate.AggregationStrategy;
059 import org.apache.camel.processor.idempotent.MessageIdRepository;
060 import org.apache.camel.spi.DataFormat;
061 import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
062 import org.apache.camel.spi.InterceptStrategy;
063 import org.apache.camel.spi.Policy;
064 import org.apache.camel.spi.RouteContext;
065 import org.apache.commons.logging.Log;
066 import org.apache.commons.logging.LogFactory;
067
068 /**
069 * Base class for processor types that most XML types extend.
070 *
071 * @version $Revision: 673837 $
072 */
073 @XmlAccessorType(XmlAccessType.PROPERTY)
074 public abstract class ProcessorType<Type extends ProcessorType> extends OptionalIdentifiedType<Type> implements Block {
075 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE";
076 private static final transient Log LOG = LogFactory.getLog(ProcessorType.class);
077 private ErrorHandlerBuilder errorHandlerBuilder;
078 private Boolean inheritErrorHandlerFlag;
079 private NodeFactory nodeFactory;
080 private LinkedList<Block> blocks = new LinkedList<Block>();
081 private ProcessorType<? extends ProcessorType> parent;
082 private List<InterceptorType> interceptors = new ArrayList<InterceptorType>();
083 private String errorHandlerRef;
084
085 // else to use an optional attribute in JAXB2
/**
 * Returns the output definitions attached to this node. Within this class the
 * result is handed to the outputs-processor factory by
 * {@link #createOutputsProcessor(RouteContext)}.
 * <p/>
 * NOTE(review): the line comment above reads like the tail of an earlier,
 * longer remark about JAXB2 - confirm what it originally referred to.
 *
 * @return the child processor definitions, as supplied by the concrete subclass
 */
086 public abstract List<ProcessorType<?>> getOutputs();
087
088
089 public Processor createProcessor(RouteContext routeContext) throws Exception {
090 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
091 }
092
093 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
094 Collection<ProcessorType<?>> outputs = getOutputs();
095 return createOutputsProcessor(routeContext, outputs);
096 }
097
098 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
099 Processor processor = makeProcessor(routeContext);
100 if (!routeContext.isRouteAdded()) {
101 routeContext.addEventDrivenProcessor(processor);
102 }
103 }
104
105 /**
106 * Wraps the child processor in whatever necessary interceptors and error
107 * handlers
108 */
109 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
110 processor = wrapProcessorInInterceptors(routeContext, processor);
111 return wrapInErrorHandler(routeContext, processor);
112 }
113
114 // Fluent API
115 // -------------------------------------------------------------------------
116
117 /**
118 * Sends the exchange to the given endpoint URI
119 */
120 public Type to(String uri) {
121 addOutput(new ToType(uri));
122 return (Type) this;
123 }
124
125 /**
126 * Sends the exchange to the given endpoint
127 */
128 public Type to(Endpoint endpoint) {
129 addOutput(new ToType(endpoint));
130 return (Type) this;
131 }
132
133 /**
134 * Sends the exchange to a list of endpoints using the
135 * {@link MulticastProcessor} pattern
136 */
137 public Type to(String... uris) {
138 for (String uri : uris) {
139 addOutput(new ToType(uri));
140 }
141 return (Type) this;
142 }
143
144 /**
145 * Sends the exchange to a list of endpoints using the
146 * {@link MulticastProcessor} pattern
147 */
148 public Type to(Endpoint... endpoints) {
149 for (Endpoint endpoint : endpoints) {
150 addOutput(new ToType(endpoint));
151 }
152 return (Type) this;
153 }
154
155 /**
156 * Sends the exchange to a list of endpoint using the
157 * {@link MulticastProcessor} pattern
158 */
159 public Type to(Collection<Endpoint> endpoints) {
160 for (Endpoint endpoint : endpoints) {
161 addOutput(new ToType(endpoint));
162 }
163 return (Type) this;
164 }
165
166 /**
167 * Multicasts messages to all its child outputs; so that each processor and
168 * destination gets a copy of the original message to avoid the processors
169 * interfering with each other.
170 */
171 public MulticastType multicast() {
172 MulticastType answer = new MulticastType();
173 addOutput(answer);
174 return answer;
175 }
176
177 /**
178 * Multicasts messages to all its child outputs; so that each processor and
179 * destination gets a copy of the original message to avoid the processors
180 * interfering with each other.
181 * @param aggregationStrategy the strategy used to aggregate responses for
182 * every part
183 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
184 * @return the multicast type
185 */
186 public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
187 MulticastType answer = new MulticastType();
188 addOutput(answer);
189 answer.setAggregationStrategy(aggregationStrategy);
190 answer.setParallelProcessing(parallelProcessing);
191 return answer;
192 }
193
194 /**
195 * Multicasts messages to all its child outputs; so that each processor and
196 * destination gets a copy of the original message to avoid the processors
197 * interfering with each other.
198 * @param aggregationStrategy the strategy used to aggregate responses for
199 * every part
200 * @return the multicast type
201 */
202 public MulticastType multicast(AggregationStrategy aggregationStrategy) {
203 MulticastType answer = new MulticastType();
204 addOutput(answer);
205 answer.setAggregationStrategy(aggregationStrategy);
206 return answer;
207 }
208
209 /**
210 * Creates a {@link Pipeline} of the list of endpoints so that the message
211 * will get processed by each endpoint in turn and for request/response the
212 * output of one endpoint will be the input of the next endpoint
213 */
214 public Type pipeline(String... uris) {
215 // TODO pipeline v mulicast
216 return to(uris);
217 }
218
219 /**
220 * Creates a {@link Pipeline} of the list of endpoints so that the message
221 * will get processed by each endpoint in turn and for request/response the
222 * output of one endpoint will be the input of the next endpoint
223 */
224 public Type pipeline(Endpoint... endpoints) {
225 // TODO pipeline v mulicast
226 return to(endpoints);
227 }
228
229 /**
230 * Creates a {@link Pipeline} of the list of endpoints so that the message
231 * will get processed by each endpoint in turn and for request/response the
232 * output of one endpoint will be the input of the next endpoint
233 */
234 public Type pipeline(Collection<Endpoint> endpoints) {
235 // TODO pipeline v mulicast
236 return to(endpoints);
237 }
238
239 /**
240 * Ends the current block
241 */
242 public ProcessorType<? extends ProcessorType> end() {
243 if (blocks.isEmpty()) {
244 if (parent == null) {
245 throw new IllegalArgumentException("Root node with no active block");
246 }
247 return parent;
248 }
249 popBlock();
250 return this;
251 }
252
253 /**
254 * Causes subsequent processors to be called asynchronously
255 *
256 * @param coreSize the number of threads that will be used to process
257 * messages in subsequent processors.
258 * @return a ThreadType builder that can be used to further configure the
259 * the thread pool.
260 */
261 public ThreadType thread(int coreSize) {
262 ThreadType answer = new ThreadType(coreSize);
263 addOutput(answer);
264 return answer;
265 }
266
267 /**
268 * Causes subsequent processors to be called asynchronously
269 *
270 * @param executor the executor that will be used to process
271 * messages in subsequent processors.
272 * @return a ThreadType builder that can be used to further configure the
273 * the thread pool.
274 */
275 public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
276 ThreadType answer = new ThreadType(executor);
277 addOutput(answer);
278 return this;
279 }
280
281 /**
282 * Creates an {@link IdempotentConsumer} to avoid duplicate messages
283 */
284 public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression,
285 MessageIdRepository messageIdRepository) {
286 IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository);
287 addOutput(answer);
288 return answer;
289 }
290
291 /**
292 * Creates an {@link IdempotentConsumer} to avoid duplicate messages
293 *
294 * @return the builder used to create the expression
295 */
296 public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) {
297 IdempotentConsumerType answer = new IdempotentConsumerType();
298 answer.setMessageIdRepository(messageIdRepository);
299 addOutput(answer);
300 return ExpressionClause.createAndSetExpression(answer);
301 }
302
303 /**
304 * Creates a predicate expression which only if it is true then the
305 * exchange is forwarded to the destination
306 *
307 * @return the clause used to create the filter expression
308 */
309 public ExpressionClause<FilterType> filter() {
310 FilterType filter = new FilterType();
311 addOutput(filter);
312 return ExpressionClause.createAndSetExpression(filter);
313 }
314
315 /**
316 * Creates a predicate which is applied and only if it is true then the
317 * exchange is forwarded to the destination
318 *
319 * @return the builder for a predicate
320 */
321 public FilterType filter(Predicate predicate) {
322 FilterType filter = new FilterType(predicate);
323 addOutput(filter);
324 return filter;
325 }
326
327 public FilterType filter(ExpressionType expression) {
328 FilterType filter = getNodeFactory().createFilter();
329 filter.setExpression(expression);
330 addOutput(filter);
331 return filter;
332 }
333
334 public FilterType filter(String language, String expression) {
335 return filter(new LanguageExpression(language, expression));
336 }
337
338 public LoadBalanceType loadBalance() {
339 LoadBalanceType answer = new LoadBalanceType();
340 addOutput(answer);
341 return answer;
342 }
343
344
345 /**
346 * Creates a choice of one or more predicates with an otherwise clause
347 *
348 * @return the builder for a choice expression
349 */
350 public ChoiceType choice() {
351 ChoiceType answer = new ChoiceType();
352 addOutput(answer);
353 return answer;
354 }
355
356 /**
357 * Creates a try/catch block
358 *
359 * @return the builder for a tryBlock expression
360 */
361 public TryType tryBlock() {
362 TryType answer = new TryType();
363 addOutput(answer);
364 return answer;
365 }
366
367 /**
368 * Creates a dynamic <a
369 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
370 * List</a> pattern.
371 *
372 * @param receipients is the builder of the expression used in the
373 * {@link RecipientList} to decide the destinations
374 */
375 public Type recipientList(Expression receipients) {
376 RecipientListType answer = new RecipientListType(receipients);
377 addOutput(answer);
378 return (Type) this;
379 }
380
381 /**
382 * Creates a dynamic <a
383 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient
384 * List</a> pattern.
385 *
386 * @return the expression clause for the expression used in the
387 * {@link RecipientList} to decide the destinations
388 */
389 public ExpressionClause<ProcessorType<Type>> recipientList() {
390 RecipientListType answer = new RecipientListType();
391 addOutput(answer);
392 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
393 answer.setExpression(clause);
394 return clause;
395 }
396
397 /**
398 * Creates a <a
399 * href="http://activemq.apache.org/camel/routing-slip.html">Routing
400 * Slip</a> pattern.
401 *
402 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
403 * class will look in for the list of URIs to route the message to.
404 * @param uriDelimiter is the delimiter that will be used to split up
405 * the list of URIs in the routing slip.
406 */
407 public Type routingSlip(String header, String uriDelimiter) {
408 RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter);
409 addOutput(answer);
410 return (Type) this;
411 }
412
413 /**
414 * Creates a <a
415 * href="http://activemq.apache.org/camel/routing-slip.html">Routing
416 * Slip</a> pattern.
417 *
418 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
419 * class will look in for the list of URIs to route the message to. The list of URIs
420 * will be split based on the default delimiter
421 * {@link RoutingSlipType#DEFAULT_DELIMITER}.
422 */
423 public Type routingSlip(String header) {
424 RoutingSlipType answer = new RoutingSlipType(header);
425 addOutput(answer);
426 return (Type) this;
427 }
428
429 /**
430 * Creates a <a
431 * href="http://activemq.apache.org/camel/routing-slip.html">Routing
432 * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}.
433 * The list of URIs in the header will be split based on the default delimiter
434 * {@link RoutingSlipType#DEFAULT_DELIMITER}.
435 */
436 public Type routingSlip() {
437 RoutingSlipType answer = new RoutingSlipType();
438 addOutput(answer);
439 return (Type) this;
440 }
441
442 /**
443 * Creates the <a
444 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
445 * pattern where an expression is evaluated to iterate through each of the
446 * parts of a message and then each part is then send to some endpoint.
447 * This splitter responds with the latest message returned from destination
448 * endpoint.
449 *
450 * @param receipients the expression on which to split
451 * @return the builder
452 */
453 public SplitterType splitter(Expression receipients) {
454 SplitterType answer = new SplitterType(receipients);
455 addOutput(answer);
456 return answer;
457 }
458
459 /**
460 * Creates the <a
461 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
462 * pattern where an expression is evaluated to iterate through each of the
463 * parts of a message and then each part is then send to some endpoint.
464 * This splitter responds with the latest message returned from destination
465 * endpoint.
466 *
467 * @return the expression clause for the expression on which to split
468 */
469 public ExpressionClause<SplitterType> splitter() {
470 SplitterType answer = new SplitterType();
471 addOutput(answer);
472 return ExpressionClause.createAndSetExpression(answer);
473 }
474
475 /**
476 * Creates the <a
477 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
478 * pattern where an expression is evaluated to iterate through each of the
479 * parts of a message and then each part is then send to some endpoint.
480 * Answer from the splitter is produced using given {@link AggregationStrategy}
481 * @param partsExpression the expression on which to split
482 * @param aggregationStrategy the strategy used to aggregate responses for
483 * every part
484 * @return the builder
485 */
486 public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) {
487 SplitterType answer = new SplitterType(partsExpression);
488 addOutput(answer);
489 answer.setAggregationStrategy(aggregationStrategy);
490 return answer;
491 }
492
493 /**
494 * Creates the <a
495 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
496 * pattern where an expression is evaluated to iterate through each of the
497 * parts of a message and then each part is then send to some endpoint.
498 * Answer from the splitter is produced using given {@link AggregationStrategy}
499 * @param aggregationStrategy the strategy used to aggregate responses for
500 * every part
501 * @return the expression clause for the expression on which to split
502 */
503 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) {
504 SplitterType answer = new SplitterType();
505 addOutput(answer);
506 answer.setAggregationStrategy(aggregationStrategy);
507 return ExpressionClause.createAndSetExpression(answer);
508 }
509
510 /**
511 * Creates the <a
512 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
513 * pattern where an expression is evaluated to iterate through each of the
514 * parts of a message and then each part is then send to some endpoint.
515 * This splitter responds with the latest message returned from destination
516 * endpoint.
517 *
518 * @param receipients the expression on which to split
519 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
520 * @return the builder
521 */
522 public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
523 SplitterType answer = new SplitterType(receipients);
524 addOutput(answer);
525 answer.setParallelProcessing(parallelProcessing);
526 return answer;
527 }
528
529 /**
530 * Creates the <a
531 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
532 * pattern where an expression is evaluated to iterate through each of the
533 * parts of a message and then each part is then send to some endpoint.
534 * This splitter responds with the latest message returned from destination
535 * endpoint.
536 *
537 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
538 * @return the expression clause for the expression on which to split
539 */
540 public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) {
541 SplitterType answer = new SplitterType();
542 addOutput(answer);
543 answer.setParallelProcessing(parallelProcessing);
544 return ExpressionClause.createAndSetExpression(answer);
545 }
546
547 /**
548 * Creates the <a
549 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
550 * pattern where an expression is evaluated to iterate through each of the
551 * parts of a message and then each part is then send to some endpoint.
552 * Answer from the splitter is produced using given {@link AggregationStrategy}
553 * @param partsExpression the expression on which to split
554 * @param aggregationStrategy the strategy used to aggregate responses for
555 * every part
556 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
557 * @return the builder
558 */
559 public SplitterType splitter(Expression partsExpression,
560 AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
561 SplitterType answer = new SplitterType(partsExpression);
562 addOutput(answer);
563 answer.setAggregationStrategy(aggregationStrategy);
564 answer.setParallelProcessing(parallelProcessing);
565 return answer;
566 }
567
568 /**
569 * Creates the <a
570 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
571 * pattern where an expression is evaluated to iterate through each of the
572 * parts of a message and then each part is then send to some endpoint.
573 * Answer from the splitter is produced using given {@link AggregationStrategy}
574 * @param aggregationStrategy the strategy used to aggregate responses for
575 * every part
576 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
577 * @return the expression clause for the expression on which to split
578 */
579 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
580 SplitterType answer = new SplitterType();
581 addOutput(answer);
582 answer.setAggregationStrategy(aggregationStrategy);
583 answer.setParallelProcessing(parallelProcessing);
584 return ExpressionClause.createAndSetExpression(answer);
585 }
586
587
588 /**
589 * Creates the <a
590 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
591 * pattern where a list of expressions are evaluated to be able to compare
592 * the message exchanges to reorder them. e.g. you may wish to sort by some
593 * headers
594 *
595 * @return the expression clause for the expressions on which to compare messages in order
596 */
597 public ExpressionClause<ResequencerType> resequencer() {
598 ResequencerType answer = new ResequencerType();
599 addOutput(answer);
600 ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer);
601 answer.expression(clause);
602 return clause;
603 }
604
605 /**
606 * Creates the <a
607 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
608 * pattern where an expression is evaluated to be able to compare the
609 * message exchanges to reorder them. e.g. you may wish to sort by some
610 * header
611 *
612 * @param expression the expression on which to compare messages in order
613 * @return the builder
614 */
615 public ResequencerType resequencer(Expression<Exchange> expression) {
616 return resequencer(Collections.<Expression>singletonList(expression));
617 }
618
619 /**
620 * Creates the <a
621 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
622 * pattern where a list of expressions are evaluated to be able to compare
623 * the message exchanges to reorder them. e.g. you may wish to sort by some
624 * headers
625 *
626 * @param expressions the expressions on which to compare messages in order
627 * @return the builder
628 */
629 public ResequencerType resequencer(List<Expression> expressions) {
630 ResequencerType answer = new ResequencerType(expressions);
631 addOutput(answer);
632 return answer;
633 }
634
635 /**
636 * Creates the <a
637 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
638 * pattern where a list of expressions are evaluated to be able to compare
639 * the message exchanges to reorder them. e.g. you may wish to sort by some
640 * headers
641 *
642 * @param expressions the expressions on which to compare messages in order
643 * @return the builder
644 */
645 public ResequencerType resequencer(Expression... expressions) {
646 List<Expression> list = new ArrayList<Expression>();
647 list.addAll(Arrays.asList(expressions));
648 return resequencer(list);
649 }
650
651 /**
652 * Creates an <a
653 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
654 * pattern where a batch of messages are processed (up to a maximum amount
655 * or until some timeout is reached) and messages for the same correlation
656 * key are combined together using some kind of {@link AggregationStrategy}
657 * (by default the latest message is used) to compress many message exchanges
658 * into a smaller number of exchanges.
659 * <p/>
660 * A good example of this is stock market data; you may be receiving 30,000
661 * messages/second and you may want to throttle it right down so that multiple
662 * messages for the same stock are combined (or just the latest message is used
663 * and older prices are discarded). Another idea is to combine line item messages
664 * together into a single invoice message.
665 */
666 public ExpressionClause<AggregatorType> aggregator() {
667 AggregatorType answer = new AggregatorType();
668 addOutput(answer);
669 return ExpressionClause.createAndSetExpression(answer);
670 }
671
672 /**
673 * Creates an <a
674 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
675 * pattern where a batch of messages are processed (up to a maximum amount
676 * or until some timeout is reached) and messages for the same correlation
677 * key are combined together using some kind of {@link AggregationStrategy}
678 * (by default the latest message is used) to compress many message exchanges
679 * into a smaller number of exchanges.
680 * <p/>
681 * A good example of this is stock market data; you may be receiving 30,000
682 * messages/second and you may want to throttle it right down so that multiple
683 * messages for the same stock are combined (or just the latest message is used
684 * and older prices are discarded). Another idea is to combine line item messages
685 * together into a single invoice message.
686 *
687 * @param aggregationStrategy the strategy used for the aggregation
688 */
689 public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) {
690 AggregatorType answer = new AggregatorType();
691 answer.setAggregationStrategy(aggregationStrategy);
692 addOutput(answer);
693 return ExpressionClause.createAndSetExpression(answer);
694 }
695
696 /**
697 * Creates an <a
698 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
699 * pattern using a custom aggregation collection implementation.
700 *
701 * @param aggregationCollection the collection used to perform the aggregation
702 */
703 public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) {
704 AggregatorType answer = new AggregatorType();
705 answer.setAggregationCollection(aggregationCollection);
706 addOutput(answer);
707 return ExpressionClause.createAndSetExpression(answer);
708 }
709
710 /**
711 * Creates an <a
712 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
713 * pattern where a batch of messages are processed (up to a maximum amount
714 * or until some timeout is reached) and messages for the same correlation
715 * key are combined together using some kind of {@link AggregationStrategy}
716 * (by default the latest message is used) to compress many message exchanges
717 * into a smaller number of exchanges.
718 * <p/>
719 * A good example of this is stock market data; you may be receiving 30,000
720 * messages/second and you may want to throttle it right down so that multiple
721 * messages for the same stock are combined (or just the latest message is used
722 * and older prices are discarded). Another idea is to combine line item messages
723 * together into a single invoice message.
724 *
725 * @param correlationExpression the expression used to calculate the
726 * correlation key. For a JMS message this could be the
727 * expression <code>header("JMSDestination")</code> or
728 * <code>header("JMSCorrelationID")</code>
729 */
730 public AggregatorType aggregator(Expression correlationExpression) {
731 AggregatorType answer = new AggregatorType(correlationExpression);
732 addOutput(answer);
733 return answer;
734 }
735
736 /**
737 * Creates an <a
738 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
739 * pattern where a batch of messages are processed (up to a maximum amount
740 * or until some timeout is reached) and messages for the same correlation
741 * key are combined together using some kind of {@link AggregationStrategy}
742 * (by default the latest message is used) to compress many message exchanges
743 * into a smaller number of exchanges.
744 * <p/>
745 * A good example of this is stock market data; you may be receiving 30,000
746 * messages/second and you may want to throttle it right down so that multiple
747 * messages for the same stock are combined (or just the latest message is used
748 * and older prices are discarded). Another idea is to combine line item messages
749 * together into a single invoice message.
750 *
751 * @param correlationExpression the expression used to calculate the
752 * correlation key. For a JMS message this could be the
753 * expression <code>header("JMSDestination")</code> or
754 * <code>header("JMSCorrelationID")</code>
755 */
756 public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
757 AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy);
758 addOutput(answer);
759 return answer;
760 }
761
762 /**
763 * Creates the <a
764 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
765 * where an expression is used to calculate the time which the message will
766 * be dispatched on
767 *
768 * @param processAtExpression an expression to calculate the time at which
769 * the messages should be processed
770 * @return the builder
771 */
772 public DelayerType delayer(Expression<Exchange> processAtExpression) {
773 return delayer(processAtExpression, 0L);
774 }
775
776 /**
777 * Creates the <a
778 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
779 * where an expression is used to calculate the time which the message will
780 * be dispatched on
781 *
782 * @param processAtExpression an expression to calculate the time at which
783 * the messages should be processed
784 * @param delay the delay in milliseconds which is added to the
785 * processAtExpression to determine the time the message
786 * should be processed
787 * @return the builder
788 */
789 public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) {
790 DelayerType answer = new DelayerType(processAtExpression, delay);
791 addOutput(answer);
792 return answer;
793 }
794
795 /**
796 * Creates the <a
797 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
798 * where an expression is used to calculate the time which the message will
799 * be dispatched on
800 * @return the expression clause to create the expression
801 */
802 public ExpressionClause<DelayerType> delayer() {
803 DelayerType answer = new DelayerType();
804 addOutput(answer);
805 return ExpressionClause.createAndSetExpression(answer);
806 }
807
808 /**
809 * Creates the <a
810 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
811 * where a fixed amount of milliseconds are used to delay processing of a
812 * message exchange
813 *
814 * @param delay the default delay in milliseconds
815 * @return the builder
816 */
817 public DelayerType delayer(long delay) {
818 return delayer(null, delay);
819 }
820
821 /**
822 * Creates the <a
823 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
824 * where an expression is used to calculate the time which the message will
825 * be dispatched on
826 *
827 * @return the builder
828 */
829 public ThrottlerType throttler(long maximumRequestCount) {
830 ThrottlerType answer = new ThrottlerType(maximumRequestCount);
831 addOutput(answer);
832 return answer;
833 }
834
835
836 public Type throwFault(Throwable fault) {
837 ThrowFaultType answer = new ThrowFaultType();
838 answer.setFault(fault);
839 addOutput(answer);
840 return (Type) this;
841 }
842
843 public Type throwFault(String message) {
844 return throwFault(new CamelException(message));
845 }
846
847 /**
848 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
849 */
850 public Type interceptor(String ref) {
851 InterceptorRef interceptor = new InterceptorRef(ref);
852 intercept(interceptor);
853 return (Type) this;
854 }
855
856 /**
857 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
858 */
859 public Type intercept(DelegateProcessor interceptor) {
860 intercept(new InterceptorRef(interceptor));
861 //lastInterceptor = interceptor;
862 return (Type) this;
863 }
864
865 /**
866 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
867 */
868 public InterceptType intercept() {
869 InterceptType answer = new InterceptType();
870 addOutput(answer);
871 return answer;
872 }
873
874 /**
875 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement)
876 */
877 public void intercept(InterceptorType interceptor) {
878 addOutput(interceptor);
879 pushBlock(interceptor);
880 }
881
882 /**
883 * Adds an interceptor around the whole of this nodes processing
884 *
885 * @param interceptor
886 */
887 public void addInterceptor(InterceptorType interceptor) {
888 interceptors.add(interceptor);
889 }
890
891 /**
892 * Adds an interceptor around the whole of this nodes processing
893 *
894 * @param interceptor
895 */
896 public void addInterceptor(DelegateProcessor interceptor) {
897 addInterceptor(new InterceptorRef(interceptor));
898 }
899
900 protected void pushBlock(Block block) {
901 blocks.add(block);
902 }
903
904 protected Block popBlock() {
905 return blocks.isEmpty() ? null : blocks.removeLast();
906 }
907
908 public Type proceed() {
909 ProceedType proceed = null;
910 ProcessorType currentProcessor = this;
911
912 if (currentProcessor instanceof InterceptType) {
913 proceed = ((InterceptType) currentProcessor).getProceed();
914 LOG.info("proceed() is the implied and hence not needed for an intercept()");
915 }
916 if (proceed == null) {
917 for (ProcessorType node = parent; node != null; node = node.getParent()) {
918 if (node instanceof InterceptType) {
919 InterceptType intercept = (InterceptType)node;
920 proceed = intercept.getProceed();
921 break;
922 }
923 }
924
925 if (proceed == null) {
926 throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
927 }
928
929 }
930
931 addOutput(proceed);
932 return (Type) this;
933 }
934
935 public Type stop() {
936 ProcessorType currentProcessor = this;
937
938 if (currentProcessor instanceof InterceptType) {
939 ((InterceptType) currentProcessor).stopIntercept();
940 } else {
941 ProcessorType node;
942 for (node = parent; node != null; node = node.getParent()) {
943 if (node instanceof InterceptType) {
944 ((InterceptType) node).stopIntercept();
945 break;
946 }
947 }
948 if (node == null) {
949 throw new IllegalArgumentException("Cannot use stop() without being within an intercept() block");
950 }
951 }
952
953 return (Type) this;
954 }
955
956 public ExceptionType exception(Class exceptionType) {
957 ExceptionType answer = new ExceptionType(exceptionType);
958 addOutput(answer);
959 return answer;
960 }
961
962 /**
963 * Apply an interceptor route if the predicate is true
964 */
965 public ChoiceType intercept(Predicate predicate) {
966 InterceptType answer = new InterceptType();
967 addOutput(answer);
968 return answer.when(predicate);
969 }
970
971 public Type interceptors(String... refs) {
972 for (String ref : refs) {
973 interceptor(ref);
974 }
975 return (Type) this;
976 }
977
978 /**
979 * Trace logs the exchange before it goes to the next processing step using
980 * the {@link #DEFAULT_TRACE_CATEGORY} logging category.
981 */
982 public Type trace() {
983 return trace(DEFAULT_TRACE_CATEGORY);
984 }
985
986 /**
987 * Trace logs the exchange before it goes to the next processing step using
988 * the specified logging category.
989 *
990 * @param category the logging category trace messages will sent to.
991 */
992 public Type trace(String category) {
993 final Log log = LogFactory.getLog(category);
994 return intercept(new DelegateProcessor() {
995 @Override
996 public void process(Exchange exchange) throws Exception {
997 log.trace(exchange);
998 processNext(exchange);
999 }
1000 });
1001 }
1002
1003 public PolicyRef policies() {
1004 PolicyRef answer = new PolicyRef();
1005 addOutput(answer);
1006 return answer;
1007 }
1008
1009 public PolicyRef policy(Policy policy) {
1010 PolicyRef answer = new PolicyRef(policy);
1011 addOutput(answer);
1012 return answer;
1013 }
1014
1015 /**
1016 * Forces handling of faults as exceptions
1017 *
1018 * @return the current builder with the fault handler configured
1019 */
1020 public Type handleFault() {
1021 intercept(new HandleFaultType());
1022 return (Type) this;
1023 }
1024
1025 /**
1026 * Installs the given error handler builder
1027 *
1028 * @param errorHandlerBuilder the error handler to be used by default for
1029 * all child routes
1030 * @return the current builder with the error handler configured
1031 */
1032 public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
1033 setErrorHandlerBuilder(errorHandlerBuilder);
1034 return (Type) this;
1035 }
1036
1037 /**
1038 * Configures whether or not the error handler is inherited by every
1039 * processing node (or just the top most one)
1040 *
1041 * @param condition the flag as to whether error handlers should be
1042 * inherited or not
1043 * @return the current builder
1044 */
1045 public Type inheritErrorHandler(boolean condition) {
1046 setInheritErrorHandlerFlag(condition);
1047 return (Type) this;
1048 }
1049
1050 // Transformers
1051 // -------------------------------------------------------------------------
1052
1053 /**
1054 * Adds the custom processor to this destination which could be a final
1055 * destination, or could be a transformation in a pipeline
1056 */
1057 public Type process(Processor processor) {
1058 ProcessorRef answer = new ProcessorRef(processor);
1059 addOutput(answer);
1060 return (Type) this;
1061 }
1062
1063 /**
1064 * Adds the custom processor reference to this destination which could be a final
1065 * destination, or could be a transformation in a pipeline
1066 */
1067 public Type processRef(String ref) {
1068 ProcessorRef answer = new ProcessorRef();
1069 answer.setRef(ref);
1070 addOutput(answer);
1071 return (Type) this;
1072 }
1073
1074 /**
1075 * Adds a bean which is invoked which could be a final destination, or could
1076 * be a transformation in a pipeline
1077 */
1078 public Type bean(Object bean) {
1079 BeanRef answer = new BeanRef();
1080 answer.setBean(bean);
1081 addOutput(answer);
1082 return (Type) this;
1083 }
1084
1085 /**
1086 * Adds a bean and method which is invoked which could be a final
1087 * destination, or could be a transformation in a pipeline
1088 */
1089 public Type bean(Object bean, String method) {
1090 BeanRef answer = new BeanRef();
1091 answer.setBean(bean);
1092 answer.setMethod(method);
1093 addOutput(answer);
1094 return (Type) this;
1095 }
1096
1097 /**
1098 * Adds a bean by type which is invoked which could be a final destination, or could
1099 * be a transformation in a pipeline
1100 */
1101 public Type bean(Class beanType) {
1102 BeanRef answer = new BeanRef();
1103 answer.setBeanType(beanType);
1104 addOutput(answer);
1105 return (Type) this;
1106 }
1107
1108 /**
1109 * Adds a bean type and method which is invoked which could be a final
1110 * destination, or could be a transformation in a pipeline
1111 */
1112 public Type bean(Class beanType, String method) {
1113 BeanRef answer = new BeanRef();
1114 answer.setBeanType(beanType);
1115 answer.setMethod(method);
1116 addOutput(answer);
1117 return (Type) this;
1118 }
1119
1120 /**
1121 * Adds a bean which is invoked which could be a final destination, or could
1122 * be a transformation in a pipeline
1123 */
1124 public Type beanRef(String ref) {
1125 BeanRef answer = new BeanRef(ref);
1126 addOutput(answer);
1127 return (Type) this;
1128 }
1129
1130 /**
1131 * Adds a bean and method which is invoked which could be a final
1132 * destination, or could be a transformation in a pipeline
1133 */
1134 public Type beanRef(String ref, String method) {
1135 BeanRef answer = new BeanRef(ref, method);
1136 addOutput(answer);
1137 return (Type) this;
1138 }
1139
1140 /**
1141 * Adds a processor which sets the body on the IN message
1142 */
1143 public ExpressionClause<ProcessorType<Type>> setBody() {
1144 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1145 SetBodyType answer = new SetBodyType(clause);
1146 addOutput(answer);
1147 return clause;
1148 }
1149
1150 /**
1151 * Adds a processor which sets the body on the IN message
1152 */
1153 public Type setBody(Expression expression) {
1154 SetBodyType answer = new SetBodyType(expression);
1155 addOutput(answer);
1156 return (Type) this;
1157 }
1158
1159 /**
1160 * Adds a processor which sets the body on the OUT message
1161 *
1162 * @deprecated Please use {@link #transform(Expression)} instead. Will be removed in Camel 2.0.
1163 */
1164 @Deprecated
1165 public Type setOutBody(Expression expression) {
1166 return transform(expression);
1167 }
1168
1169 /**
1170 * Adds a processor which sets the body on the OUT message
1171 *
1172 * @deprecated Please use {@link #transform()} instead. Will be removed in Camel 2.0.
1173 */
1174 @Deprecated
1175 public ExpressionClause<ProcessorType<Type>> setOutBody() {
1176 return transform();
1177 }
1178
1179 /**
1180 * Adds a processor which sets the body on the OUT message
1181 */
1182 public Type transform(Expression expression) {
1183 TransformType answer = new TransformType(expression);
1184 addOutput(answer);
1185 return (Type) this;
1186 }
1187
1188 /**
1189 * Adds a processor which sets the body on the OUT message
1190 */
1191 public ExpressionClause<ProcessorType<Type>> transform() {
1192 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1193 TransformType answer = new TransformType(clause);
1194 addOutput(answer);
1195 return clause;
1196 }
1197
1198 /**
1199 * Adds a processor which sets the body on the FAULT message
1200 */
1201 public Type setFaultBody(Expression expression) {
1202 return process(ProcessorBuilder.setFaultBody(expression));
1203 }
1204
1205 /**
1206 * Adds a processor which sets the header on the IN message
1207 */
1208 public ExpressionClause<ProcessorType<Type>> setHeader(String name) {
1209 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1210 SetHeaderType answer = new SetHeaderType(name, clause);
1211 addOutput(answer);
1212 return clause;
1213 }
1214
1215 /**
1216 * Adds a processor which sets the header on the IN message
1217 */
1218 public Type setHeader(String name, Expression expression) {
1219 SetHeaderType answer = new SetHeaderType(name, expression);
1220 addOutput(answer);
1221 return (Type) this;
1222 }
1223
1224 /**
1225 * Adds a processor which sets the header on the IN message to the given value
1226 */
1227 public Type setHeader(String name, String value) {
1228 SetHeaderType answer = new SetHeaderType(name, value);
1229 addOutput(answer);
1230 return (Type) this;
1231 }
1232
1233 /**
1234 * Adds a processor which sets the header on the OUT message
1235 */
1236 public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) {
1237 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1238 process(ProcessorBuilder.setOutHeader(name, clause));
1239 return clause;
1240 }
1241
1242 /**
1243 * Adds a processor which sets the header on the OUT message
1244 */
1245 public Type setOutHeader(String name, Expression expression) {
1246 return process(ProcessorBuilder.setOutHeader(name, expression));
1247 }
1248
1249 /**
1250 * Adds a processor which sets the header on the OUT message
1251 */
1252 public Type setOutHeader(String name, String value) {
1253 return (Type) setOutHeader(name).constant(value);
1254 }
1255
1256 /**
1257 * Adds a processor which sets the header on the FAULT message
1258 */
1259 public Type setFaultHeader(String name, Expression expression) {
1260 return process(ProcessorBuilder.setFaultHeader(name, expression));
1261 }
1262
1263 /**
1264 * Adds a processor which sets the exchange property
1265 */
1266 public Type setProperty(String name, Expression expression) {
1267 return process(ProcessorBuilder.setProperty(name, expression));
1268 }
1269
1270
1271 /**
1272 * Adds a processor which sets the exchange property
1273 */
1274 public ExpressionClause<ProcessorType<Type>> setProperty(String name) {
1275 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this);
1276 process(ProcessorBuilder.setProperty(name, clause));
1277 return clause;
1278 }
1279
1280 /**
1281 * Adds a processor which removes the header on the IN message
1282 */
1283 public Type removeHeader(String name) {
1284 return process(ProcessorBuilder.removeHeader(name));
1285 }
1286
1287 /**
1288 * Adds a processor which removes the header on the OUT message
1289 */
1290 public Type removeOutHeader(String name) {
1291 return process(ProcessorBuilder.removeOutHeader(name));
1292 }
1293
1294 /**
1295 * Adds a processor which removes the header on the FAULT message
1296 */
1297 public Type removeFaultHeader(String name) {
1298 return process(ProcessorBuilder.removeFaultHeader(name));
1299 }
1300
1301 /**
1302 * Adds a processor which removes the exchange property
1303 */
1304 public Type removeProperty(String name) {
1305 return process(ProcessorBuilder.removeProperty(name));
1306 }
1307
1308 /**
1309 * Converts the IN message body to the specified type
1310 */
1311 public Type convertBodyTo(Class type) {
1312 addOutput(new ConvertBodyType(type));
1313 return (Type) this;
1314 }
1315
1316 /**
1317 * Converts the OUT message body to the specified type
1318 *
1319 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1320 */
1321 @Deprecated
1322 public Type convertOutBodyTo(Class type) {
1323 return process(new ConvertBodyProcessor(type));
1324 }
1325
1326 /**
1327 * Converts the FAULT message body to the specified type
1328 *
1329 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0.
1330 */
1331 @Deprecated
1332 public Type convertFaultBodyTo(Class type) {
1333 return process(new ConvertBodyProcessor(type));
1334 }
1335
1336 // DataFormat support
1337 // -------------------------------------------------------------------------
1338
1339 /**
1340 * Unmarshals the in body using a {@link DataFormat} expression to define
1341 * the format of the input message and the output will be set on the out message body.
1342 *
1343 * @return the expression to create the {@link DataFormat}
1344 */
1345 public DataFormatClause<ProcessorType<Type>> unmarshal() {
1346 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal);
1347 }
1348
1349 /**
1350 * Unmarshals the in body using the specified {@link DataFormat}
1351 * and sets the output on the out message body.
1352 *
1353 * @return this object
1354 */
1355 public Type unmarshal(DataFormatType dataFormatType) {
1356 addOutput(new UnmarshalType(dataFormatType));
1357 return (Type) this;
1358 }
1359
1360 /**
1361 * Unmarshals the in body using the specified {@link DataFormat}
1362 * and sets the output on the out message body.
1363 *
1364 * @return this object
1365 */
1366 public Type unmarshal(DataFormat dataFormat) {
1367 return unmarshal(new DataFormatType(dataFormat));
1368 }
1369
1370 /**
1371 * Unmarshals the in body using the specified {@link DataFormat}
1372 * reference in the {@link Registry} and sets the output on the out message body.
1373 *
1374 * @return this object
1375 */
1376 public Type unmarshal(String dataTypeRef) {
1377 addOutput(new UnmarshalType(dataTypeRef));
1378 return (Type) this;
1379 }
1380
1381 /**
1382 * Marshals the in body using a {@link DataFormat} expression to define
1383 * the format of the output which will be added to the out body.
1384 *
1385 * @return the expression to create the {@link DataFormat}
1386 */
1387 public DataFormatClause<ProcessorType<Type>> marshal() {
1388 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal);
1389 }
1390
1391 /**
1392 * Marshals the in body using the specified {@link DataFormat}
1393 * and sets the output on the out message body.
1394 *
1395 * @return this object
1396 */
1397 public Type marshal(DataFormatType dataFormatType) {
1398 addOutput(new MarshalType(dataFormatType));
1399 return (Type) this;
1400 }
1401
1402 /**
1403 * Marshals the in body using the specified {@link DataFormat}
1404 * and sets the output on the out message body.
1405 *
1406 * @return this object
1407 */
1408 public Type marshal(DataFormat dataFormat) {
1409 return marshal(new DataFormatType(dataFormat));
1410 }
1411
1412 /**
1413 * Marshals the in body the specified {@link DataFormat}
1414 * reference in the {@link Registry} and sets the output on the out message body.
1415 *
1416 * @return this object
1417 */
1418 public Type marshal(String dataTypeRef) {
1419 addOutput(new MarshalType(dataTypeRef));
1420 return (Type) this;
1421 }
1422
1423 // Properties
1424 // -------------------------------------------------------------------------
1425 @XmlTransient
1426 public ProcessorType<? extends ProcessorType> getParent() {
1427 return parent;
1428 }
1429
1430 public void setParent(ProcessorType<? extends ProcessorType> parent) {
1431 this.parent = parent;
1432 }
1433
1434 @XmlTransient
1435 public ErrorHandlerBuilder getErrorHandlerBuilder() {
1436 if (errorHandlerBuilder == null) {
1437 errorHandlerBuilder = createErrorHandlerBuilder();
1438 }
1439 return errorHandlerBuilder;
1440 }
1441
1442 /**
1443 * Sets the error handler to use with processors created by this builder
1444 */
1445 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
1446 this.errorHandlerBuilder = errorHandlerBuilder;
1447 }
1448
1449 /**
1450 * Sets the error handler if one is not already set
1451 */
1452 protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) {
1453 if (this.errorHandlerBuilder == null) {
1454 setErrorHandlerBuilder(errorHandlerBuilder);
1455 }
1456 }
1457
1458 public String getErrorHandlerRef() {
1459 return errorHandlerRef;
1460 }
1461
1462 /**
1463 * Sets the bean ref name of the error handler builder to use on this route
1464 */
1465 @XmlAttribute(required = false)
1466 public void setErrorHandlerRef(String errorHandlerRef) {
1467 this.errorHandlerRef = errorHandlerRef;
1468 setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef));
1469 }
1470
1471 @XmlTransient
1472 public boolean isInheritErrorHandler() {
1473 return isInheritErrorHandler(getInheritErrorHandlerFlag());
1474 }
1475
1476 /**
1477 * Lets default the inherit value to be true if there is none specified
1478 */
1479 public static boolean isInheritErrorHandler(Boolean value) {
1480 return value == null || value.booleanValue();
1481 }
1482
1483 @XmlAttribute(name = "inheritErrorHandler", required = false)
1484 public Boolean getInheritErrorHandlerFlag() {
1485 return inheritErrorHandlerFlag;
1486 }
1487
1488 public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) {
1489 this.inheritErrorHandlerFlag = inheritErrorHandlerFlag;
1490 }
1491
1492 @XmlTransient
1493 public NodeFactory getNodeFactory() {
1494 if (nodeFactory == null) {
1495 nodeFactory = new NodeFactory();
1496 }
1497 return nodeFactory;
1498 }
1499
1500 public void setNodeFactory(NodeFactory nodeFactory) {
1501 this.nodeFactory = nodeFactory;
1502 }
1503
1504 /**
1505 * Returns a label to describe this node such as the expression if some kind of expression node
1506 */
1507 public String getLabel() {
1508 return "";
1509 }
1510
1511 // Implementation methods
1512 // -------------------------------------------------------------------------
1513
1514 /**
1515 * Creates the processor and wraps it in any necessary interceptors and
1516 * error handlers
1517 */
1518 protected Processor makeProcessor(RouteContext routeContext) throws Exception {
1519 Processor processor = createProcessor(routeContext);
1520 return wrapProcessor(routeContext, processor);
1521 }
1522
1523 /**
1524 * A strategy method which allows derived classes to wrap the child
1525 * processor in some kind of interceptor
1526 *
1527 * @param routeContext
1528 * @param target the processor which can be wrapped
1529 * @return the original processor or a new wrapped interceptor
1530 */
1531 protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
1532 // The target is required.
1533 if (target == null) {
1534 throw new RuntimeCamelException("target not provided.");
1535 }
1536
1537 List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
1538 CamelContext camelContext = routeContext.getCamelContext();
1539 if (camelContext instanceof DefaultCamelContext) {
1540 DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
1541 strategies.addAll(defaultCamelContext.getInterceptStrategies());
1542 }
1543 strategies.addAll(routeContext.getInterceptStrategies());
1544 for (InterceptStrategy strategy : strategies) {
1545 if (strategy != null) {
1546 target = strategy.wrapProcessorInInterceptors(this, target);
1547 }
1548 }
1549
1550 List<InterceptorType> list = routeContext.getRoute().getInterceptors();
1551 if (interceptors != null) {
1552 list.addAll(interceptors);
1553 }
1554 // lets reverse the list so we apply the inner interceptors first
1555 Collections.reverse(list);
1556 Set<Processor> interceptors = new HashSet<Processor>();
1557 interceptors.add(target);
1558 for (InterceptorType interceptorType : list) {
1559 DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext);
1560 if (!interceptors.contains(interceptor)) {
1561 interceptors.add(interceptor);
1562 if (interceptor.getProcessor() != null) {
1563 LOG.warn("Interceptor " + interceptor + " currently wraps target "
1564 + interceptor.getProcessor()
1565 + " is attempting to change target " + target
1566 + " new wrapping has been denied.");
1567 } else {
1568 interceptor.setProcessor(target);
1569 target = interceptor;
1570 }
1571 }
1572 }
1573 return target;
1574 }
1575
1576 /**
1577 * A strategy method to allow newly created processors to be wrapped in an
1578 * error handler.
1579 */
1580 protected Processor wrapInErrorHandler(RouteContext routeContext, Processor target) throws Exception {
1581 // The target is required.
1582 if (target == null) {
1583 throw new RuntimeCamelException("target not provided.");
1584 }
1585
1586 ErrorHandlerWrappingStrategy strategy = routeContext.getErrorHandlerWrappingStrategy();
1587
1588 if (strategy != null) {
1589 return strategy.wrapProcessorInErrorHandler(routeContext, this, target);
1590 }
1591
1592 return getErrorHandlerBuilder().createErrorHandler(routeContext, target);
1593 }
1594
1595 protected ErrorHandlerBuilder createErrorHandlerBuilder() {
1596 if (errorHandlerRef != null) {
1597 return new ErrorHandlerBuilderRef(errorHandlerRef);
1598 }
1599 if (isInheritErrorHandler()) {
1600 return new DeadLetterChannelBuilder();
1601 } else {
1602 return new NoErrorHandlerBuilder();
1603 }
1604 }
1605
1606 protected void configureChild(ProcessorType output) {
1607 output.setNodeFactory(getNodeFactory());
1608 }
1609
1610 public void addOutput(ProcessorType processorType) {
1611 processorType.setParent(this);
1612 configureChild(processorType);
1613 if (blocks.isEmpty()) {
1614 getOutputs().add(processorType);
1615 } else {
1616 Block block = blocks.getLast();
1617 block.addOutput(processorType);
1618 }
1619 }
1620
1621 /**
1622 * Creates a new instance of some kind of composite processor which defaults
1623 * to using a {@link Pipeline} but derived classes could change the
1624 * behaviour
1625 */
1626 protected Processor createCompositeProcessor(List<Processor> list) {
1627 // return new MulticastProcessor(list);
1628 return new Pipeline(list);
1629 }
1630
1631 protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs)
1632 throws Exception {
1633 List<Processor> list = new ArrayList<Processor>();
1634 for (ProcessorType output : outputs) {
1635 Processor processor = output.createProcessor(routeContext);
1636 processor = output.wrapProcessorInInterceptors(routeContext, processor);
1637
1638 ProcessorType currentProcessor = this;
1639 if (!(currentProcessor instanceof ExceptionType || currentProcessor instanceof TryType)) {
1640 processor = output.wrapInErrorHandler(routeContext, processor);
1641 }
1642
1643 list.add(processor);
1644 }
1645 Processor processor = null;
1646 if (!list.isEmpty()) {
1647 if (list.size() == 1) {
1648 processor = list.get(0);
1649 } else {
1650 processor = createCompositeProcessor(list);
1651 }
1652 }
1653 return processor;
1654 }
1655
1656 public void clearOutput() {
1657 getOutputs().clear();
1658 blocks.clear();
1659 }
1660 }