001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.HashMap;
020 import java.util.Map;
021
022 import org.apache.camel.AsyncCallback;
023 import org.apache.camel.CamelContext;
024 import org.apache.camel.Endpoint;
025 import org.apache.camel.Exchange;
026 import org.apache.camel.ExchangePattern;
027 import org.apache.camel.Message;
028 import org.apache.camel.NoSuchEndpointException;
029 import org.apache.camel.Processor;
030 import org.apache.camel.Producer;
031 import org.apache.camel.ProducerTemplate;
032 import org.apache.camel.util.ObjectHelper;
033
034 /**
035 * A client helper object (named like Spring's TransactionTemplate &amp; JmsTemplate
036 * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
037 * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
038 *
039 * @version $Revision: 675919 $
040 */
041 public class DefaultProducerTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
042 private CamelContext context;
043 private final ProducerCache<E> producerCache = new ProducerCache<E>();
044 private boolean useEndpointCache = true;
045 private final Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
046 private Endpoint<E> defaultEndpoint;
047
048 public DefaultProducerTemplate(CamelContext context) {
049 this.context = context;
050 }
051
052 public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
053 this(context);
054 this.defaultEndpoint = defaultEndpoint;
055 }
056
057 public E send(String endpointUri, E exchange) {
058 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
059 return send(endpoint, exchange);
060 }
061
062 public E send(String endpointUri, Processor processor) {
063 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
064 return send(endpoint, processor);
065 }
066
067 public E send(String endpointUri, Processor processor, AsyncCallback callback) {
068 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
069 return send(endpoint, processor, callback);
070 }
071
072 public E send(String endpointUri, ExchangePattern pattern, Processor processor) {
073 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
074 return send(endpoint, pattern, processor);
075 }
076
077 public E send(Endpoint<E> endpoint, E exchange) {
078 E convertedExchange = exchange;
079 producerCache.send(endpoint, convertedExchange);
080 return convertedExchange;
081 }
082
083 public E send(Endpoint<E> endpoint, Processor processor) {
084 return producerCache.send(endpoint, processor);
085 }
086
087 public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) {
088 return producerCache.send(endpoint, processor, callback);
089 }
090
091 public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) {
092 return producerCache.send(endpoint, pattern, processor);
093 }
094
095 public Object sendBody(Endpoint<E> endpoint, ExchangePattern pattern, Object body) {
096 E result = send(endpoint, pattern, createSetBodyProcessor(body));
097 return extractResultBody(result, pattern);
098 }
099
100 public Object sendBody(Endpoint<E> endpoint, Object body) {
101 E result = send(endpoint, createSetBodyProcessor(body));
102 return extractResultBody(result);
103 }
104
105 public Object sendBody(String endpointUri, Object body) {
106 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
107 return sendBody(endpoint, body);
108 }
109
110 public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
111 Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
112 return sendBody(endpoint, pattern, body);
113 }
114
115 public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
116 final Object headerValue) {
117 return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
118 }
119
120 public Object sendBodyAndHeader(Endpoint endpoint, final Object body, final String header,
121 final Object headerValue) {
122 E result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
123 return extractResultBody(result);
124 }
125
126 public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body, final String header,
127 final Object headerValue) {
128 E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
129 return extractResultBody(result, pattern);
130 }
131
132 public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, final String header,
133 final Object headerValue) {
134 E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
135 return extractResultBody(result, pattern);
136 }
137
138 public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
139 return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
140 }
141
142 public Object sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
143 E result = send(endpoint, new Processor() {
144 public void process(Exchange exchange) {
145 Message in = exchange.getIn();
146 for (Map.Entry<String, Object> header : headers.entrySet()) {
147 in.setHeader(header.getKey(), header.getValue());
148 }
149 in.setBody(body);
150 }
151 });
152 return extractResultBody(result);
153 }
154
155 // Methods using an InOut ExchangePattern
156 // -----------------------------------------------------------------------
157
158 public E request(Endpoint<E> endpoint, Processor processor) {
159 return send(endpoint, ExchangePattern.InOut, processor);
160 }
161
162 public Object requestBody(Endpoint<E> endpoint, Object body) {
163 return sendBody(endpoint, ExchangePattern.InOut, body);
164 }
165
166 public Object requestBodyAndHeader(Endpoint<E> endpoint, Object body, String header, Object headerValue) {
167 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
168 }
169
170 public E request(String endpoint, Processor processor) {
171 return send(endpoint, ExchangePattern.InOut, processor);
172 }
173
174 public Object requestBody(String endpoint, Object body) {
175 return sendBody(endpoint, ExchangePattern.InOut, body);
176 }
177
178 public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
179 return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
180 }
181
182 // Methods using the default endpoint
183 // -----------------------------------------------------------------------
184
185 public Object sendBody(Object body) {
186 return sendBody(getMandatoryDefaultEndpoint(), body);
187 }
188
189 public E send(E exchange) {
190 return send(getMandatoryDefaultEndpoint(), exchange);
191 }
192
193 public E send(Processor processor) {
194 return send(getMandatoryDefaultEndpoint(), processor);
195 }
196
197 public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
198 return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
199 }
200
201 public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
202 return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
203 }
204
205 // Properties
206 // -----------------------------------------------------------------------
207 public Producer<E> getProducer(Endpoint<E> endpoint) {
208 return producerCache.getProducer(endpoint);
209 }
210
211 public CamelContext getContext() {
212 return context;
213 }
214
215 public Endpoint<E> getDefaultEndpoint() {
216 return defaultEndpoint;
217 }
218
219 public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
220 this.defaultEndpoint = defaultEndpoint;
221 }
222
223 /**
224 * Sets the default endpoint to use if none is specified
225 */
226 public void setDefaultEndpointUri(String endpointUri) {
227 setDefaultEndpoint(getContext().getEndpoint(endpointUri));
228 }
229
230 public boolean isUseEndpointCache() {
231 return useEndpointCache;
232 }
233
234 public void setUseEndpointCache(boolean useEndpointCache) {
235 this.useEndpointCache = useEndpointCache;
236 }
237
238 public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
239 Endpoint<?> e = null;
240 synchronized (endpointCache) {
241 e = endpointCache.get(endpointUri);
242 }
243 if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
244 return expectedClass.asSubclass(expectedClass).cast(e);
245 }
246 return null;
247 }
248
249 // Implementation methods
250 // -----------------------------------------------------------------------
251
252 protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
253 return new Processor() {
254 public void process(Exchange exchange) {
255 Message in = exchange.getIn();
256 in.setHeader(header, headerValue);
257 in.setBody(body);
258 }
259 };
260 }
261
262 protected Processor createSetBodyProcessor(final Object body) {
263 return new Processor() {
264 public void process(Exchange exchange) {
265 Message in = exchange.getIn();
266 in.setBody(body);
267 }
268 };
269 }
270
271 protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
272 Endpoint endpoint = null;
273
274 if (isUseEndpointCache()) {
275 synchronized (endpointCache) {
276 endpoint = endpointCache.get(endpointUri);
277 if (endpoint == null) {
278 endpoint = context.getEndpoint(endpointUri);
279 if (endpoint != null) {
280 endpointCache.put(endpointUri, endpoint);
281 }
282 }
283 }
284 } else {
285 endpoint = context.getEndpoint(endpointUri);
286 }
287 if (endpoint == null) {
288 throw new NoSuchEndpointException(endpointUri);
289 }
290 return endpoint;
291 }
292
293 protected Endpoint<E> getMandatoryDefaultEndpoint() {
294 Endpoint<E> answer = getDefaultEndpoint();
295 ObjectHelper.notNull(answer, "defaultEndpoint");
296 return answer;
297 }
298
299 protected void doStart() throws Exception {
300 producerCache.start();
301 }
302
303 protected void doStop() throws Exception {
304 producerCache.stop();
305 endpointCache.clear();
306 }
307
308 /**
309 * Extracts the body from the given result.
310 *
311 * @param result the result
312 * @return the result, can be <tt>null</tt>.
313 */
314 protected Object extractResultBody(E result) {
315 return extractResultBody(result, null);
316 }
317
318 /**
319 * Extracts the body from the given result.
320 * <p/>
321 * If the exchange pattern is provided it will try to honor it and retrive the body
322 * from either IN or OUT according to the pattern.
323 *
324 * @param result the result
325 * @param pattern exchange pattern if given, can be <tt>null</tt>
326 * @return the result, can be <tt>null</tt>.
327 */
328 protected Object extractResultBody(E result, ExchangePattern pattern) {
329 Object answer = null;
330 if (result != null) {
331 // try to honor pattern if provided
332 boolean notOut = pattern != null && !pattern.isOutCapable();
333 boolean hasOut = result.getOut(false) != null;
334 if (hasOut && !notOut) {
335 answer = result.getOut().getBody();
336 } else {
337 answer = result.getIn().getBody();
338 }
339 }
340 return answer;
341 }
342
343 }