001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.ArrayList;
020 import java.util.Iterator;
021 import java.util.List;
022 import java.util.Map;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import org.apache.camel.CamelContext;
026 import org.apache.camel.Endpoint;
027 import org.apache.camel.Exchange;
028 import org.apache.camel.ExchangePattern;
029 import org.apache.camel.Message;
030 import org.apache.camel.spi.Synchronization;
031 import org.apache.camel.spi.UnitOfWork;
032 import org.apache.camel.util.ExchangeHelper;
033 import org.apache.camel.util.ObjectHelper;
034 import org.apache.camel.util.UuidGenerator;
035
036 /**
037 * A default implementation of {@link Exchange}
038 *
039 * @version $Revision: 792977 $
040 */
041 public final class DefaultExchange implements Exchange {
042
043 private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator();
044 protected final CamelContext context;
045 private Map<String, Object> properties;
046 private Message in;
047 private Message out;
048 private Message fault;
049 private Exception exception;
050 private String exchangeId;
051 private UnitOfWork unitOfWork;
052 private ExchangePattern pattern;
053 private Endpoint fromEndpoint;
054 private List<Synchronization> onCompletions;
055
056 public DefaultExchange(CamelContext context) {
057 this(context, ExchangePattern.InOnly);
058 }
059
060 public DefaultExchange(CamelContext context, ExchangePattern pattern) {
061 this.context = context;
062 this.pattern = pattern;
063 }
064
065 public DefaultExchange(Exchange parent) {
066 this(parent.getContext(), parent.getPattern());
067 this.unitOfWork = parent.getUnitOfWork();
068 this.fromEndpoint = parent.getFromEndpoint();
069 }
070
071 public DefaultExchange(Endpoint fromEndpoint) {
072 this(fromEndpoint, ExchangePattern.InOnly);
073 }
074
075 public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
076 this.context = fromEndpoint.getCamelContext();
077 this.fromEndpoint = fromEndpoint;
078 this.pattern = pattern;
079 }
080
081 @Override
082 public String toString() {
083 return "Exchange[" + in + "]";
084 }
085
086 public Exchange copy() {
087 Exchange exchange = newInstance();
088 exchange.copyFrom(this);
089 return exchange;
090 }
091
092 public Exchange copy(boolean handoverOnCompletion) {
093 Exchange copy = copy();
094 // do not share the unit of work
095 copy.setUnitOfWork(null);
096 // handover on completeion to the copy if we got any
097 if (handoverOnCompletion && unitOfWork != null) {
098 unitOfWork.handoverSynchronization(copy);
099 }
100 // set a correlation id so we can track back the original exchange
101 copy.setProperty(Exchange.CORRELATION_ID, this.getExchangeId());
102 return copy;
103 }
104
105 public void copyFrom(Exchange exchange) {
106 if (exchange == this) {
107 return;
108 }
109 setProperties(safeCopy(exchange.getProperties()));
110
111 // this can cause strangeness if we copy, say, a FileMessage onto an FtpExchange with overloaded getExchange() methods etc.
112 safeCopy(getIn(), exchange.getIn());
113 if (exchange.hasOut()) {
114 safeCopy(getOut(), exchange.getOut());
115 }
116 if (exchange.hasFault()) {
117 safeCopy(getFault(), exchange.getFault());
118 }
119 setException(exchange.getException());
120
121 unitOfWork = exchange.getUnitOfWork();
122 pattern = exchange.getPattern();
123 setFromEndpoint(exchange.getFromEndpoint());
124 }
125
126 private static void safeCopy(Message message, Message that) {
127 if (message != null) {
128 message.copyFrom(that);
129 }
130 }
131
132 private static Map<String, Object> safeCopy(Map<String, Object> properties) {
133 if (properties == null) {
134 return null;
135 }
136 return new ConcurrentHashMap<String, Object>(properties);
137 }
138
139 public Exchange newInstance() {
140 return new DefaultExchange(this);
141 }
142
143 public CamelContext getContext() {
144 return context;
145 }
146
147 public Object getProperty(String name) {
148 if (properties != null) {
149 return properties.get(name);
150 }
151 return null;
152 }
153
154 public <T> T getProperty(String name, Class<T> type) {
155 Object value = getProperty(name);
156
157 // eager same instance type test to avoid the overhead of invoking the type converter
158 // if already same type
159 if (type.isInstance(value)) {
160 return type.cast(value);
161 }
162
163 return ExchangeHelper.convertToType(this, type, value);
164 }
165
166 public void setProperty(String name, Object value) {
167 if (value != null) {
168 // avoid the NullPointException
169 getProperties().put(name, value);
170 } else {
171 // if the value is null, we just remove the key from the map
172 if (name != null) {
173 getProperties().remove(name);
174 }
175 }
176 }
177
178 public Object removeProperty(String name) {
179 return getProperties().remove(name);
180 }
181
182 public Map<String, Object> getProperties() {
183 if (properties == null) {
184 properties = new ConcurrentHashMap<String, Object>();
185 }
186 return properties;
187 }
188
189 public void setProperties(Map<String, Object> properties) {
190 this.properties = properties;
191 }
192
193 public Message getIn() {
194 if (in == null) {
195 in = new DefaultMessage();
196 configureMessage(in);
197 }
198 return in;
199 }
200
201 public void setIn(Message in) {
202 this.in = in;
203 configureMessage(in);
204 }
205
206 public Message getOut() {
207 return getOut(true);
208 }
209
210 public boolean hasOut() {
211 return out != null;
212 }
213
214 public Message getOut(boolean lazyCreate) {
215 if (out == null && lazyCreate) {
216 out = (in != null && in instanceof MessageSupport)
217 ? ((MessageSupport)in).newInstance() : new DefaultMessage();
218 configureMessage(out);
219 }
220 return out;
221 }
222
223 public void setOut(Message out) {
224 this.out = out;
225 configureMessage(out);
226 }
227
228 public Exception getException() {
229 return exception;
230 }
231
232 public <T> T getException(Class<T> type) {
233 if (exception == null) {
234 return null;
235 }
236
237 Iterator<Throwable> it = ObjectHelper.createExceptionIterator(exception);
238 while (it.hasNext()) {
239 Throwable e = it.next();
240 if (type.isInstance(e)) {
241 return type.cast(e);
242 }
243 }
244 // not found
245 return null;
246 }
247
248 public void setException(Exception exception) {
249 this.exception = exception;
250 }
251
252 public ExchangePattern getPattern() {
253 return pattern;
254 }
255
256 public void setPattern(ExchangePattern pattern) {
257 this.pattern = pattern;
258 }
259
260 public Endpoint getFromEndpoint() {
261 return fromEndpoint;
262 }
263
264 public void setFromEndpoint(Endpoint fromEndpoint) {
265 this.fromEndpoint = fromEndpoint;
266 }
267
268 public boolean hasFault() {
269 return fault != null;
270 }
271
272 public Message getFault() {
273 return getFault(true);
274 }
275
276 public Message getFault(boolean lazyCreate) {
277 if (fault == null && lazyCreate) {
278 fault = (in != null && in instanceof MessageSupport)
279 ? ((MessageSupport)in).newInstance() : new DefaultMessage();
280 configureMessage(fault);
281 }
282 return fault;
283 }
284
285 public void setFault(Message fault) {
286 this.fault = fault;
287 configureMessage(fault);
288 }
289
290 public String getExchangeId() {
291 if (exchangeId == null) {
292 exchangeId = createExchangeId();
293 }
294 return exchangeId;
295 }
296
297 public void setExchangeId(String id) {
298 this.exchangeId = id;
299 }
300
301 public boolean isFailed() {
302 if (hasFault()) {
303 Object faultBody = getFault().getBody();
304 if (faultBody != null) {
305 return true;
306 }
307 }
308 return getException() != null;
309 }
310
311 public boolean isTransacted() {
312 Boolean transacted = getProperty(TRANSACTED, Boolean.class);
313 return transacted != null && transacted;
314 }
315
316 public boolean isRollbackOnly() {
317 Boolean rollback = getProperty(ROLLBACK_ONLY, Boolean.class);
318 return rollback != null && rollback;
319 }
320
321 public UnitOfWork getUnitOfWork() {
322 return unitOfWork;
323 }
324
325 public void setUnitOfWork(UnitOfWork unitOfWork) {
326 this.unitOfWork = unitOfWork;
327 if (this.onCompletions != null) {
328 // now an unit of work has been assigned so add the on completions
329 // we might have registered already
330 for (Synchronization onCompletion : this.onCompletions) {
331 this.unitOfWork.addSynchronization(onCompletion);
332 }
333 // cleanup the temporary on completion list as they now have been registered
334 // on the unit of work
335 this.onCompletions.clear();
336 this.onCompletions = null;
337 }
338 }
339
340 public void addOnCompletion(Synchronization onCompletion) {
341 if (this.unitOfWork == null) {
342 // unit of work not yet registered so we store the on completion temporary
343 // until the unit of work is assigned to this exchange by the UnitOfWorkProcessor
344 if (this.onCompletions == null) {
345 this.onCompletions = new ArrayList<Synchronization>();
346 }
347 this.onCompletions.add(onCompletion);
348 } else {
349 this.getUnitOfWork().addSynchronization(onCompletion);
350 }
351 }
352
353 /**
354 * Configures the message after it has been set on the exchange
355 */
356 protected void configureMessage(Message message) {
357 if (message instanceof MessageSupport) {
358 MessageSupport messageSupport = (MessageSupport)message;
359 messageSupport.setExchange(this);
360 }
361 }
362
363 protected String createExchangeId() {
364 String answer = null;
365 if (in != null) {
366 answer = in.createExchangeId();
367 }
368 if (answer == null) {
369 answer = DefaultExchange.DEFAULT_ID_GENERATOR.generateId();
370 }
371 return answer;
372 }
373 }