001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.loadbalancer;
018
019 import java.util.List;
020
021 import org.apache.camel.Exchange;
022 import org.apache.camel.Processor;
023 import org.apache.camel.util.ObjectHelper;
024
025 /**
026 * This FailOverLoadBalancer will failover to use next processor when an exception occured
027 */
028 public class FailOverLoadBalancer extends LoadBalancerSupport {
029
030 private final List<Class> exceptions;
031
032 public FailOverLoadBalancer() {
033 this.exceptions = null;
034 }
035
036 public FailOverLoadBalancer(List<Class> exceptions) {
037 this.exceptions = exceptions;
038 for (Class type : exceptions) {
039 if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) {
040 throw new IllegalArgumentException("Class is not an instance of Trowable: " + type);
041 }
042 }
043 }
044
045 /**
046 * Should the given failed Exchange failover?
047 *
048 * @param exchange the exchange that failed
049 * @return <tt>true</tt> to failover
050 */
051 protected boolean shouldFailOver(Exchange exchange) {
052 if (exchange.getException() != null) {
053
054 if (exceptions == null || exceptions.isEmpty()) {
055 // always failover if no exceptions defined
056 return true;
057 }
058
059 for (Class exception : exceptions) {
060 // will look in exception hierarchy
061 if (exchange.getException(exception) != null) {
062 return true;
063 }
064 }
065 }
066
067 return false;
068 }
069
070 public void process(Exchange exchange) throws Exception {
071 List<Processor> list = getProcessors();
072 if (list.isEmpty()) {
073 throw new IllegalStateException("No processors available to process " + exchange);
074 }
075
076 int index = 0;
077 Processor processor = list.get(index);
078
079 // process the first time
080 processExchange(processor, exchange);
081
082 // loop while we should fail over
083 while (shouldFailOver(exchange)) {
084 index++;
085 if (index < list.size()) {
086 // try again but prepare exchange before we failover
087 prepareExchangeForFailover(exchange);
088 processor = list.get(index);
089 processExchange(processor, exchange);
090 } else {
091 // no more processors to try
092 break;
093 }
094 }
095 }
096
097 /**
098 * Prepares the exchange for failover
099 *
100 * @param exchange the exchange
101 */
102 protected void prepareExchangeForFailover(Exchange exchange) {
103 exchange.setException(null);
104
105 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null);
106 exchange.setProperty(Exchange.FAILURE_HANDLED, null);
107 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, null);
108 exchange.getIn().removeHeader(Exchange.REDELIVERED);
109 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
110 }
111
112 private void processExchange(Processor processor, Exchange exchange) {
113 if (processor == null) {
114 throw new IllegalStateException("No processors could be chosen to process " + exchange);
115 }
116 try {
117 processor.process(exchange);
118 } catch (Exception e) {
119 exchange.setException(e);
120 }
121 }
122
123 public String toString() {
124 return "FailoverLoadBalancer";
125 }
126
127 }