001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.ArrayList;
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024 import java.util.concurrent.atomic.AtomicInteger;
025
026 import org.apache.camel.Exchange;
027 import org.apache.camel.Message;
028 import org.apache.camel.RouteNode;
029 import org.apache.camel.Service;
030 import org.apache.camel.model.ProcessorDefinition;
031 import org.apache.camel.spi.Synchronization;
032 import org.apache.camel.spi.TraceableUnitOfWork;
033 import org.apache.camel.util.UuidGenerator;
034 import org.apache.commons.logging.Log;
035 import org.apache.commons.logging.LogFactory;
036
037 /**
038 * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
039 *
040 * @version $Revision: 793288 $
041 */
042 public class DefaultUnitOfWork implements TraceableUnitOfWork, Service {
043 private static final transient Log LOG = LogFactory.getLog(DefaultUnitOfWork.class);
044 private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator();
045
046 private String id;
047 private List<Synchronization> synchronizations;
048 private List<RouteNode> routeNodes;
049 private Map<ProcessorDefinition, AtomicInteger> routeIndex = new HashMap<ProcessorDefinition, AtomicInteger>();
050 private Message originalInMessage;
051
052 public DefaultUnitOfWork(Exchange exchange) {
053 // TODO: optimize to only copy original message if enabled to do so in the route
054
055 // special for JmsMessage as it can cause it to loose headers later. Yeah JMS suchs
056 if (exchange.getIn().getClass().getSimpleName().equals("JmsMessage")) {
057 this.originalInMessage = new DefaultMessage();
058 this.originalInMessage.setBody(exchange.getIn().getBody());
059 // cannot copy headers with a JmsMessage as the underlying javax.jms.Message object goes nuts
060 } else {
061 this.originalInMessage = exchange.getIn().copy();
062 }
063 }
064
065 public void start() throws Exception {
066 id = null;
067 }
068
069 public void stop() throws Exception {
070 // need to clean up when we are stopping to not leak memory
071 if (synchronizations != null) {
072 synchronizations.clear();
073 }
074 if (routeNodes != null) {
075 routeNodes.clear();
076 }
077 routeIndex.clear();
078 originalInMessage = null;
079 }
080
081 public synchronized void addSynchronization(Synchronization synchronization) {
082 if (synchronizations == null) {
083 synchronizations = new ArrayList<Synchronization>();
084 }
085 synchronizations.add(synchronization);
086 }
087
088 public synchronized void removeSynchronization(Synchronization synchronization) {
089 if (synchronizations != null) {
090 synchronizations.remove(synchronization);
091 }
092 }
093
094 public void handoverSynchronization(Exchange target) {
095 if (synchronizations == null || synchronizations.isEmpty()) {
096 return;
097 }
098
099 for (Synchronization synchronization : synchronizations) {
100 target.addOnCompletion(synchronization);
101 }
102
103 // clear this list as its handed over to the other exchange
104 this.synchronizations.clear();
105 }
106
107 public void done(Exchange exchange) {
108 if (synchronizations != null && !synchronizations.isEmpty()) {
109 boolean failed = exchange.isFailed();
110 for (Synchronization synchronization : synchronizations) {
111 try {
112 if (failed) {
113 synchronization.onFailure(exchange);
114 } else {
115 synchronization.onComplete(exchange);
116 }
117 } catch (Exception e) {
118 // must catch exceptions to ensure all synchronizations have a chance to run
119 LOG.error("Exception occured during onCompletion. This exception will be ignored: ", e);
120 }
121 }
122 }
123 }
124
125 public String getId() {
126 if (id == null) {
127 id = DEFAULT_ID_GENERATOR.generateId();
128 }
129 return id;
130 }
131
132 public void addTraced(RouteNode entry) {
133 if (routeNodes == null) {
134 routeNodes = new ArrayList<RouteNode>();
135 }
136 routeNodes.add(entry);
137 }
138
139 public RouteNode getLastNode() {
140 if (routeNodes == null || routeNodes.isEmpty()) {
141 return null;
142 }
143 return routeNodes.get(routeNodes.size() - 1);
144 }
145
146 public RouteNode getSecondLastNode() {
147 if (routeNodes == null || routeNodes.isEmpty() || routeNodes.size() == 1) {
148 return null;
149 }
150 return routeNodes.get(routeNodes.size() - 2);
151 }
152
153 public List<RouteNode> getNodes() {
154 return Collections.unmodifiableList(routeNodes);
155 }
156
157 public Message getOriginalInMessage() {
158 return originalInMessage;
159 }
160
161 public int getAndIncrement(ProcessorDefinition node) {
162 AtomicInteger count = routeIndex.get(node);
163 if (count == null) {
164 count = new AtomicInteger();
165 routeIndex.put(node, count);
166 }
167 return count.getAndIncrement();
168 }
169
170 }