001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.Timer;
020 import java.util.TimerTask;
021 import java.util.concurrent.RejectedExecutionException;
022
023 import org.apache.camel.AsyncCallback;
024 import org.apache.camel.AsyncProcessor;
025 import org.apache.camel.Exchange;
026 import org.apache.camel.Message;
027 import org.apache.camel.Predicate;
028 import org.apache.camel.Processor;
029 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
030 import org.apache.camel.model.ExceptionType;
031 import org.apache.camel.model.LoggingLevel;
032 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
033 import org.apache.camel.util.AsyncProcessorHelper;
034 import org.apache.camel.util.MessageHelper;
035 import org.apache.camel.util.ServiceHelper;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 /**
040 * Implements a <a
041 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
042 * Channel</a> after attempting to redeliver the message using the
043 * {@link RedeliveryPolicy}
044 *
045 * @version $Revision: 761191 $
046 */
047 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
048 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
049 public static final String REDELIVERED = "org.apache.camel.Redelivered";
050 public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";
051 public static final String CAUGHT_EXCEPTION_HEADER = "org.apache.camel.CamelCaughtException";
052
053 private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
054 private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
055
056 private static Timer timer = new Timer("Camel DeadLetterChannel Redeliver Timer", true);
057 private Processor output;
058 private Processor deadLetter;
059 private AsyncProcessor outputAsync;
060 private RedeliveryPolicy redeliveryPolicy;
061 private Logger logger;
062 private Processor redeliveryProcessor;
063
064 private class RedeliveryData {
065 int redeliveryCounter;
066 long redeliveryDelay;
067 boolean sync = true;
068 Predicate handledPredicate;
069
070 // default behavior which can be overloaded on a per exception basis
071 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
072 Processor failureProcessor = deadLetter;
073 }
074
075 private class RedeliverTimerTask extends TimerTask {
076 private final Exchange exchange;
077 private final AsyncCallback callback;
078 private final RedeliveryData data;
079
080 public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
081 this.exchange = exchange;
082 this.callback = callback;
083 this.data = data;
084 }
085
086 @Override
087 public void run() {
088 //only handle the real AsyncProcess the exchange
089 outputAsync.process(exchange, new AsyncCallback() {
090 public void done(boolean sync) {
091 // Only handle the async case...
092 if (sync) {
093 return;
094 }
095 data.sync = false;
096 // only process if the exchange hasn't failed
097 // and it has not been handled by the error processor
098 if (exchange.getException() != null && !isFailureHandled(exchange)) {
099 // if we are redelivering then sleep before trying again
100 asyncProcess(exchange, callback, data);
101 } else {
102 callback.done(sync);
103 }
104 }
105 });
106 }
107 }
108
109 public DeadLetterChannel(Processor output, Processor deadLetter, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
110 this.output = output;
111 this.deadLetter = deadLetter;
112 this.redeliveryProcessor = redeliveryProcessor;
113 this.outputAsync = AsyncProcessorTypeConverter.convert(output);
114 this.redeliveryPolicy = redeliveryPolicy;
115 this.logger = logger;
116 setExceptionPolicy(exceptionPolicyStrategy);
117 }
118
119 public static <E extends Exchange> Logger createDefaultLogger() {
120 return new Logger(LOG, LoggingLevel.ERROR);
121 }
122
123 @Override
124 public String toString() {
125 return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
126 }
127
128 public void process(Exchange exchange) throws Exception {
129 AsyncProcessorHelper.process(this, exchange);
130 }
131
132 public boolean process(Exchange exchange, final AsyncCallback callback) {
133 return process(exchange, callback, new RedeliveryData());
134 }
135
136 /**
137 * Processes the exchange using decorated with this dead letter channel.
138 */
139 protected boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
140
141 while (true) {
142 // we can't keep retrying if the route is being shutdown.
143 if (!isRunAllowed()) {
144 if (exchange.getException() == null) {
145 exchange.setException(new RejectedExecutionException());
146 }
147 callback.done(data.sync);
148 return data.sync;
149 }
150
151 // if the exchange is transacted then let the underlying system handle the redelivery etc.
152 // this DeadLetterChannel is only for non transacted exchanges
153 if (exchange.isTransacted() && exchange.getException() != null) {
154 if (LOG.isDebugEnabled()) {
155 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
156 }
157 return data.sync;
158 }
159
160 // did previous processing caused an exception?
161 if (exchange.getException() != null) {
162 handleException(exchange, data);
163 }
164
165 // compute if we should redeliver or not
166 boolean shouldRedeliver = shouldRedeliver(exchange, data);
167 if (!shouldRedeliver) {
168 return deliverToFaultProcessor(exchange, callback, data);
169 }
170
171 // if we are redelivering then sleep before trying again
172 if (data.redeliveryCounter > 0) {
173 // okay we will give it another go so clear the exception so we can try again
174 if (exchange.getException() != null) {
175 exchange.setException(null);
176 }
177
178 // reset cached streams so they can be read again
179 MessageHelper.resetStreamCache(exchange.getIn());
180
181 // wait until we should redeliver
182 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
183
184 // letting onRedeliver be executed
185 deliverToRedeliveryProcessor(exchange, callback, data);
186 }
187
188 // process the exchange
189 boolean sync = outputAsync.process(exchange, new AsyncCallback() {
190 public void done(boolean sync) {
191 // Only handle the async case...
192 if (sync) {
193 return;
194 }
195 data.sync = false;
196 // only process if the exchange hasn't failed
197 // and it has not been handled by the error processor
198 if (exchange.getException() != null && !isFailureHandled(exchange)) {
199 //TODO Call the Timer for the asyncProcessor
200 asyncProcess(exchange, callback, data);
201 } else {
202 callback.done(sync);
203 }
204 }
205 });
206 if (!sync) {
207 // It is going to be processed async..
208 return false;
209 }
210 if (exchange.getException() == null || isFailureHandled(exchange)) {
211 // If everything went well.. then we exit here..
212 callback.done(true);
213 return true;
214 }
215 // error occurred so loop back around.....
216 }
217
218 }
219
220 protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
221 // set the timer here
222 if (!isRunAllowed()) {
223 if (exchange.getException() == null) {
224 exchange.setException(new RejectedExecutionException());
225 }
226 callback.done(data.sync);
227 return;
228 }
229
230 // if the exchange is transacted then let the underlying system handle the redelivery etc.
231 // this DeadLetterChannel is only for non transacted exchanges
232 if (exchange.isTransacted() && exchange.getException() != null) {
233 if (LOG.isDebugEnabled()) {
234 LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
235 }
236 return;
237 }
238
239 // did previous processing caused an exception?
240 if (exchange.getException() != null) {
241 handleException(exchange, data);
242 }
243
244 // compute if we should redeliver or not
245 boolean shouldRedeliver = shouldRedeliver(exchange, data);
246 if (!shouldRedeliver) {
247 deliverToFaultProcessor(exchange, callback, data);
248 return;
249 }
250
251 // process the next try
252 // if we are redelivering then sleep before trying again
253 if (data.redeliveryCounter > 0) {
254 // okay we will give it another go so clear the exception so we can try again
255 if (exchange.getException() != null) {
256 exchange.setException(null);
257 }
258 // wait until we should redeliver
259 data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
260 timer.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
261
262 // letting onRedeliver be executed
263 deliverToRedeliveryProcessor(exchange, callback, data);
264 }
265 }
266
267 private void handleException(Exchange exchange, RedeliveryData data) {
268 Throwable e = exchange.getException();
269 // set the original caused exception
270 exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
271
272 // find the error handler to use (if any)
273 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
274 if (exceptionPolicy != null) {
275 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
276 data.handledPredicate = exceptionPolicy.getHandledPolicy();
277 Processor processor = exceptionPolicy.getErrorHandler();
278 if (processor != null) {
279 data.failureProcessor = processor;
280 }
281 }
282
283 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
284 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
285 logFailedDelivery(true, exchange, msg, data, e);
286
287 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
288 }
289
290 /**
291 * Gives an optional configure redelivery processor a chance to process before the Exchange
292 * will be redelivered. This can be used to alter the Exchange.
293 */
294 private void deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
295 final RedeliveryData data) {
296 if (redeliveryProcessor == null) {
297 return;
298 }
299
300 if (LOG.isTraceEnabled()) {
301 LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered");
302 }
303
304 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(redeliveryProcessor);
305 afp.process(exchange, new AsyncCallback() {
306 public void done(boolean sync) {
307 LOG.trace("Redelivery processor done");
308 // do NOT call done on callback as this is the redelivery processor that
309 // is done. we should not mark the entire exchange as done.
310 }
311 });
312 }
313
314 private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
315 final RedeliveryData data) {
316 // we did not success with the redelivery so now we let the failure processor handle it
317 setFailureHandled(exchange);
318 // must decrement the redelivery counter as we didn't process the redelivery but is
319 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
320 decrementRedeliveryCounter(exchange);
321
322 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
323 boolean sync = afp.process(exchange, new AsyncCallback() {
324 public void done(boolean sync) {
325 restoreExceptionOnExchange(exchange, data.handledPredicate);
326 callback.done(data.sync);
327 }
328 });
329
330 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
331 + ". Handled by the failure processor: " + data.failureProcessor;
332 logFailedDelivery(false, exchange, msg, data, null);
333
334 return sync;
335 }
336
337 // Properties
338 // -------------------------------------------------------------------------
339
340 public static boolean isFailureHandled(Exchange exchange) {
341 return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
342 || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
343 }
344
345 public static void setFailureHandled(Exchange exchange) {
346 exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
347 exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, exchange.getException());
348 exchange.setException(null);
349 }
350
351 /**
352 * Returns the output processor
353 */
354 public Processor getOutput() {
355 return output;
356 }
357
358 /**
359 * Returns the dead letter that message exchanges will be sent to if the
360 * redelivery attempts fail
361 */
362 public Processor getDeadLetter() {
363 return deadLetter;
364 }
365
366 public RedeliveryPolicy getRedeliveryPolicy() {
367 return redeliveryPolicy;
368 }
369
370 /**
371 * Sets the redelivery policy
372 */
373 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
374 this.redeliveryPolicy = redeliveryPolicy;
375 }
376
377 public Logger getLogger() {
378 return logger;
379 }
380
381 /**
382 * Sets the logger strategy; which {@link Log} to use and which
383 * {@link LoggingLevel} to use
384 */
385 public void setLogger(Logger logger) {
386 this.logger = logger;
387 }
388
389 // Implementation methods
390 // -------------------------------------------------------------------------
391
392 protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
393 if (handledPredicate == null || !handledPredicate.matches(exchange)) {
394 if (LOG.isDebugEnabled()) {
395 LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
396 }
397 // exception not handled, put exception back in the exchange
398 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
399 } else {
400 if (LOG.isDebugEnabled()) {
401 LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
402 }
403 exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
404 }
405 }
406
407 private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
408 LoggingLevel newLogLevel;
409 if (shouldRedeliver) {
410 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
411 } else {
412 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
413 }
414 if (e != null) {
415 logger.log(message, e, newLogLevel);
416 } else {
417 logger.log(message, newLogLevel);
418 }
419 }
420
421 private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
422 return data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter);
423 }
424
425 /**
426 * Increments the redelivery counter and adds the redelivered flag if the
427 * message has been redelivered
428 */
429 protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
430 Message in = exchange.getIn();
431 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
432 int next = 1;
433 if (counter != null) {
434 next = counter + 1;
435 }
436 in.setHeader(REDELIVERY_COUNTER, next);
437 in.setHeader(REDELIVERED, Boolean.TRUE);
438 return next;
439 }
440
441 /**
442 * Prepares the redelivery counter and boolean flag for the failure handle processor
443 */
444 private void decrementRedeliveryCounter(Exchange exchange) {
445 Message in = exchange.getIn();
446 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
447 if (counter != null) {
448 int prev = counter - 1;
449 in.setHeader(REDELIVERY_COUNTER, prev);
450 // set boolean flag according to counter
451 in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
452 } else {
453 // not redelivered
454 in.setHeader(REDELIVERY_COUNTER, 0);
455 in.setHeader(REDELIVERED, Boolean.FALSE);
456 }
457 }
458
459 @Override
460 protected void doStart() throws Exception {
461 ServiceHelper.startServices(output, deadLetter);
462 }
463
464 @Override
465 protected void doStop() throws Exception {
466 ServiceHelper.stopServices(deadLetter, output);
467 }
468
469 }