001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.Map;
020 import java.util.concurrent.Callable;
021 import java.util.concurrent.ExecutorService;
022 import java.util.concurrent.Future;
023 import java.util.concurrent.TimeUnit;
024 import java.util.concurrent.TimeoutException;
025
026 import org.apache.camel.CamelContext;
027 import org.apache.camel.Endpoint;
028 import org.apache.camel.Exchange;
029 import org.apache.camel.ExchangePattern;
030 import org.apache.camel.Message;
031 import org.apache.camel.NoSuchEndpointException;
032 import org.apache.camel.Processor;
033 import org.apache.camel.ProducerTemplate;
034 import org.apache.camel.util.CamelContextHelper;
035 import org.apache.camel.util.ExchangeHelper;
036 import org.apache.camel.util.ObjectHelper;
037 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
038
039 /**
040 * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
041 * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
042 * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
043 *
044 * @version $Revision: 794648 $
045 */
046 public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
047 private static final int DEFAULT_THREADPOOL_SIZE = 5;
048 private final CamelContext context;
049 private final ProducerCache producerCache;
050 private Endpoint defaultEndpoint;
051 private ExecutorService executor;
052
053 public DefaultProducerTemplate(CamelContext context) {
054 this.context = context;
055 this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true);
056 this.producerCache = new ProducerCache(context.getProducerServicePool());
057 }
058
059 public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
060 this.context = context;
061 this.executor = executor;
062 this.producerCache = new ProducerCache(context.getProducerServicePool());
063 }
064
065 public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
066 this(context);
067 this.defaultEndpoint = defaultEndpoint;
068 }
069
070 public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
071 Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
072 return new DefaultProducerTemplate(camelContext, endpoint);
073 }
074
075 public Exchange send(String endpointUri, Exchange exchange) {
076 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
077 return send(endpoint, exchange);
078 }
079
080 public Exchange send(String endpointUri, Processor processor) {
081 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
082 return send(endpoint, processor);
083 }
084
085 public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
086 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
087 return send(endpoint, pattern, processor);
088 }
089
090 public Exchange send(Endpoint endpoint, Exchange exchange) {
091 producerCache.send(endpoint, exchange);
092 return exchange;
093 }
094
095 public Exchange send(Endpoint endpoint, Processor processor) {
096 return producerCache.send(endpoint, processor);
097 }
098
099 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
100 return producerCache.send(endpoint, pattern, processor);
101 }
102
103 public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
104 Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
105 return extractResultBody(result, pattern);
106 }
107
108 public void sendBody(Endpoint endpoint, Object body) {
109 Exchange result = send(endpoint, createSetBodyProcessor(body));
110 // must invoke extract result body in case of exception to be rethrown
111 extractResultBody(result);
112 }
113
114 public void sendBody(String endpointUri, Object body) {
115 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
116 sendBody(endpoint, body);
117 }
118
119 public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
120 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
121 Object result = sendBody(endpoint, pattern, body);
122 if (pattern.isOutCapable()) {
123 return result;
124 } else {
125 // return null if not OUT capable
126 return null;
127 }
128 }
129
130 public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) {
131 sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
132 }
133
134 public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) {
135 Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
136 // must invoke extract result body in case of exception to be rethrown
137 extractResultBody(result);
138 }
139
140 public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
141 final String header, final Object headerValue) {
142 Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
143 Object result = extractResultBody(exchange, pattern);
144 if (pattern.isOutCapable()) {
145 return result;
146 } else {
147 // return null if not OUT capable
148 return null;
149 }
150 }
151
152 public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
153 final String header, final Object headerValue) {
154 Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
155 Object result = extractResultBody(exchange, pattern);
156 if (pattern.isOutCapable()) {
157 return result;
158 } else {
159 // return null if not OUT capable
160 return null;
161 }
162 }
163
164 public void sendBodyAndProperty(String endpointUri, final Object body,
165 final String property, final Object propertyValue) {
166 sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
167 }
168
169 public void sendBodyAndProperty(Endpoint endpoint, final Object body,
170 final String property, final Object propertyValue) {
171 Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
172 // must invoke extract result body in case of exception to be rethrown
173 extractResultBody(result);
174 }
175
176 public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
177 final String property, final Object propertyValue) {
178 Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
179 Object result = extractResultBody(exchange, pattern);
180 if (pattern.isOutCapable()) {
181 return result;
182 } else {
183 // return null if not OUT capable
184 return null;
185 }
186 }
187
188 public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
189 final String property, final Object propertyValue) {
190 Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
191 Object result = extractResultBody(exchange, pattern);
192 if (pattern.isOutCapable()) {
193 return result;
194 } else {
195 // return null if not OUT capable
196 return null;
197 }
198 }
199
200 public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
201 sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
202 }
203
204 public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
205 Exchange result = send(endpoint, new Processor() {
206 public void process(Exchange exchange) {
207 Message in = exchange.getIn();
208 for (Map.Entry<String, Object> header : headers.entrySet()) {
209 in.setHeader(header.getKey(), header.getValue());
210 }
211 in.setBody(body);
212 }
213 });
214 // must invoke extract result body in case of exception to be rethrown
215 extractResultBody(result);
216 }
217
218 public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) {
219 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
220 }
221
222 public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) {
223 Exchange exchange = send(endpoint, pattern, new Processor() {
224 public void process(Exchange exchange) throws Exception {
225 Message in = exchange.getIn();
226 for (Map.Entry<String, Object> header : headers.entrySet()) {
227 in.setHeader(header.getKey(), header.getValue());
228 }
229 in.setBody(body);
230 }
231 });
232 Object result = extractResultBody(exchange, pattern);
233 if (pattern.isOutCapable()) {
234 return result;
235 } else {
236 // return null if not OUT capable
237 return null;
238 }
239 }
240
241 // Methods using an InOut ExchangePattern
242 // -----------------------------------------------------------------------
243
244 public Exchange request(Endpoint endpoint, Processor processor) {
245 return send(endpoint, ExchangePattern.InOut, processor);
246 }
247
248 public Object requestBody(Object body) {
249 return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
250 }
251
252 public Object requestBody(Endpoint endpoint, Object body) {
253 return sendBody(endpoint, ExchangePattern.InOut, body);
254 }
255
256 public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) {
257 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
258 }
259
260 public Exchange request(String endpoint, Processor processor) {
261 return send(endpoint, ExchangePattern.InOut, processor);
262 }
263
264 public Object requestBody(String endpoint, Object body) {
265 return sendBody(endpoint, ExchangePattern.InOut, body);
266 }
267
268 public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
269 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
270 }
271
272 public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
273 return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
274 }
275
276 public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
277 return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
278 }
279
280 public <T> T requestBody(Object body, Class<T> type) {
281 Object answer = requestBody(body);
282 return context.getTypeConverter().convertTo(type, answer);
283 }
284
285 public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
286 Object answer = requestBody(endpoint, body);
287 return context.getTypeConverter().convertTo(type, answer);
288 }
289
290 public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
291 Object answer = requestBody(endpointUri, body);
292 return context.getTypeConverter().convertTo(type, answer);
293 }
294
295 public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
296 Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
297 return context.getTypeConverter().convertTo(type, answer);
298 }
299
300 public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
301 Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
302 return context.getTypeConverter().convertTo(type, answer);
303 }
304
305 public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
306 Object answer = requestBodyAndHeaders(endpointUri, body, headers);
307 return context.getTypeConverter().convertTo(type, answer);
308 }
309
310 public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
311 Object answer = requestBodyAndHeaders(endpoint, body, headers);
312 return context.getTypeConverter().convertTo(type, answer);
313 }
314
315 // Methods using the default endpoint
316 // -----------------------------------------------------------------------
317
318 public void sendBody(Object body) {
319 sendBody(getMandatoryDefaultEndpoint(), body);
320 }
321
322 public Exchange send(Exchange exchange) {
323 return send(getMandatoryDefaultEndpoint(), exchange);
324 }
325
326 public Exchange send(Processor processor) {
327 return send(getMandatoryDefaultEndpoint(), processor);
328 }
329
330 public void sendBodyAndHeader(Object body, String header, Object headerValue) {
331 sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
332 }
333
334 public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
335 sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
336 }
337
338 public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
339 sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
340 }
341
342 // Properties
343 // -----------------------------------------------------------------------
344 public CamelContext getContext() {
345 return context;
346 }
347
348 public Endpoint getDefaultEndpoint() {
349 return defaultEndpoint;
350 }
351
352 public void setDefaultEndpoint(Endpoint defaultEndpoint) {
353 this.defaultEndpoint = defaultEndpoint;
354 }
355
356 /**
357 * Sets the default endpoint to use if none is specified
358 */
359 public void setDefaultEndpointUri(String endpointUri) {
360 setDefaultEndpoint(getContext().getEndpoint(endpointUri));
361 }
362
363 public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
364 return context.getEndpoint(endpointUri, expectedClass);
365 }
366
367 // Implementation methods
368 // -----------------------------------------------------------------------
369
370 protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
371 return new Processor() {
372 public void process(Exchange exchange) {
373 Message in = exchange.getIn();
374 in.setHeader(header, headerValue);
375 in.setBody(body);
376 }
377 };
378 }
379
380 protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
381 return new Processor() {
382 public void process(Exchange exchange) {
383 exchange.setProperty(property, propertyValue);
384
385 Message in = exchange.getIn();
386 in.setBody(body);
387 }
388 };
389 }
390
391 protected Processor createSetBodyProcessor(final Object body) {
392 return new Processor() {
393 public void process(Exchange exchange) {
394 Message in = exchange.getIn();
395 in.setBody(body);
396 }
397 };
398 }
399
400 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
401 Endpoint endpoint = context.getEndpoint(endpointUri);
402 if (endpoint == null) {
403 throw new NoSuchEndpointException(endpointUri);
404 }
405 return endpoint;
406 }
407
408 protected Endpoint getMandatoryDefaultEndpoint() {
409 Endpoint answer = getDefaultEndpoint();
410 ObjectHelper.notNull(answer, "defaultEndpoint");
411 return answer;
412 }
413
414 protected void doStart() throws Exception {
415 producerCache.start();
416 }
417
418 protected void doStop() throws Exception {
419 producerCache.stop();
420 if (executor != null) {
421 executor.shutdown();
422 }
423 }
424
425 protected Object extractResultBody(Exchange result) {
426 return extractResultBody(result, null);
427 }
428
429 protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
430 return ExchangeHelper.extractResultBody(result, pattern);
431 }
432
433 public void setExecutorService(ExecutorService executorService) {
434 this.executor = executorService;
435 }
436
437 public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
438 Callable<Exchange> task = new Callable<Exchange>() {
439 public Exchange call() throws Exception {
440 return send(uri, exchange);
441 }
442 };
443
444 return executor.submit(task);
445 }
446
447 public Future<Exchange> asyncSend(final String uri, final Processor processor) {
448 Callable<Exchange> task = new Callable<Exchange>() {
449 public Exchange call() throws Exception {
450 return send(uri, processor);
451 }
452 };
453
454 return executor.submit(task);
455 }
456
457 public Future<Object> asyncSendBody(final String uri, final Object body) {
458 Callable<Object> task = new Callable<Object>() {
459 public Object call() throws Exception {
460 sendBody(uri, body);
461 // its InOnly, so no body to return
462 return null;
463 }
464 };
465
466 return executor.submit(task);
467 }
468
469 public Future<Object> asyncRequestBody(final String uri, final Object body) {
470 Callable<Object> task = new Callable<Object>() {
471 public Object call() throws Exception {
472 return requestBody(uri, body);
473 }
474 };
475
476 return executor.submit(task);
477 }
478
479 public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
480 Callable<T> task = new Callable<T>() {
481 public T call() throws Exception {
482 return requestBody(uri, body, type);
483 }
484 };
485
486 return executor.submit(task);
487 }
488
489 public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
490 Callable<Object> task = new Callable<Object>() {
491 public Object call() throws Exception {
492 return requestBodyAndHeader(endpointUri, body, header, headerValue);
493 }
494 };
495
496 return executor.submit(task);
497 }
498
499 public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
500 Callable<T> task = new Callable<T>() {
501 public T call() throws Exception {
502 return requestBodyAndHeader(endpointUri, body, header, headerValue, type);
503 }
504 };
505
506 return executor.submit(task);
507 }
508
509 public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
510 Callable<Object> task = new Callable<Object>() {
511 public Object call() throws Exception {
512 return requestBodyAndHeaders(endpointUri, body, headers);
513 }
514 };
515
516 return executor.submit(task);
517 }
518
519 public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
520 Callable<T> task = new Callable<T>() {
521 public T call() throws Exception {
522 return requestBodyAndHeaders(endpointUri, body, headers, type);
523 }
524 };
525
526 return executor.submit(task);
527 }
528
529 public <T> T extractFutureBody(Future future, Class<T> type) {
530 return ExchangeHelper.extractFutureBody(context, future, type);
531 }
532
533 public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
534 return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
535 }
536
537 }