001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.concurrent.RejectedExecutionException;
020
021 import org.apache.camel.AsyncCallback;
022 import org.apache.camel.AsyncProcessor;
023 import org.apache.camel.Exchange;
024 import org.apache.camel.ExchangeProperty;
025 import org.apache.camel.Message;
026 import org.apache.camel.Processor;
027 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
028 import org.apache.camel.model.ExceptionType;
029 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
030 import org.apache.camel.util.AsyncProcessorHelper;
031 import org.apache.camel.util.ServiceHelper;
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034
035 /**
036 * Implements a <a
037 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
038 * Channel</a> after attempting to redeliver the message using the
039 * {@link RedeliveryPolicy}
040 *
041 * @version $Revision: 674036 $
042 */
043 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
044 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
045 public static final String REDELIVERED = "org.apache.camel.Redelivered";
046 public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
047
048 private class RedeliveryData {
049 int redeliveryCounter;
050 long redeliveryDelay;
051 boolean sync = true;
052
053 // default behaviour which can be overloaded on a per exception basis
054 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
055 Processor failureProcessor = deadLetter;
056 }
057
058 private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
059 private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
060 private Processor output;
061 private Processor deadLetter;
062 private AsyncProcessor outputAsync;
063 private RedeliveryPolicy redeliveryPolicy;
064 private Logger logger;
065
066 public DeadLetterChannel(Processor output, Processor deadLetter) {
067 this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
068 ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
069 }
070
071 public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
072 this.deadLetter = deadLetter;
073 this.output = output;
074 this.outputAsync = AsyncProcessorTypeConverter.convert(output);
075
076 this.redeliveryPolicy = redeliveryPolicy;
077 this.logger = logger;
078 setExceptionPolicy(exceptionPolicyStrategy);
079 }
080
081 public static <E extends Exchange> Logger createDefaultLogger() {
082 return new Logger(LOG, LoggingLevel.ERROR);
083 }
084
085 @Override
086 public String toString() {
087 return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
088 }
089
090 public boolean process(Exchange exchange, final AsyncCallback callback) {
091 return process(exchange, callback, new RedeliveryData());
092 }
093
094 public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
095
096 while (true) {
097 // We can't keep retrying if the route is being shutdown.
098 if (!isRunAllowed()) {
099 if (exchange.getException() == null) {
100 exchange.setException(new RejectedExecutionException());
101 }
102 callback.done(data.sync);
103 return data.sync;
104 }
105
106 // if the exchange is transacted then let the underlysing system handle the redelivery etc.
107 // this DeadLetterChannel is only for non transacted exchanges
108 if (exchange.isTransacted() && exchange.getException() != null) {
109 if (LOG.isDebugEnabled()) {
110 LOG.debug("Transacted Exchange, this DeadLetterChannel is bypassed: " + exchange);
111 }
112 return data.sync;
113 }
114
115 if (exchange.getException() != null) {
116 Throwable e = exchange.getException();
117 exchange.setException(null); // Reset it since we are handling it.
118
119 logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, e);
120 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
121
122 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
123 if (exceptionPolicy != null) {
124 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
125 Processor processor = exceptionPolicy.getErrorHandler();
126 if (processor != null) {
127 data.failureProcessor = processor;
128 }
129 }
130 }
131
132 if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
133 setFailureHandled(exchange, true);
134 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
135 boolean sync = afp.process(exchange, new AsyncCallback() {
136 public void done(boolean sync) {
137 restoreExceptionOnExchange(exchange);
138 callback.done(data.sync);
139 }
140 });
141
142 restoreExceptionOnExchange(exchange);
143 logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor);
144 return sync;
145 }
146
147 if (data.redeliveryCounter > 0) {
148 // Figure out how long we should wait to resend this message.
149 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
150 }
151
152 exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, exchange.getException());
153 exchange.setException(null);
154
155 boolean sync = outputAsync.process(exchange, new AsyncCallback() {
156 public void done(boolean sync) {
157 // Only handle the async case...
158 if (sync) {
159 return;
160 }
161 data.sync = false;
162 if (exchange.getException() != null) {
163 process(exchange, callback, data);
164 } else {
165 callback.done(sync);
166 }
167 }
168 });
169 if (!sync) {
170 // It is going to be processed async..
171 return false;
172 }
173 if (exchange.getException() == null || isFailureHandled(exchange)) {
174 // If everything went well.. then we exit here..
175 callback.done(true);
176 return true;
177 }
178 // error occurred so loop back around.....
179 }
180
181 }
182
183 public static boolean isFailureHandled(Exchange exchange) {
184 return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
185 }
186
187 public static void setFailureHandled(Exchange exchange, boolean isHandled) {
188 if (isHandled) {
189 exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
190 exchange.setException(null);
191 } else {
192 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
193 exchange.removeProperty(FAILURE_HANDLED_PROPERTY);
194 }
195
196 }
197
198 public static void restoreExceptionOnExchange(Exchange exchange) {
199 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
200 }
201
202 public void process(Exchange exchange) throws Exception {
203 AsyncProcessorHelper.process(this, exchange);
204 }
205
206 // Properties
207 // -------------------------------------------------------------------------
208
209 /**
210 * Returns the output processor
211 */
212 public Processor getOutput() {
213 return output;
214 }
215
216 /**
217 * Returns the dead letter that message exchanges will be sent to if the
218 * redelivery attempts fail
219 */
220 public Processor getDeadLetter() {
221 return deadLetter;
222 }
223
224 public RedeliveryPolicy getRedeliveryPolicy() {
225 return redeliveryPolicy;
226 }
227
228 /**
229 * Sets the redelivery policy
230 */
231 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
232 this.redeliveryPolicy = redeliveryPolicy;
233 }
234
235 public Logger getLogger() {
236 return logger;
237 }
238
239 /**
240 * Sets the logger strategy; which {@link Log} to use and which
241 * {@link LoggingLevel} to use
242 */
243 public void setLogger(Logger logger) {
244 this.logger = logger;
245 }
246
247 // Implementation methods
248 // -------------------------------------------------------------------------
249
250 /**
251 * Increments the redelivery counter and adds the redelivered flag if the
252 * message has been redelivered
253 */
254 protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
255 Message in = exchange.getIn();
256 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
257 int next = 1;
258 if (counter != null) {
259 next = counter + 1;
260 }
261 in.setHeader(REDELIVERY_COUNTER, next);
262 in.setHeader(REDELIVERED, Boolean.TRUE);
263 exchange.setException(e);
264 return next;
265 }
266
267 @Override
268 protected void doStart() throws Exception {
269 ServiceHelper.startServices(output, deadLetter);
270 }
271
272 @Override
273 protected void doStop() throws Exception {
274 ServiceHelper.stopServices(deadLetter, output);
275 }
276
277 }