001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.io.Serializable;
020 import java.util.Random;
021
022 import org.apache.camel.Exchange;
023 import org.apache.camel.LoggingLevel;
024 import org.apache.camel.Predicate;
025 import org.apache.camel.util.ObjectHelper;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028
029 // Code taken from the ActiveMQ codebase
030
031 /**
032 * The policy used to decide how many times to redeliver and the time between
033 * the redeliveries before being sent to a <a
034 * href="http://camel.apache.org/dead-letter-channel.html">Dead Letter
035 * Channel</a>
036 * <p>
037 * The default values are:
038 * <ul>
039 * <li>maximumRedeliveries = 0</li>
040 * <li>redeliverDelay = 1000L (the initial delay)</li>
041 * <li>maximumRedeliveryDelay = 60 * 1000L</li>
042 * <li>backOffMultiplier = 2</li>
043 * <li>useExponentialBackOff = false</li>
044 * <li>collisionAvoidanceFactor = 0.15d</li>
045 * <li>useCollisionAvoidance = false</li>
046 * <li>retriesExhaustedLogLevel = LoggingLevel.DEBUG</li>
047 * <li>retryAttemptedLogLevel = LoggingLevel.DEBUG</li>
048 * <li>logStrackTrace = false</li>
049 * </ul>
050 * <p/>
051 * Setting the maximumRedeliveries to a negative value such as -1 will then always redeliver (unlimited).
052 * Setting the maximumRedeliveries to 0 will disable redelivery.
053 * <p/>
054 * This policy can be configured either by one of the following two settings:
055 * <ul>
056 * <li>using convnetional options, using all the options defined above</li>
057 * <li>using delay pattern to declare intervals for delays</li>
058 * </ul>
059 * <p/>
060 * <b>Note:</b> If using delay patterns then the following options is not used (delay, backOffMultiplier, useExponentialBackOff, useCollisionAvoidance)
061 * <p/>
062 * <b>Using delay pattern</b>:
063 * <br/>The delay pattern syntax is: <tt>limit:delay;limit 2:delay 2;limit 3:delay 3;...;limit N:delay N</tt>.
064 * <p/>
065 * How it works is best illustrate with an example with this pattern: <tt>delayPattern=5:1000;10:5000:20:20000</tt>
066 * <br/>The delays will be for attempt in range 0..4 = 0 millis, 5..9 = 1000 millis, 10..19 = 5000 millis, >= 20 = 20000 millis.
067 * <p/>
068 * If you want to set a starting delay, then use 0 as the first limit, eg: <tt>0:1000;5:5000</tt> will use 1 sec delay
069 * until attempt number 5 where it will use 5 seconds going forward.
070 *
071 * @version $Revision: 791760 $
072 */
073 public class RedeliveryPolicy implements Cloneable, Serializable {
074 protected static transient Random randomNumberGenerator;
075 private static final transient Log LOG = LogFactory.getLog(RedeliveryPolicy.class);
076
077 protected long redeliverDelay = 1000L;
078 protected int maximumRedeliveries;
079 protected long maximumRedeliveryDelay = 60 * 1000L;
080 protected double backOffMultiplier = 2;
081 protected boolean useExponentialBackOff;
082 // +/-15% for a 30% spread -cgs
083 protected double collisionAvoidanceFactor = 0.15d;
084 protected boolean useCollisionAvoidance;
085 protected LoggingLevel retriesExhaustedLogLevel = LoggingLevel.DEBUG;
086 protected LoggingLevel retryAttemptedLogLevel = LoggingLevel.DEBUG;
087 protected boolean logStackTrace;
088 protected String delayPattern;
089
090 public RedeliveryPolicy() {
091 }
092
093 @Override
094 public String toString() {
095 return "RedeliveryPolicy[maximumRedeliveries=" + maximumRedeliveries
096 + ", redeliverDelay=" + redeliverDelay
097 + ", maximumRedeliveryDelay=" + maximumRedeliveryDelay
098 + ", retriesExhaustedLogLevel=" + retriesExhaustedLogLevel
099 + ", retryAttemptedLogLevel=" + retryAttemptedLogLevel
100 + ", logTraceStace=" + logStackTrace
101 + ", useExponentialBackOff=" + useExponentialBackOff
102 + ", backOffMultiplier=" + backOffMultiplier
103 + ", useCollisionAvoidance=" + useCollisionAvoidance
104 + ", collisionAvoidanceFactor=" + collisionAvoidanceFactor
105 + ", delayPattern=" + delayPattern + "]";
106 }
107
108 public RedeliveryPolicy copy() {
109 try {
110 return (RedeliveryPolicy)clone();
111 } catch (CloneNotSupportedException e) {
112 throw new RuntimeException("Could not clone: " + e, e);
113 }
114 }
115
116 /**
117 * Returns true if the policy decides that the message exchange should be
118 * redelivered.
119 *
120 * @param exchange the current exchange
121 * @param redeliveryCounter the current retry counter
122 * @param retryUntil an optional predicate to determine if we should redeliver or not
123 * @return true to redeliver, false to stop
124 */
125 public boolean shouldRedeliver(Exchange exchange, int redeliveryCounter, Predicate retryUntil) {
126 // predicate is always used if provided
127 if (retryUntil != null) {
128 return retryUntil.matches(exchange);
129 }
130
131 if (getMaximumRedeliveries() < 0) {
132 // retry forever if negative value
133 return true;
134 }
135 // redeliver until we hit the max
136 return redeliveryCounter <= getMaximumRedeliveries();
137 }
138
139
140 /**
141 * Calculates the new redelivery delay based on the last one then sleeps for the necessary amount of time
142 *
143 * @param redeliveryDelay previous redelivery delay
144 * @param redeliveryCounter number of previous redelivery attempts
145 * @return the calculate delay
146 * @throws InterruptedException is thrown if the sleep is interruped likely because of shutdown
147 */
148 public long sleep(long redeliveryDelay, int redeliveryCounter) throws InterruptedException {
149 redeliveryDelay = calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
150
151 if (redeliveryDelay > 0) {
152 if (LOG.isDebugEnabled()) {
153 LOG.debug("Sleeping for: " + redeliveryDelay + " millis until attempting redelivery");
154 }
155 Thread.sleep(redeliveryDelay);
156 }
157 return redeliveryDelay;
158 }
159
160 protected long calculateRedeliveryDelay(long previousDelay, int redeliveryCounter) {
161 if (ObjectHelper.isNotEmpty(delayPattern)) {
162 // calculate delay using the pattern
163 return calculateRedeliverDelayUsingPattern(delayPattern, redeliveryCounter);
164 }
165
166 // calculate the delay using the conventional parameters
167 long redeliveryDelay;
168 if (previousDelay == 0) {
169 redeliveryDelay = redeliverDelay;
170 } else if (useExponentialBackOff && backOffMultiplier > 1) {
171 redeliveryDelay = Math.round(backOffMultiplier * previousDelay);
172 } else {
173 redeliveryDelay = previousDelay;
174 }
175
176 if (useCollisionAvoidance) {
177
178 /*
179 * First random determines +/-, second random determines how far to
180 * go in that direction. -cgs
181 */
182 Random random = getRandomNumberGenerator();
183 double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor)
184 * random.nextDouble();
185 redeliveryDelay += redeliveryDelay * variance;
186 }
187
188 if (maximumRedeliveryDelay > 0 && redeliveryDelay > maximumRedeliveryDelay) {
189 redeliveryDelay = maximumRedeliveryDelay;
190 }
191
192 return redeliveryDelay;
193 }
194
195 /**
196 * Calculates the delay using the delay pattern
197 */
198 protected static long calculateRedeliverDelayUsingPattern(String delayPattern, int redeliveryCounter) {
199 String[] groups = delayPattern.split(";");
200 // find the group where ther redelivery counter matches
201 long answer = 0;
202 for (String group : groups) {
203 long delay = Long.valueOf(ObjectHelper.after(group, ":"));
204 int count = Integer.valueOf(ObjectHelper.before(group, ":"));
205 if (count > redeliveryCounter) {
206 break;
207 } else {
208 answer = delay;
209 }
210 }
211
212 return answer;
213 }
214
215
216 // Builder methods
217 // -------------------------------------------------------------------------
218
219 /**
220 * Sets the delay in milliseconds
221 */
222 public RedeliveryPolicy redeliverDelay(long delay) {
223 setRedeliverDelay(delay);
224 return this;
225 }
226
227 /**
228 * Sets the maximum number of times a message exchange will be redelivered
229 */
230 public RedeliveryPolicy maximumRedeliveries(int maximumRedeliveries) {
231 setMaximumRedeliveries(maximumRedeliveries);
232 return this;
233 }
234
235 /**
236 * Enables collision avoidance which adds some randomization to the backoff
237 * timings to reduce contention probability
238 */
239 public RedeliveryPolicy useCollisionAvoidance() {
240 setUseCollisionAvoidance(true);
241 return this;
242 }
243
244 /**
245 * Enables exponential backoff using the {@link #getBackOffMultiplier()} to
246 * increase the time between retries
247 */
248 public RedeliveryPolicy useExponentialBackOff() {
249 setUseExponentialBackOff(true);
250 return this;
251 }
252
253 /**
254 * Enables exponential backoff and sets the multiplier used to increase the
255 * delay between redeliveries
256 */
257 public RedeliveryPolicy backOffMultiplier(double multiplier) {
258 useExponentialBackOff();
259 setBackOffMultiplier(multiplier);
260 return this;
261 }
262
263 /**
264 * Enables collision avoidance and sets the percentage used
265 */
266 public RedeliveryPolicy collisionAvoidancePercent(double collisionAvoidancePercent) {
267 useCollisionAvoidance();
268 setCollisionAvoidancePercent(collisionAvoidancePercent);
269 return this;
270 }
271
272 /**
273 * Sets the maximum redelivery delay if using exponential back off.
274 * Use -1 if you wish to have no maximum
275 */
276 public RedeliveryPolicy maximumRedeliveryDelay(long maximumRedeliveryDelay) {
277 setMaximumRedeliveryDelay(maximumRedeliveryDelay);
278 return this;
279 }
280
281 /**
282 * Sets the logging level to use for log messages when retries have been exhausted.
283 */
284 public RedeliveryPolicy retriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
285 setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
286 return this;
287 }
288
289 /**
290 * Sets the logging level to use for log messages when retries are attempted.
291 */
292 public RedeliveryPolicy retryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
293 setRetryAttemptedLogLevel(retryAttemptedLogLevel);
294 return this;
295 }
296
297 /**
298 * Sets the logging level to use for log messages when retries are attempted.
299 */
300 public RedeliveryPolicy logStackTrace(boolean logStackTrace) {
301 setLogStackTrace(logStackTrace);
302 return this;
303 }
304
305 /**
306 * Sets the delay pattern with delay intervals.
307 */
308 public RedeliveryPolicy delayPattern(String delayPattern) {
309 setDelayPattern(delayPattern);
310 return this;
311 }
312
313 /**
314 * Disables redelivery by setting maximum redeliveries to 0.
315 */
316 public RedeliveryPolicy disableRedelivery() {
317 setMaximumRedeliveries(0);
318 return this;
319 }
320
321 // Properties
322 // -------------------------------------------------------------------------
323
324 public long getRedeliverDelay() {
325 return redeliverDelay;
326 }
327
328 /**
329 * Sets the delay in milliseconds
330 */
331 public void setRedeliverDelay(long redeliverDelay) {
332 this.redeliverDelay = redeliverDelay;
333 }
334
335 public double getBackOffMultiplier() {
336 return backOffMultiplier;
337 }
338
339 /**
340 * Sets the multiplier used to increase the delay between redeliveries if
341 * {@link #setUseExponentialBackOff(boolean)} is enabled
342 */
343 public void setBackOffMultiplier(double backOffMultiplier) {
344 this.backOffMultiplier = backOffMultiplier;
345 }
346
347 public short getCollisionAvoidancePercent() {
348 return (short)Math.round(collisionAvoidanceFactor * 100);
349 }
350
351 /**
352 * Sets the percentage used for collision avoidance if enabled via
353 * {@link #setUseCollisionAvoidance(boolean)}
354 */
355 public void setCollisionAvoidancePercent(double collisionAvoidancePercent) {
356 this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
357 }
358
359 public double getCollisionAvoidanceFactor() {
360 return collisionAvoidanceFactor;
361 }
362
363 /**
364 * Sets the factor used for collision avoidance if enabled via
365 * {@link #setUseCollisionAvoidance(boolean)}
366 */
367 public void setCollisionAvoidanceFactor(double collisionAvoidanceFactor) {
368 this.collisionAvoidanceFactor = collisionAvoidanceFactor;
369 }
370
371 public int getMaximumRedeliveries() {
372 return maximumRedeliveries;
373 }
374
375 /**
376 * Sets the maximum number of times a message exchange will be redelivered.
377 * Setting a negative value will retry forever.
378 */
379 public void setMaximumRedeliveries(int maximumRedeliveries) {
380 this.maximumRedeliveries = maximumRedeliveries;
381 }
382
383 public long getMaximumRedeliveryDelay() {
384 return maximumRedeliveryDelay;
385 }
386
387 /**
388 * Sets the maximum redelivery delay if using exponential back off.
389 * Use -1 if you wish to have no maximum
390 */
391 public void setMaximumRedeliveryDelay(long maximumRedeliveryDelay) {
392 this.maximumRedeliveryDelay = maximumRedeliveryDelay;
393 }
394
395 public boolean isUseCollisionAvoidance() {
396 return useCollisionAvoidance;
397 }
398
399 /**
400 * Enables/disables collision avoidance which adds some randomization to the
401 * backoff timings to reduce contention probability
402 */
403 public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
404 this.useCollisionAvoidance = useCollisionAvoidance;
405 }
406
407 public boolean isUseExponentialBackOff() {
408 return useExponentialBackOff;
409 }
410
411 /**
412 * Enables/disables exponential backoff using the
413 * {@link #getBackOffMultiplier()} to increase the time between retries
414 */
415 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
416 this.useExponentialBackOff = useExponentialBackOff;
417 }
418
419 protected static synchronized Random getRandomNumberGenerator() {
420 if (randomNumberGenerator == null) {
421 randomNumberGenerator = new Random();
422 }
423 return randomNumberGenerator;
424 }
425
426 /**
427 * Sets the logging level to use for log messages when retries have been exhausted.
428 */
429 public void setRetriesExhaustedLogLevel(LoggingLevel retriesExhaustedLogLevel) {
430 this.retriesExhaustedLogLevel = retriesExhaustedLogLevel;
431 }
432
433 public LoggingLevel getRetriesExhaustedLogLevel() {
434 return retriesExhaustedLogLevel;
435 }
436
437 /**
438 * Sets the logging level to use for log messages when retries are attempted.
439 */
440 public void setRetryAttemptedLogLevel(LoggingLevel retryAttemptedLogLevel) {
441 this.retryAttemptedLogLevel = retryAttemptedLogLevel;
442 }
443
444 public LoggingLevel getRetryAttemptedLogLevel() {
445 return retryAttemptedLogLevel;
446 }
447
448 public String getDelayPattern() {
449 return delayPattern;
450 }
451
452 /**
453 * Sets an optional delay pattern to use insted of fixed delay.
454 */
455 public void setDelayPattern(String delayPattern) {
456 this.delayPattern = delayPattern;
457 }
458
459 public boolean isLogStackTrace() {
460 return logStackTrace;
461 }
462
463 /**
464 * Sets wheter stack traces should be logged or not
465 */
466 public void setLogStackTrace(boolean logStackTrace) {
467 this.logStackTrace = logStackTrace;
468 }
469 }