001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.HashMap;
020 import java.util.Map;
021
022 import org.apache.camel.AsyncCallback;
023 import org.apache.camel.CamelContext;
024 import org.apache.camel.Endpoint;
025 import org.apache.camel.Exchange;
026 import org.apache.camel.ExchangePattern;
027 import org.apache.camel.Message;
028 import org.apache.camel.NoSuchEndpointException;
029 import org.apache.camel.Processor;
030 import org.apache.camel.Producer;
031 import org.apache.camel.ProducerTemplate;
032 import org.apache.camel.util.CamelContextHelper;
033 import org.apache.camel.util.ObjectHelper;
034
035 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
036
037 /**
038 * A client helper object (named like Spring's TransactionTemplate & JmsTemplate
039 * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
040 * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
041 *
042 * @version $Revision: 719456 $
043 */
044 public class DefaultProducerTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
045 private CamelContext context;
046 private final ProducerCache<E> producerCache = new ProducerCache<E>();
047 private boolean useEndpointCache = true;
048 private final Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
049 private Endpoint<E> defaultEndpoint;
050
051 public DefaultProducerTemplate(CamelContext context) {
052 this.context = context;
053 }
054
055 public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
056 this(context);
057 this.defaultEndpoint = defaultEndpoint;
058 }
059
060
061 public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
062 Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
063 return new DefaultProducerTemplate(camelContext, endpoint);
064 }
065
066 public E send(String endpointUri, E exchange) {
067 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
068 return send(endpoint, exchange);
069 }
070
071 public E send(String endpointUri, Processor processor) {
072 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
073 return send(endpoint, processor);
074 }
075
076 public E send(String endpointUri, Processor processor, AsyncCallback callback) {
077 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
078 return send(endpoint, processor, callback);
079 }
080
081 public E send(String endpointUri, ExchangePattern pattern, Processor processor) {
082 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
083 return send(endpoint, pattern, processor);
084 }
085
086 public E send(Endpoint<E> endpoint, E exchange) {
087 E convertedExchange = exchange;
088 producerCache.send(endpoint, convertedExchange);
089 return convertedExchange;
090 }
091
092 public E send(Endpoint<E> endpoint, Processor processor) {
093 return producerCache.send(endpoint, processor);
094 }
095
096 public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) {
097 return producerCache.send(endpoint, processor, callback);
098 }
099
100 public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) {
101 return producerCache.send(endpoint, pattern, processor);
102 }
103
104 public Object sendBody(Endpoint<E> endpoint, ExchangePattern pattern, Object body) {
105 E result = send(endpoint, pattern, createSetBodyProcessor(body));
106 return extractResultBody(result, pattern);
107 }
108
109 public Object sendBody(Endpoint<E> endpoint, Object body) {
110 E result = send(endpoint, createSetBodyProcessor(body));
111 return extractResultBody(result);
112 }
113
114 public Object sendBody(String endpointUri, Object body) {
115 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
116 return sendBody(endpoint, body);
117 }
118
119 public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
120 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
121 return sendBody(endpoint, pattern, body);
122 }
123
124 public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
125 final Object headerValue) {
126 return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
127 }
128
129 public Object sendBodyAndHeader(Endpoint<E> endpoint, final Object body, final String header,
130 final Object headerValue) {
131 E result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
132 return extractResultBody(result);
133 }
134
135 public Object sendBodyAndHeader(Endpoint<E> endpoint, ExchangePattern pattern, final Object body, final String header,
136 final Object headerValue) {
137 E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
138 return extractResultBody(result, pattern);
139 }
140
141 public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, final String header,
142 final Object headerValue) {
143 E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
144 return extractResultBody(result, pattern);
145 }
146
147 public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
148 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
149 }
150
151 public Object sendBodyAndHeaders(Endpoint<E> endpoint, final Object body, final Map<String, Object> headers) {
152 E result = send(endpoint, new Processor() {
153 public void process(Exchange exchange) {
154 Message in = exchange.getIn();
155 for (Map.Entry<String, Object> header : headers.entrySet()) {
156 in.setHeader(header.getKey(), header.getValue());
157 }
158 in.setBody(body);
159 }
160 });
161 return extractResultBody(result);
162 }
163
164 public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) {
165 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
166 }
167
168 public Object sendBodyAndHeaders(Endpoint<E> endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) {
169 E result = send(endpoint, pattern, new Processor() {
170 public void process(Exchange exchange) throws Exception {
171 Message in = exchange.getIn();
172 for (Map.Entry<String, Object> header : headers.entrySet()) {
173 in.setHeader(header.getKey(), header.getValue());
174 }
175 in.setBody(body);
176 }
177 });
178 return extractResultBody(result);
179 }
180
181 // Methods using an InOut ExchangePattern
182 // -----------------------------------------------------------------------
183
184 public E request(Endpoint<E> endpoint, Processor processor) {
185 return send(endpoint, ExchangePattern.InOut, processor);
186 }
187
188 public Object requestBody(Endpoint<E> endpoint, Object body) {
189 return sendBody(endpoint, ExchangePattern.InOut, body);
190 }
191
192 public Object requestBodyAndHeader(Endpoint<E> endpoint, Object body, String header, Object headerValue) {
193 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
194 }
195
196 public E request(String endpoint, Processor processor) {
197 return send(endpoint, ExchangePattern.InOut, processor);
198 }
199
200 public Object requestBody(String endpoint, Object body) {
201 return sendBody(endpoint, ExchangePattern.InOut, body);
202 }
203
204 public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
205 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
206 }
207
208 public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
209 return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
210 }
211
212 public Object requestBodyAndHeaders(Endpoint<E> endpoint, final Object body, final Map<String, Object> headers) {
213 return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
214 }
215
216 // Methods using the default endpoint
217 // -----------------------------------------------------------------------
218
219 public Object sendBody(Object body) {
220 return sendBody(getMandatoryDefaultEndpoint(), body);
221 }
222
223 public E send(E exchange) {
224 return send(getMandatoryDefaultEndpoint(), exchange);
225 }
226
227 public E send(Processor processor) {
228 return send(getMandatoryDefaultEndpoint(), processor);
229 }
230
231 public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
232 return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
233 }
234
235 public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
236 return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
237 }
238
239 // Properties
240 // -----------------------------------------------------------------------
241 public Producer<E> getProducer(Endpoint<E> endpoint) {
242 return producerCache.getProducer(endpoint);
243 }
244
245 public CamelContext getContext() {
246 return context;
247 }
248
249 public Endpoint<E> getDefaultEndpoint() {
250 return defaultEndpoint;
251 }
252
253 public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
254 this.defaultEndpoint = defaultEndpoint;
255 }
256
257 /**
258 * Sets the default endpoint to use if none is specified
259 */
260 public void setDefaultEndpointUri(String endpointUri) {
261 setDefaultEndpoint(getContext().getEndpoint(endpointUri));
262 }
263
264 public boolean isUseEndpointCache() {
265 return useEndpointCache;
266 }
267
268 public void setUseEndpointCache(boolean useEndpointCache) {
269 this.useEndpointCache = useEndpointCache;
270 }
271
272 public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
273 Endpoint<?> e = null;
274 synchronized (endpointCache) {
275 e = endpointCache.get(endpointUri);
276 }
277 if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
278 return expectedClass.asSubclass(expectedClass).cast(e);
279 }
280 return null;
281 }
282
283 // Implementation methods
284 // -----------------------------------------------------------------------
285
286 protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
287 return new Processor() {
288 public void process(Exchange exchange) {
289 Message in = exchange.getIn();
290 in.setHeader(header, headerValue);
291 in.setBody(body);
292 }
293 };
294 }
295
296 protected Processor createSetBodyProcessor(final Object body) {
297 return new Processor() {
298 public void process(Exchange exchange) {
299 Message in = exchange.getIn();
300 in.setBody(body);
301 }
302 };
303 }
304
305 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
306 Endpoint endpoint = null;
307
308 if (isUseEndpointCache()) {
309 synchronized (endpointCache) {
310 endpoint = endpointCache.get(endpointUri);
311 if (endpoint == null) {
312 endpoint = context.getEndpoint(endpointUri);
313 if (endpoint != null) {
314 endpointCache.put(endpointUri, endpoint);
315 }
316 }
317 }
318 } else {
319 endpoint = context.getEndpoint(endpointUri);
320 }
321 if (endpoint == null) {
322 throw new NoSuchEndpointException(endpointUri);
323 }
324 return endpoint;
325 }
326
327 protected Endpoint<E> getMandatoryDefaultEndpoint() {
328 Endpoint<E> answer = getDefaultEndpoint();
329 ObjectHelper.notNull(answer, "defaultEndpoint");
330 return answer;
331 }
332
333 protected void doStart() throws Exception {
334 producerCache.start();
335 }
336
337 protected void doStop() throws Exception {
338 producerCache.stop();
339 endpointCache.clear();
340 }
341
342 /**
343 * Extracts the body from the given result.
344 *
345 * @param result the result
346 * @return the result, can be <tt>null</tt>.
347 */
348 protected Object extractResultBody(E result) {
349 return extractResultBody(result, null);
350 }
351
352 /**
353 * Extracts the body from the given result.
354 * <p/>
355 * If the exchange pattern is provided it will try to honor it and retrive the body
356 * from either IN or OUT according to the pattern.
357 *
358 * @param result the result
359 * @param pattern exchange pattern if given, can be <tt>null</tt>
360 * @return the result, can be <tt>null</tt>.
361 */
362 protected Object extractResultBody(E result, ExchangePattern pattern) {
363 Object answer = null;
364 if (result != null) {
365 // rethrow if there was an exception
366 if (result.getException() != null) {
367 throw wrapRuntimeCamelException(result.getException());
368 }
369
370 // result could have a fault message
371 if (hasFaultMessage(result)) {
372 return result.getFault().getBody();
373 }
374
375 // okay no fault then return the response according to the pattern
376 // try to honor pattern if provided
377 boolean notOut = pattern != null && !pattern.isOutCapable();
378 boolean hasOut = result.getOut(false) != null;
379 if (hasOut && !notOut) {
380 answer = result.getOut().getBody();
381 } else {
382 answer = result.getIn().getBody();
383 }
384 }
385 return answer;
386 }
387
388 protected boolean hasFaultMessage(E result) {
389 Message faultMessage = result.getFault(false);
390 if (faultMessage != null) {
391 Object faultBody = faultMessage.getBody();
392 if (faultBody != null) {
393 return true;
394 }
395 }
396 return false;
397 }
398
399 }