001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.util;
018
019 import java.util.HashMap;
020 import java.util.Map;
021 import java.util.concurrent.ExecutionException;
022 import java.util.concurrent.Future;
023 import java.util.concurrent.TimeUnit;
024 import java.util.concurrent.TimeoutException;
025
026 import org.apache.camel.CamelContext;
027 import org.apache.camel.CamelExecutionException;
028 import org.apache.camel.Endpoint;
029 import org.apache.camel.Exchange;
030 import org.apache.camel.ExchangePattern;
031 import org.apache.camel.InvalidPayloadException;
032 import org.apache.camel.Message;
033 import org.apache.camel.NoSuchBeanException;
034 import org.apache.camel.NoSuchEndpointException;
035 import org.apache.camel.NoSuchHeaderException;
036 import org.apache.camel.NoSuchPropertyException;
037 import org.apache.camel.NoTypeConversionAvailableException;
038 import org.apache.camel.TypeConverter;
039
040 /**
041 * Some helper methods for working with {@link Exchange} objects
042 *
043 * @version $Revision: 790811 $
044 */
045 public final class ExchangeHelper {
046
/**
 * Not instantiable: this class only exposes static helper methods.
 */
private ExchangeHelper() {
    // intentionally empty
}
052
053 /**
054 * Extracts the exchange property of the given name and type; if it is not present then the
055 * default value will be used
056 *
057 * @param exchange the message exchange
058 * @param propertyName the name of the property on the exchange
059 * @param type the expected type of the property
060 * @param defaultValue the default value to be used if the property name does not exist or could not be
061 * converted to the given type
062 * @return the property value as the given type or the defaultValue if it could not be found or converted
063 */
064 public static <T> T getExchangeProperty(Exchange exchange, String propertyName, Class<T> type, T defaultValue) {
065 T answer = exchange.getProperty(propertyName, type);
066 if (answer == null) {
067 return defaultValue;
068 }
069 return answer;
070 }
071
072 /**
073 * Extracts the Exchange.BINDING of the given type or null if not present
074 *
075 * @param exchange the message exchange
076 * @param type the expected binding type
077 * @return the binding object of the given type or null if it could not be found or converted
078 */
079 public static <T> T getBinding(Exchange exchange, Class<T> type) {
080 return exchange != null ? (T) exchange.getProperty(Exchange.BINDING, type) : null;
081 }
082
083 /**
084 * Attempts to resolve the endpoint for the given value
085 *
086 * @param exchange the message exchange being processed
087 * @param value the value which can be an {@link Endpoint} or an object
088 * which provides a String representation of an endpoint via
089 * {@link #toString()}
090 *
091 * @return the endpoint
092 * @throws NoSuchEndpointException if the endpoint cannot be resolved
093 */
094 public static Endpoint resolveEndpoint(Exchange exchange, Object value)
095 throws NoSuchEndpointException {
096 Endpoint endpoint;
097 if (value instanceof Endpoint) {
098 endpoint = (Endpoint)value;
099 } else {
100 String uri = value.toString();
101 endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
102 }
103 return endpoint;
104 }
105
106 public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type)
107 throws NoSuchPropertyException {
108 T result = exchange.getProperty(propertyName, type);
109 if (result != null) {
110 return result;
111 }
112 throw new NoSuchPropertyException(exchange, propertyName, type);
113 }
114
115 public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type)
116 throws NoSuchHeaderException {
117 T answer = exchange.getIn().getHeader(propertyName, type);
118 if (answer == null) {
119 throw new NoSuchHeaderException(exchange, propertyName, type);
120 }
121 return answer;
122 }
123
124 /**
125 * Returns the mandatory inbound message body of the correct type or throws
126 * an exception if it is not present
127 */
128 public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
129 return exchange.getIn().getMandatoryBody();
130 }
131
132 /**
133 * Returns the mandatory inbound message body of the correct type or throws
134 * an exception if it is not present
135 */
136 public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
137 return exchange.getIn().getMandatoryBody(type);
138 }
139
140 /**
141 * Returns the mandatory outbound message body of the correct type or throws
142 * an exception if it is not present
143 */
144 public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
145 return exchange.getOut().getMandatoryBody();
146 }
147
148 /**
149 * Returns the mandatory outbound message body of the correct type or throws
150 * an exception if it is not present
151 */
152 public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
153 return exchange.getOut().getMandatoryBody(type);
154 }
155
156 /**
157 * Converts the value to the given expected type or throws an exception
158 */
159 public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) throws NoTypeConversionAvailableException {
160 CamelContext camelContext = exchange.getContext();
161 TypeConverter converter = camelContext.getTypeConverter();
162 if (converter != null) {
163 return converter.mandatoryConvertTo(type, exchange, value);
164 }
165 throw new NoTypeConversionAvailableException(value, type);
166 }
167
168 /**
169 * Converts the value to the given expected type returning null if it could
170 * not be converted
171 */
172 public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) {
173 CamelContext camelContext = exchange.getContext();
174 TypeConverter converter = camelContext.getTypeConverter();
175 if (converter != null) {
176 return converter.convertTo(type, exchange, value);
177 }
178 return null;
179 }
180
181 /**
182 * Copies the results of a message exchange from the source exchange to the result exchange
183 * which will copy the out and fault message contents and the exception
184 *
185 * @param result the result exchange which will have the output and error state added
186 * @param source the source exchange which is not modified
187 */
188 public static void copyResults(Exchange result, Exchange source) {
189
190 // --------------------------------------------------------------------
191 // TODO: merge logic with that of copyResultsPreservePattern()
192 // --------------------------------------------------------------------
193
194 if (result != source) {
195 result.setException(source.getException());
196 if (source.hasFault()) {
197 result.getFault().copyFrom(source.getFault());
198 }
199
200 if (source.hasOut()) {
201 result.getOut().copyFrom(source.getOut());
202 } else if (result.getPattern() == ExchangePattern.InOptionalOut) {
203 // special case where the result is InOptionalOut and with no OUT response
204 // so we should return null to indicate this fact
205 result.setOut(null);
206 } else {
207 // no results so lets copy the last input
208 // as the final processor on a pipeline might not
209 // have created any OUT; such as a mock:endpoint
210 // so lets assume the last IN is the OUT
211 if (result.getPattern().isOutCapable()) {
212 // only set OUT if its OUT capable
213 result.getOut().copyFrom(source.getIn());
214 } else {
215 // if not replace IN instead to keep the MEP
216 result.getIn().copyFrom(source.getIn());
217 }
218 }
219 result.getProperties().clear();
220 result.getProperties().putAll(source.getProperties());
221 }
222 }
223
224 /**
225 * Copies the <code>source</code> exchange to <code>target</code> exchange
226 * preserving the {@link ExchangePattern} of <code>target</code>.
227 *
228 * @param source source exchange.
229 * @param result target exchange.
230 */
231 public static void copyResultsPreservePattern(Exchange result, Exchange source) {
232
233 // --------------------------------------------------------------------
234 // TODO: merge logic with that of copyResults()
235 // --------------------------------------------------------------------
236
237 if (source == result) {
238 // no need to copy
239 return;
240 }
241
242 // copy in message
243 result.getIn().copyFrom(source.getIn());
244
245 // copy out message
246 if (source.hasOut()) {
247 // exchange pattern sensitive
248 getResultMessage(result).copyFrom(source.getOut());
249 }
250
251 // copy fault message
252 if (source.hasFault()) {
253 result.getFault().copyFrom(source.getFault());
254 }
255
256 // copy exception
257 result.setException(source.getException());
258
259 // copy properties
260 result.getProperties().clear();
261 result.getProperties().putAll(source.getProperties());
262 }
263
264 /**
265 * Returns the message where to write results in an
266 * exchange-pattern-sensitive way.
267 *
268 * @param exchange
269 * message exchange.
270 * @return result message.
271 */
272 public static Message getResultMessage(Exchange exchange) {
273 if (exchange.getPattern().isOutCapable()) {
274 return exchange.getOut();
275 } else {
276 return exchange.getIn();
277 }
278 }
279
280 /**
281 * Returns true if the given exchange pattern (if defined) can support IN messagea
282 *
283 * @param exchange the exchange to interrogate
284 * @return true if the exchange is defined as an {@link ExchangePattern} which supports
285 * IN messages
286 */
287 public static boolean isInCapable(Exchange exchange) {
288 ExchangePattern pattern = exchange.getPattern();
289 return pattern != null && pattern.isInCapable();
290 }
291
292 /**
293 * Returns true if the given exchange pattern (if defined) can support OUT messagea
294 *
295 * @param exchange the exchange to interrogate
296 * @return true if the exchange is defined as an {@link ExchangePattern} which supports
297 * OUT messages
298 */
299 public static boolean isOutCapable(Exchange exchange) {
300 ExchangePattern pattern = exchange.getPattern();
301 return pattern != null && pattern.isOutCapable();
302 }
303
304 /**
305 * Creates a new instance of the given type from the injector
306 */
307 public static <T> T newInstance(Exchange exchange, Class<T> type) {
308 return exchange.getContext().getInjector().newInstance(type);
309 }
310
311 /**
312 * Creates a Map of the variables which are made available to a script or template
313 *
314 * @param exchange the exchange to make available
315 * @return a Map populated with the require dvariables
316 */
317 public static Map createVariableMap(Exchange exchange) {
318 Map answer = new HashMap();
319 populateVariableMap(exchange, answer);
320 return answer;
321 }
322
323 /**
324 * Populates the Map with the variables which are made available to a script or template
325 *
326 * @param exchange the exchange to make available
327 * @param map the map to populate
328 */
329 @SuppressWarnings("unchecked")
330 public static void populateVariableMap(Exchange exchange, Map map) {
331 map.put("exchange", exchange);
332 Message in = exchange.getIn();
333 map.put("in", in);
334 map.put("request", in);
335 map.put("headers", in.getHeaders());
336 map.put("body", in.getBody());
337 if (isOutCapable(exchange)) {
338 Message out = exchange.getOut();
339 map.put("out", out);
340 map.put("response", out);
341 }
342 map.put("camelContext", exchange.getContext());
343 }
344
345 /**
346 * Returns the MIME content type on the input message or null if one is not defined
347 */
348 public static String getContentType(Exchange exchange) {
349 return MessageHelper.getContentType(exchange.getIn());
350 }
351
352 /**
353 * Returns the MIME content encoding on the input message or null if one is not defined
354 */
355 public static String getContentEncoding(Exchange exchange) {
356 return MessageHelper.getContentEncoding(exchange.getIn());
357 }
358
359 /**
360 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
361 */
362 public static Object lookupMandatoryBean(Exchange exchange, String name) {
363 Object value = lookupBean(exchange, name);
364 if (value == null) {
365 throw new NoSuchBeanException(name);
366 }
367 return value;
368 }
369
370 /**
371 * Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
372 */
373 public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
374 T value = lookupBean(exchange, name, type);
375 if (value == null) {
376 throw new NoSuchBeanException(name);
377 }
378 return value;
379 }
380
381 /**
382 * Performs a lookup in the registry of the bean name
383 */
384 public static Object lookupBean(Exchange exchange, String name) {
385 return exchange.getContext().getRegistry().lookup(name);
386 }
387
388 /**
389 * Performs a lookup in the registry of the bean name and type
390 */
391 public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
392 return exchange.getContext().getRegistry().lookup(name, type);
393 }
394
395 /**
396 * Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
397 * or null if none could be found
398 */
399 public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
400 for (Exchange exchange : exchanges) {
401 String id = exchange.getExchangeId();
402 if (id != null && id.equals(exchangeId)) {
403 return exchange;
404 }
405 }
406 return null;
407 }
408
409 /**
410 * Prepares the exchanges for aggregation.
411 * <p/>
412 * This implementation will copy the OUT body to the IN body so when you do
413 * aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
414 *
415 * @param oldExchange the old exchange
416 * @param newExchange the new exchange
417 */
418 public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
419 // copy body/header from OUT to IN
420 if (oldExchange != null) {
421 if (oldExchange.hasOut()) {
422 oldExchange.getIn().copyFrom(oldExchange.getOut());
423 oldExchange.setOut(null);
424 }
425 }
426
427 if (newExchange != null) {
428 if (newExchange.hasOut()) {
429 newExchange.getIn().copyFrom(newExchange.getOut());
430 newExchange.setOut(null);
431 }
432 }
433 }
434
435 public static boolean isFailureHandled(Exchange exchange) {
436 Boolean handled = exchange.getProperty(Exchange.FAILURE_HANDLED, Boolean.class);
437 return handled != null && handled;
438 }
439
440 public static void setFailureHandled(Exchange exchange) {
441 exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
442 // clear exception since its failure handled
443 exchange.setException(null);
444 }
445
446 /**
447 * Extracts the body from the given exchange.
448 * <p/>
449 * If the exchange pattern is provided it will try to honor it and retrive the body
450 * from either IN or OUT according to the pattern.
451 *
452 * @param exchange the exchange
453 * @param pattern exchange pattern if given, can be <tt>null</tt>
454 * @return the result body, can be <tt>null</tt>.
455 * @throws CamelExecutionException if the processing of the exchange failed
456 */
457 public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
458 Object answer = null;
459 if (exchange != null) {
460 // rethrow if there was an exception during execution
461 if (exchange.getException() != null) {
462 throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
463 }
464
465 // result could have a fault message
466 if (hasFaultMessage(exchange)) {
467 return exchange.getFault().getBody();
468 }
469
470 // okay no fault then return the response according to the pattern
471 // try to honor pattern if provided
472 boolean notOut = pattern != null && !pattern.isOutCapable();
473 boolean hasOut = exchange.hasOut();
474 if (hasOut && !notOut) {
475 // we have a response in out and the pattern is out capable
476 answer = exchange.getOut().getBody();
477 } else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
478 // special case where the result is InOptionalOut and with no OUT response
479 // so we should return null to indicate this fact
480 answer = null;
481 } else {
482 // use IN as the response
483 answer = exchange.getIn().getBody();
484 }
485 }
486 return answer;
487 }
488
489 /**
490 * Tests whether the exchange has a fault message set and that its not null.
491 *
492 * @param exchange the exchange
493 * @return <tt>true</tt> if fault message exists
494 */
495 public static boolean hasFaultMessage(Exchange exchange) {
496 if (exchange.hasFault()) {
497 Object faultBody = exchange.getFault().getBody();
498 if (faultBody != null) {
499 return true;
500 }
501 }
502 return false;
503 }
504
505 /**
506 * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
507 * <p/>
508 * Will wait until the future task is complete.
509 *
510 * @param context the camel context
511 * @param future the future handle
512 * @param type the expected body response type
513 * @return the result body, can be <tt>null</tt>.
514 * @throws CamelExecutionException if the processing of the exchange failed
515 */
516 public static <T> T extractFutureBody(CamelContext context, Future future, Class<T> type) {
517 try {
518 return doExtractFutureBody(context, future.get(), type);
519 } catch (InterruptedException e) {
520 throw ObjectHelper.wrapRuntimeCamelException(e);
521 } catch (ExecutionException e) {
522 // execution failed due to an exception so rethrow the cause
523 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
524 } finally {
525 // its harmless to cancel if task is already completed
526 // and in any case we do not want to get hold of the task a 2nd time
527 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
528 future.cancel(true);
529 }
530 }
531
532 /**
533 * Extracts the body from the given future, that represents a handle to an asynchronous exchange.
534 * <p/>
535 * Will wait for the future task to complete, but waiting at most the timeout value.
536 *
537 * @param context the camel context
538 * @param future the future handle
539 * @param timeout timeout value
540 * @param unit timeout unit
541 * @param type the expected body response type
542 * @return the result body, can be <tt>null</tt>.
543 * @throws CamelExecutionException if the processing of the exchange failed
544 * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
545 */
546 public static <T> T extractFutureBody(CamelContext context, Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
547 try {
548 if (timeout > 0) {
549 return doExtractFutureBody(context, future.get(timeout, unit), type);
550 } else {
551 return doExtractFutureBody(context, future.get(), type);
552 }
553 } catch (InterruptedException e) {
554 throw ObjectHelper.wrapRuntimeCamelException(e);
555 } catch (ExecutionException e) {
556 // execution failed due to an exception so rethrow the cause
557 throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
558 } finally {
559 // its harmless to cancel if task is already completed
560 // and in any case we do not want to get hold of the task a 2nd time
561 // and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
562 future.cancel(true);
563 }
564 }
565
566 private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
567 if (result == null) {
568 return null;
569 }
570 if (type.isAssignableFrom(result.getClass())) {
571 return type.cast(result);
572 }
573 if (result instanceof Exchange) {
574 Exchange exchange = (Exchange) result;
575 Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
576 return context.getTypeConverter().convertTo(type, answer);
577 }
578 return context.getTypeConverter().convertTo(type, result);
579 }
580
581 }