001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.concurrent.RejectedExecutionException;
020
021 import org.apache.camel.Exchange;
022 import org.apache.camel.LoggingLevel;
023 import org.apache.camel.Message;
024 import org.apache.camel.Predicate;
025 import org.apache.camel.Processor;
026 import org.apache.camel.model.OnExceptionDefinition;
027 import org.apache.camel.util.ExchangeHelper;
028 import org.apache.camel.util.MessageHelper;
029 import org.apache.camel.util.ServiceHelper;
030
031 /**
032 * Base redeliverable error handler that also supports a final dead letter queue in case
033 * all redelivery attempts fail.
034 * <p/>
035 * This implementation should contain all the error handling logic and the sub classes
036 * should only configure it according to what they support.
037 *
038 * @version $Revision: 792966 $
039 */
040 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements Processor {
041
042 protected final Processor deadLetter;
043 protected final String deadLetterUri;
044 protected final Processor output;
045 protected final Processor redeliveryProcessor;
046 protected final RedeliveryPolicy redeliveryPolicy;
047 protected final Predicate handledPolicy;
048 protected final Logger logger;
049 protected final boolean useOriginalMessagePolicy;
050
051 protected class RedeliveryData {
052 int redeliveryCounter;
053 long redeliveryDelay;
054 Predicate retryUntilPredicate;
055
056 // default behavior which can be overloaded on a per exception basis
057 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
058 Processor deadLetterProcessor = deadLetter;
059 Processor failureProcessor;
060 Processor onRedeliveryProcessor = redeliveryProcessor;
061 Predicate handledPredicate = handledPolicy;
062 boolean useOriginalInMessage = useOriginalMessagePolicy;
063 }
064
/**
 * Creates the error handler and stores the given collaborators and defaults.
 *
 * @param output                   processor the exchange is sent to (also on redelivery)
 * @param logger                   logger used for reporting failed deliveries, may be <tt>null</tt>
 * @param redeliveryProcessor      default processor invoked before a redelivery, may be <tt>null</tt>
 * @param redeliveryPolicy         default redelivery policy
 * @param handledPolicy            default predicate deciding whether a failure is handled, may be <tt>null</tt>
 * @param deadLetter               default failure processor, may be <tt>null</tt>
 * @param deadLetterUri            uri of the dead letter
 * @param useOriginalMessagePolicy default for whether the original IN message is passed to the failure processor
 */
public RedeliveryErrorHandler(Processor output, Logger logger, Processor redeliveryProcessor,
                              RedeliveryPolicy redeliveryPolicy, Predicate handledPolicy, Processor deadLetter,
                              String deadLetterUri, boolean useOriginalMessagePolicy) {
    // where to send and how to report
    this.output = output;
    this.logger = logger;

    // redelivery behavior
    this.redeliveryPolicy = redeliveryPolicy;
    this.redeliveryProcessor = redeliveryProcessor;

    // failure behavior
    this.handledPolicy = handledPolicy;
    this.deadLetter = deadLetter;
    this.deadLetterUri = deadLetterUri;
    this.useOriginalMessagePolicy = useOriginalMessagePolicy;
}
077
078 public boolean supportTransacted() {
079 return false;
080 }
081
082 public void process(Exchange exchange) throws Exception {
083 if (output == null) {
084 // no output then just return
085 return;
086 }
087
088 processErrorHandler(exchange, new RedeliveryData());
089 }
090
091 /**
092 * Processes the exchange decorated with this dead letter channel.
093 */
094 protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) throws Exception {
095 while (true) {
096 // we can't keep retrying if the route is being shutdown.
097 if (!isRunAllowed()) {
098 if (log.isDebugEnabled()) {
099 log.debug("Rejected execution as we are not started for exchange: " + exchange);
100 }
101 if (exchange.getException() == null) {
102 exchange.setException(new RejectedExecutionException());
103 return;
104 }
105 }
106
107 // do not handle transacted exchanges that failed as this error handler does not support it
108 if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) {
109 if (log.isTraceEnabled()) {
110 log.trace("This error handler does not support transacted exchanges."
111 + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
112 }
113 return;
114 }
115
116 // did previous processing cause an exception?
117 boolean handle = shouldHandleException(exchange);
118 if (handle) {
119 handleException(exchange, data);
120 }
121
122 // compute if we should redeliver or not
123 boolean shouldRedeliver = shouldRedeliver(exchange, data);
124 if (!shouldRedeliver) {
125 // no we should not redeliver to the same output so either try an onException (if any given)
126 // or the dead letter queue
127 Processor target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
128 // deliver to the failure processor (either an on exception or dead letter queue
129 deliverToFailureProcessor(target, exchange, data);
130 // prepare the exchange for failure before returning
131 prepareExchangeAfterFailure(exchange, data);
132 // and then return
133 return;
134 }
135
136 // if we are redelivering then sleep before trying again
137 if (shouldRedeliver && data.redeliveryCounter > 0) {
138 prepareExchangeForRedelivery(exchange);
139
140 // wait until we should redeliver
141 try {
142 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
143 } catch (InterruptedException e) {
144 log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
145 // continue from top
146 continue;
147 }
148
149 // letting onRedeliver be executed
150 deliverToRedeliveryProcessor(exchange, data);
151 }
152
153 // process the exchange (also redelivery)
154 try {
155 processExchange(exchange);
156 } catch (Exception e) {
157 exchange.setException(e);
158 }
159
160 boolean done = isDone(exchange);
161 if (done) {
162 return;
163 }
164 // error occurred so loop back around.....
165 }
166
167 }
168
169 /**
170 * Strategy whether the exchange has an exception that we should try to handle.
171 * <p/>
172 * Standard implementations should just look for an exception.
173 */
174 protected boolean shouldHandleException(Exchange exchange) {
175 return exchange.getException() != null;
176 }
177
178 /**
179 * Strategy to process the given exchange to the destinated output.
180 * <p/>
181 * This happens when the exchange is processed the first time and also for redeliveries
182 * to the same destination.
183 */
184 protected void processExchange(Exchange exchange) throws Exception {
185 // process the exchange (also redelivery)
186 output.process(exchange);
187 }
188
189 /**
190 * Strategy to determine if the exchange is done so we can continue
191 */
192 protected boolean isDone(Exchange exchange) throws Exception {
193 // only done if the exchange hasn't failed
194 // and it has not been handled by the failure processor
195 return exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
196 }
197
198 /**
199 * Returns the output processor
200 */
201 public Processor getOutput() {
202 return output;
203 }
204
205 /**
206 * Returns the dead letter that message exchanges will be sent to if the
207 * redelivery attempts fail
208 */
209 public Processor getDeadLetter() {
210 return deadLetter;
211 }
212
213 public RedeliveryPolicy getRedeliveryPolicy() {
214 return redeliveryPolicy;
215 }
216
217 public Logger getLogger() {
218 return logger;
219 }
220
221 protected void prepareExchangeForRedelivery(Exchange exchange) {
222 // okay we will give it another go so clear the exception so we can try again
223 if (exchange.getException() != null) {
224 exchange.setException(null);
225 }
226
227 // clear rollback flags
228 exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
229
230 // reset cached streams so they can be read again
231 MessageHelper.resetStreamCache(exchange.getIn());
232 }
233
234 protected void handleException(Exchange exchange, RedeliveryData data) {
235 Throwable e = exchange.getException();
236
237 // store the original caused exception in a property, so we can restore it later
238 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
239
240 // find the error handler to use (if any)
241 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
242 if (exceptionPolicy != null) {
243 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
244 data.handledPredicate = exceptionPolicy.getHandledPolicy();
245 data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
246 data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy();
247
248 // route specific failure handler?
249 Processor processor = exceptionPolicy.getErrorHandler();
250 if (processor != null) {
251 data.failureProcessor = processor;
252 }
253 // route specific on redelivey?
254 processor = exceptionPolicy.getOnRedelivery();
255 if (processor != null) {
256 data.onRedeliveryProcessor = processor;
257 }
258 }
259
260 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
261 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
262 logFailedDelivery(true, exchange, msg, data, e);
263
264 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
265 }
266
267 /**
268 * Gives an optional configure redelivery processor a chance to process before the Exchange
269 * will be redelivered. This can be used to alter the Exchange.
270 */
271 protected void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
272 if (data.onRedeliveryProcessor == null) {
273 return;
274 }
275
276 if (log.isTraceEnabled()) {
277 log.trace("Redelivery processor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange
278 + " before its redelivered");
279 }
280
281 try {
282 data.onRedeliveryProcessor.process(exchange);
283 } catch (Exception e) {
284 exchange.setException(e);
285 }
286 log.trace("Redelivery processor done");
287 }
288
289 /**
290 * All redelivery attempts failed so move the exchange to the dead letter queue
291 */
292 protected void deliverToFailureProcessor(final Processor processor, final Exchange exchange,
293 final RedeliveryData data) {
294 // we did not success with the redelivery so now we let the failure processor handle it
295 // clear exception as we let the failure processor handle it
296 exchange.setException(null);
297
298 if (data.handledPredicate != null && data.handledPredicate.matches(exchange)) {
299 // its handled then remove traces of redelivery attempted
300 exchange.getIn().removeHeader(Exchange.REDELIVERED);
301 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
302 } else {
303 // must decrement the redelivery counter as we didn't process the redelivery but is
304 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
305 decrementRedeliveryCounter(exchange);
306 }
307
308 // reset cached streams so they can be read again
309 MessageHelper.resetStreamCache(exchange.getIn());
310
311 if (processor != null) {
312 // prepare original IN body if it should be moved instead of current body
313 if (data.useOriginalInMessage) {
314 if (log.isTraceEnabled()) {
315 log.trace("Using the original IN message instead of current");
316 }
317
318 Message original = exchange.getUnitOfWork().getOriginalInMessage();
319 exchange.setIn(original);
320 }
321
322 if (log.isTraceEnabled()) {
323 log.trace("Failure processor " + processor + " is processing Exchange: " + exchange);
324 }
325 try {
326 processor.process(exchange);
327 } catch (Exception e) {
328 exchange.setException(e);
329 }
330 log.trace("Failure processor done");
331
332 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
333 + ". Processed by failure processor: " + processor;
334 logFailedDelivery(false, exchange, msg, data, null);
335 }
336 }
337
338 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data) {
339 // we could not process the exchange so we let the failure processor handled it
340 ExchangeHelper.setFailureHandled(exchange);
341
342 // honor if already set a handling
343 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
344 if (alreadySet) {
345 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
346 if (log.isDebugEnabled()) {
347 log.debug("This exchange has already been marked for handling: " + handled);
348 }
349 if (handled) {
350 exchange.setException(null);
351 } else {
352 // exception not handled, put exception back in the exchange
353 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
354 }
355 return;
356 }
357
358 Predicate handledPredicate = data.handledPredicate;
359 if (handledPredicate == null || !handledPredicate.matches(exchange)) {
360 if (log.isDebugEnabled()) {
361 log.debug("This exchange is not handled so its marked as failed: " + exchange);
362 }
363 // exception not handled, put exception back in the exchange
364 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
365 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
366 } else {
367 if (log.isDebugEnabled()) {
368 log.debug("This exchange is handled so its marked as not failed: " + exchange);
369 }
370 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
371 }
372 }
373
374 private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
375 if (logger == null) {
376 return;
377 }
378
379 LoggingLevel newLogLevel;
380 if (shouldRedeliver) {
381 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
382 } else {
383 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
384 }
385 if (exchange.isRollbackOnly()) {
386 String msg = "Rollback exchange";
387 if (exchange.getException() != null) {
388 msg = msg + " due: " + exchange.getException().getMessage();
389 }
390 if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) {
391 // log intented rollback on maximum WARN level (no ERROR or FATAL)
392 logger.log(msg, LoggingLevel.WARN);
393 } else {
394 // otherwise use the desired logging level
395 logger.log(msg, newLogLevel);
396 }
397 } else if (data.currentRedeliveryPolicy.isLogStackTrace() && e != null) {
398 logger.log(message, e, newLogLevel);
399 } else {
400 logger.log(message, newLogLevel);
401 }
402 }
403
404 private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
405 // if marked as rollback only then do not redeliver
406 Boolean rollback = exchange.getProperty(Exchange.ROLLBACK_ONLY, Boolean.class);
407 if (rollback != null && rollback) {
408 if (log.isTraceEnabled()) {
409 log.trace("This exchange is marked as rollback only, should not be redelivered: " + exchange);
410 }
411 return false;
412 }
413 return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate);
414 }
415
416 /**
417 * Increments the redelivery counter and adds the redelivered flag if the
418 * message has been redelivered
419 */
420 private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
421 Message in = exchange.getIn();
422 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
423 int next = 1;
424 if (counter != null) {
425 next = counter + 1;
426 }
427 in.setHeader(Exchange.REDELIVERY_COUNTER, next);
428 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
429 return next;
430 }
431
432 /**
433 * Prepares the redelivery counter and boolean flag for the failure handle processor
434 */
435 private void decrementRedeliveryCounter(Exchange exchange) {
436 Message in = exchange.getIn();
437 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
438 if (counter != null) {
439 int prev = counter - 1;
440 in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
441 // set boolean flag according to counter
442 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
443 } else {
444 // not redelivered
445 in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
446 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
447 }
448 }
449
450 @Override
451 protected void doStart() throws Exception {
452 ServiceHelper.startServices(output, deadLetter);
453 }
454
455 @Override
456 protected void doStop() throws Exception {
457 ServiceHelper.stopServices(deadLetter, output);
458 }
459
460 }