001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.spring.spi;
018
019 import org.apache.camel.Exchange;
020 import org.apache.camel.ExchangeProperty;
021 import org.apache.camel.Processor;
022 import org.apache.camel.RuntimeCamelException;
023 import org.apache.camel.processor.DelegateProcessor;
024 import org.apache.camel.processor.RedeliveryPolicy;
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.springframework.transaction.TransactionDefinition;
028 import org.springframework.transaction.TransactionStatus;
029 import org.springframework.transaction.support.DefaultTransactionStatus;
030 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
031 import org.springframework.transaction.support.TransactionSynchronizationManager;
032 import org.springframework.transaction.support.TransactionTemplate;
033
034 /**
035 * The <a href="http://activemq.apache.org/camel/transactional-client.html">Transactional Client</a>
036 * EIP pattern.
037 *
038 * @version $Revision: 674383 $
039 */
040 public class TransactionInterceptor extends DelegateProcessor {
041 public static final ExchangeProperty<Boolean> TRANSACTED =
042 new ExchangeProperty<Boolean>("transacted", "org.apache.camel.transacted", Boolean.class);
043 private static final transient Log LOG = LogFactory.getLog(TransactionInterceptor.class);
044 private final TransactionTemplate transactionTemplate;
045 private ThreadLocal<RedeliveryData> previousRollback = new ThreadLocal<RedeliveryData>() {
046 @Override
047 protected RedeliveryData initialValue() {
048 return new RedeliveryData();
049 }
050 };
051 private RedeliveryPolicy redeliveryPolicy;
052
053 public TransactionInterceptor(TransactionTemplate transactionTemplate) {
054 this.transactionTemplate = transactionTemplate;
055 }
056
057 public TransactionInterceptor(Processor processor, TransactionTemplate transactionTemplate) {
058 super(processor);
059 this.transactionTemplate = transactionTemplate;
060 }
061
062 public TransactionInterceptor(Processor processor, TransactionTemplate transactionTemplate, RedeliveryPolicy redeliveryPolicy) {
063 this(processor, transactionTemplate);
064 this.redeliveryPolicy = redeliveryPolicy;
065 }
066
067 @Override
068 public String toString() {
069 return "TransactionInterceptor:"
070 + propagationBehaviorToString(transactionTemplate.getPropagationBehavior())
071 + "[" + getProcessor() + "]";
072 }
073
074 public void process(final Exchange exchange) {
075 LOG.debug("Transaction begin");
076
077 final RedeliveryData redeliveryData = previousRollback.get();
078
079 transactionTemplate.execute(new TransactionCallbackWithoutResult() {
080 protected void doInTransactionWithoutResult(TransactionStatus status) {
081 // TODO: The delay is in some cases never triggered - see CAMEL-663
082 if (redeliveryPolicy != null && redeliveryData.previousRollback) {
083 // lets delay
084 redeliveryData.redeliveryDelay = redeliveryPolicy.sleep(redeliveryData.redeliveryDelay);
085 }
086
087 // wrapper exception to throw if the exchange failed
088 // IMPORTANT: Must be a runtime exception to let Spring regard it as to do "rollback"
089 RuntimeCamelException rce = null;
090
091 boolean activeTx = false;
092 try {
093 // find out if there is an actual transaction alive, and thus we are in transacted mode
094 activeTx = TransactionSynchronizationManager.isActualTransactionActive();
095 if (!activeTx) {
096 activeTx = status.isNewTransaction() && !status.isCompleted();
097 if (!activeTx) {
098 if (DefaultTransactionStatus.class.isAssignableFrom(status.getClass())) {
099 DefaultTransactionStatus defStatus = DefaultTransactionStatus.class
100 .cast(status);
101 activeTx = defStatus.hasTransaction() && !status.isCompleted();
102 }
103 }
104 }
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("Is actual transaction active: " + activeTx);
107 }
108
109 // okay mark the exchange as transacted, then the DeadLetterChannel or others know
110 // its an transacted exchange
111 if (activeTx) {
112 TRANSACTED.set(exchange, Boolean.TRUE);
113 }
114
115 // process the exchange
116 processNext(exchange);
117
118 // wrap if the exchange failed with an exception
119 if (exchange.getException() != null) {
120 rce = new RuntimeCamelException(exchange.getException());
121 }
122 } catch (Exception e) {
123 // wrap if the exchange threw an exception
124 rce = new RuntimeCamelException(e);
125 }
126
127 // rehrow exception if the exchange failed
128 if (rce != null) {
129 redeliveryData.previousRollback = true;
130 if (activeTx) {
131 status.setRollbackOnly();
132 LOG.debug("Transaction rollback");
133 }
134 throw rce;
135 }
136 }
137 });
138
139 redeliveryData.previousRollback = false;
140 redeliveryData.redeliveryDelay = 0L;
141
142 LOG.debug("Transaction commit");
143 }
144
145
146 public RedeliveryPolicy getRedeliveryPolicy() {
147 return redeliveryPolicy;
148 }
149
150 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
151 this.redeliveryPolicy = redeliveryPolicy;
152 }
153
154 protected static class RedeliveryData {
155 boolean previousRollback;
156 long redeliveryDelay;
157 }
158
159 protected String propagationBehaviorToString(int propagationBehavior) {
160 String rc;
161 switch (propagationBehavior) {
162 case TransactionDefinition.PROPAGATION_MANDATORY:
163 rc = "PROPAGATION_MANDATORY";
164 break;
165 case TransactionDefinition.PROPAGATION_NESTED:
166 rc = "PROPAGATION_NESTED";
167 break;
168 case TransactionDefinition.PROPAGATION_NEVER:
169 rc = "PROPAGATION_NEVER";
170 break;
171 case TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
172 rc = "PROPAGATION_NOT_SUPPORTED";
173 break;
174 case TransactionDefinition.PROPAGATION_REQUIRED:
175 rc = "PROPAGATION_REQUIRED";
176 break;
177 case TransactionDefinition.PROPAGATION_REQUIRES_NEW:
178 rc = "PROPAGATION_REQUIRES_NEW";
179 break;
180 case TransactionDefinition.PROPAGATION_SUPPORTS:
181 rc = "PROPAGATION_SUPPORTS";
182 break;
183 default:
184 rc = "UNKNOWN";
185 }
186 return rc;
187 }
188
189 }