001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.lang.reflect.Method;
020 import java.util.Map;
021 import javax.xml.bind.annotation.XmlAccessType;
022 import javax.xml.bind.annotation.XmlAccessorType;
023 import javax.xml.bind.annotation.XmlAttribute;
024 import javax.xml.bind.annotation.XmlRootElement;
025 import javax.xml.bind.annotation.XmlTransient;
026
027 import org.apache.camel.Processor;
028 import org.apache.camel.RuntimeCamelException;
029 import org.apache.camel.spi.Policy;
030 import org.apache.camel.spi.RouteContext;
031 import org.apache.camel.spi.TransactedPolicy;
032 import org.apache.camel.util.ObjectHelper;
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035
036 /**
037 * Represents an XML <transacted/> element
038 *
039 * @version $Revision: 791395 $
040 */
041 @XmlRootElement(name = "transacted")
042 @XmlAccessorType(XmlAccessType.FIELD)
043 public class TransactedDefinition extends OutputDefinition<ProcessorDefinition> {
044
045 // TODO: Align this code with PolicyDefinition
046
047 // JAXB does not support changing the ref attribute from required to optional
048 // if we extend PolicyDefinition so we must make a copy of the class
049 @XmlTransient
050 public static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED";
051
052 private static final transient Log LOG = LogFactory.getLog(TransactedDefinition.class);
053
054 @XmlTransient
055 protected Class<? extends Policy> type = TransactedPolicy.class;
056 @XmlAttribute
057 protected String ref;
058 @XmlTransient
059 private Policy policy;
060
061 public TransactedDefinition() {
062 }
063
064 public TransactedDefinition(Policy policy) {
065 this.policy = policy;
066 }
067
068 @Override
069 public String toString() {
070 return "Transacted[" + description() + "]";
071 }
072
073 @Override
074 public String getShortName() {
075 return "transacted";
076 }
077
078 @Override
079 public String getLabel() {
080 if (ref != null) {
081 return "ref:" + ref;
082 } else if (policy != null) {
083 return policy.toString();
084 } else {
085 return "";
086 }
087 }
088
089 public String getRef() {
090 return ref;
091 }
092
093 public void setRef(String ref) {
094 this.ref = ref;
095 }
096
097 /**
098 * Sets a policy type that this defition should scope within.
099 * <p/>
100 * Is used for convention over configuration situations where the policy
101 * should be automatic looked up in the registry and it should be based
102 * on this type. For instance a {@link org.apache.camel.spi.TransactedPolicy}
103 * can be set as type for easy transaction configuration.
104 * <p/>
105 * Will by default scope to the wide {@link Policy}
106 *
107 * @param type the policy type
108 */
109 public void setType(Class<? extends Policy> type) {
110 this.type = type;
111 }
112
113 /**
114 * Sets a reference to use for lookup the policy in the registry.
115 *
116 * @param ref the reference
117 * @return the builder
118 */
119 public TransactedDefinition ref(String ref) {
120 setRef(ref);
121 return this;
122 }
123
124 @Override
125 public Processor createProcessor(RouteContext routeContext) throws Exception {
126 Processor childProcessor = createOutputsProcessor(routeContext);
127
128 Policy policy = resolvePolicy(routeContext);
129 ObjectHelper.notNull(policy, "policy", this);
130 return policy.wrap(routeContext, childProcessor);
131 }
132
133
134 protected String description() {
135 if (policy != null) {
136 return policy.toString();
137 } else {
138 return "ref:" + ref;
139 }
140 }
141
142 protected Policy resolvePolicy(RouteContext routeContext) {
143 if (policy != null) {
144 return policy;
145 }
146 return doResolvePolicy(routeContext, getRef(), type);
147 }
148
149 @SuppressWarnings("unchecked")
150 protected static Policy doResolvePolicy(RouteContext routeContext, String ref, Class<? extends Policy> type) {
151 // explicit ref given so lookup by it
152 if (ObjectHelper.isNotEmpty(ref)) {
153 return routeContext.lookup(ref, Policy.class);
154 }
155
156 // no explicit reference given from user so we can use some convention over configuration here
157
158 // try to lookup by scoped type
159 Policy answer = null;
160 if (type != null) {
161 // try find by type, note that this method is not supported by all registry
162 Map types = routeContext.lookupByType(type);
163 if (types.size() == 1) {
164 // only one policy defined so use it
165 Object found = types.values().iterator().next();
166 if (type.isInstance(found)) {
167 return type.cast(found);
168 }
169 }
170 }
171
172 // for transacted routing try the default REQUIRED name
173 if (type == TransactedPolicy.class) {
174 // still not found try with the default name PROPAGATION_REQUIRED
175 answer = routeContext.lookup(PROPAGATION_REQUIRED, TransactedPolicy.class);
176 }
177
178 // still no policy found then try lookup the platform transaction manager and use it as policy
179 if (answer == null && type == TransactedPolicy.class) {
180 Class tmClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.springframework.transaction.PlatformTransactionManager");
181 if (tmClazz != null) {
182 // see if we can find the platform transaction manager in the registry
183 Map<String, Object> maps = routeContext.lookupByType(tmClazz);
184 if (maps.size() == 1) {
185 // only one platform manager then use it as default and create a transacted
186 // policy with it and default to required
187
188 // as we do not want dependency on spring jars in the camel-core we use
189 // reflection to lookup classes and create new objects and call methods
190 // as this is only done during route building it does not matter that we
191 // use reflection as performance is no a concern during route building
192 Object transactionManager = maps.values().iterator().next();
193 if (LOG.isDebugEnabled()) {
194 LOG.debug("One instance of PlatformTransactionManager found in registry: " + transactionManager);
195 }
196 Class txClazz = routeContext.getCamelContext().getClassResolver().resolveClass("org.apache.camel.spring.spi.SpringTransactionPolicy");
197 if (txClazz != null) {
198 if (LOG.isDebugEnabled()) {
199 LOG.debug("Creating a new temporary SpringTransactionPolicy using the PlatformTransactionManager: " + transactionManager);
200 }
201 TransactedPolicy txPolicy = ObjectHelper.newInstance(txClazz, TransactedPolicy.class);
202 Method method;
203 try {
204 method = txClazz.getMethod("setTransactionManager", tmClazz);
205 } catch (NoSuchMethodException e) {
206 throw new RuntimeCamelException("Cannot get method setTransactionManager(PlatformTransactionManager) on class: " + txClazz);
207 }
208 ObjectHelper.invokeMethod(method, txPolicy, transactionManager);
209 return txPolicy;
210 } else {
211 LOG.warn("Cannot create a transacted policy as camel-spring.jar is not on the classpath!");
212 }
213 } else {
214 if (LOG.isDebugEnabled()) {
215 if (maps.isEmpty()) {
216 LOG.debug("No PlatformTransactionManager found in registry.");
217 } else {
218 LOG.debug("Found " + maps.size() + " PlatformTransactionManager in registry. "
219 + "Cannot determine which one to use. Please configure a TransactionTemplate on the policy");
220 }
221 }
222 }
223 }
224 }
225
226 return answer;
227 }
228
229 }