001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.ArrayList;
020 import java.util.List;
021
022 import org.apache.camel.Channel;
023 import org.apache.camel.Exchange;
024 import org.apache.camel.Processor;
025 import org.apache.camel.impl.ServiceSupport;
026 import org.apache.camel.model.ProcessorDefinition;
027 import org.apache.camel.spi.InterceptStrategy;
028 import org.apache.camel.spi.RouteContext;
029 import org.apache.camel.util.ServiceHelper;
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032
033 /**
034 * DefaultChannel is the default {@link Channel}.
035 * <p/>
036 * The current implementation is just a composite containing the interceptors and error handler
037 * that beforehand was added to the route graph directly.
038 * <br/>
039 * With this {@link Channel} we can in the future implement better strategies for routing the
040 * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
041 * in the graph.
042 *
043 * @version $Revision: 784101 $
044 */
045 public class DefaultChannel extends ServiceSupport implements Processor, Channel {
046
047 private static final transient Log LOG = LogFactory.getLog(DefaultChannel.class);
048
049 private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
050 private Processor errorHandler;
051 // the next processor (non wrapped)
052 private Processor nextProcessor;
053 // the real output to invoke that has been wrapped
054 private Processor output;
055 private ProcessorDefinition definition;
056
057 public List<Processor> next() {
058 List<Processor> answer = new ArrayList<Processor>(1);
059 answer.add(nextProcessor);
060 return answer;
061 }
062
063 public boolean hasNext() {
064 return nextProcessor != null;
065 }
066
067 public void setNextProcessor(Processor next) {
068 this.nextProcessor = next;
069 }
070
071 public Processor getOutput() {
072 // the errorHandler is already decorated with interceptors
073 // so it cointain the entire chain of processors, so we can safely use it directly as output
074 // if no error handler provided we can use the output direcly
075 return errorHandler != null ? errorHandler : output;
076 }
077
078 public void setOutput(Processor output) {
079 this.output = output;
080 }
081
082 public Processor getNextProcessor() {
083 return nextProcessor;
084 }
085
086 public boolean hasInterceptorStrategy(Class type) {
087 for (InterceptStrategy strategy : interceptors) {
088 if (type.isInstance(strategy)) {
089 return true;
090 }
091 }
092 return false;
093 }
094
095 public void setErrorHandler(Processor errorHandler) {
096 this.errorHandler = errorHandler;
097 }
098
099 public Processor getErrorHandler() {
100 return errorHandler;
101 }
102
103 public void addInterceptStrategy(InterceptStrategy strategy) {
104 interceptors.add(strategy);
105 }
106
107 public void addInterceptStrategies(List<InterceptStrategy> strategies) {
108 interceptors.addAll(strategies);
109 }
110
111 public List<InterceptStrategy> getInterceptStrategies() {
112 return interceptors;
113 }
114
115 public ProcessorDefinition getProcessorDefinition() {
116 return definition;
117 }
118
119 @Override
120 protected void doStart() throws Exception {
121 ServiceHelper.startServices(errorHandler, output);
122 }
123
124 @Override
125 protected void doStop() throws Exception {
126 ServiceHelper.stopServices(output, errorHandler);
127 }
128
129 public void initChannel(ProcessorDefinition outputDefinition, RouteContext routeContext) throws Exception {
130 this.definition = outputDefinition;
131
132 // TODO: Support ordering of interceptors
133
134 // wrap the output with the interceptors
135 Processor target = nextProcessor;
136 for (InterceptStrategy strategy : interceptors) {
137 Processor next = target == nextProcessor ? null : nextProcessor;
138 target = strategy.wrapProcessorInInterceptors(outputDefinition, target, next);
139 }
140
141 // sets the delegate to our wrapped output
142 output = target;
143 }
144
145 public void process(Exchange exchange) throws Exception {
146 Processor processor = getOutput();
147 if (processor != null && continueProcessing(exchange)) {
148 processor.process(exchange);
149 }
150 }
151
152 /**
153 * Strategy to determine if we should continue processing the {@link Exchange}.
154 */
155 protected boolean continueProcessing(Exchange exchange) {
156 Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
157 if (stop != null) {
158 boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
159 if (doStop) {
160 if (LOG.isDebugEnabled()) {
161 LOG.debug("Exchange is marked to stop routing: " + exchange);
162 }
163 return false;
164 }
165 }
166 return true;
167 }
168
169 @Override
170 public String toString() {
171 // just output the next processor as all the interceptors and error handler is just too verbose
172 return "Channel[" + nextProcessor + "]";
173 }
174
175 }