001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024
025 import org.apache.camel.AsyncProcessor;
026 import org.apache.camel.CamelContext;
027 import org.apache.camel.Endpoint;
028 import org.apache.camel.Exchange;
029 import org.apache.camel.Intercept;
030 import org.apache.camel.NoSuchEndpointException;
031 import org.apache.camel.Processor;
032 import org.apache.camel.Route;
033 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
034 import org.apache.camel.model.FromType;
035 import org.apache.camel.model.ProcessorType;
036 import org.apache.camel.model.RouteType;
037 import org.apache.camel.model.dataformat.DataFormatType;
038 import org.apache.camel.processor.Interceptor;
039 import org.apache.camel.processor.Pipeline;
040 import org.apache.camel.processor.ProceedProcessor;
041 import org.apache.camel.processor.UnitOfWorkProcessor;
042 import org.apache.camel.spi.DataFormat;
043 import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
044 import org.apache.camel.spi.InterceptStrategy;
045 import org.apache.camel.spi.RouteContext;
046
047 /**
048 * The context used to activate new routing rules
049 *
050 * @version $Revision: 701021 $
051 */
052 public class DefaultRouteContext implements RouteContext {
053 private RouteType route;
054 private FromType from;
055 private Collection<Route> routes;
056 private Endpoint<? extends Exchange> endpoint;
057 private List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
058 private Interceptor lastInterceptor;
059 private CamelContext camelContext;
060 private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
061 private ErrorHandlerWrappingStrategy errorHandlerWrappingStrategy;
062 private boolean routeAdded;
063
064 public DefaultRouteContext(RouteType route, FromType from, Collection<Route> routes) {
065 this.route = route;
066 this.from = from;
067 this.routes = routes;
068 }
069
070 /**
071 * Only used for lazy construction from inside ExpressionType
072 */
073 public DefaultRouteContext(CamelContext camelContext) {
074 this.camelContext = camelContext;
075 routes = new ArrayList<Route>();
076 route = new RouteType("temporary");
077 }
078
079 public Endpoint<? extends Exchange> getEndpoint() {
080 if (endpoint == null) {
081 endpoint = from.resolveEndpoint(this);
082 }
083 return endpoint;
084 }
085
086 public FromType getFrom() {
087 return from;
088 }
089
090 public RouteType getRoute() {
091 return route;
092 }
093
094 public CamelContext getCamelContext() {
095 if (camelContext == null) {
096 camelContext = getRoute().getCamelContext();
097 }
098 return camelContext;
099 }
100
101 public Processor createProcessor(ProcessorType node) throws Exception {
102 return node.createOutputsProcessor(this);
103 }
104
105 public Endpoint<? extends Exchange> resolveEndpoint(String uri) {
106 return route.resolveEndpoint(uri);
107 }
108
109 public Endpoint<? extends Exchange> resolveEndpoint(String uri, String ref) {
110 Endpoint<? extends Exchange> endpoint = null;
111 if (uri != null) {
112 endpoint = resolveEndpoint(uri);
113 if (endpoint == null) {
114 throw new NoSuchEndpointException(uri);
115 }
116 }
117 if (ref != null) {
118 endpoint = lookup(ref, Endpoint.class);
119 if (endpoint == null) {
120 throw new NoSuchEndpointException("ref:" + ref);
121 }
122 }
123 if (endpoint == null) {
124 throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
125 } else {
126 return endpoint;
127 }
128 }
129
130 public <T> T lookup(String name, Class<T> type) {
131 return getCamelContext().getRegistry().lookup(name, type);
132 }
133
134 public void commit() {
135 // now lets turn all of the event driven consumer processors into a
136 // single route
137 if (!eventDrivenProcessors.isEmpty()) {
138 Processor processor = Pipeline.newInstance(eventDrivenProcessors);
139
140 // lets create the async processor
141 final AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
142 Processor unitOfWorkProcessor = new UnitOfWorkProcessor(asyncProcessor);
143
144 // TODO: hz: move all this into the lifecycle strategy! (used by jmx naming strategy)
145 Route edcr = new EventDrivenConsumerRoute(getEndpoint(), unitOfWorkProcessor);
146 edcr.getProperties().put(Route.ID_PROPERTY, route.idOrCreate());
147 edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
148 if (route.getGroup() != null) {
149 edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
150 }
151 routes.add(edcr);
152 }
153 }
154
155 public void addEventDrivenProcessor(Processor processor) {
156 eventDrivenProcessors.add(processor);
157 }
158
159 public void intercept(Intercept interceptor) {
160 /*
161 InterceptorRef block = new InterceptorRef(interceptor);
162 RouteType route = getRoute();
163 List<ProcessorType<?>> list = route.getOutputs();
164 for (ProcessorType<?> processorType : list) {
165 block.addOutput(processorType);
166 }
167 route.clearOutput();
168 route.intercept(block);
169 */
170
171 //getRoute().getInterceptors().add(new InterceptorRef(interceptor));
172 lastInterceptor = (Interceptor)interceptor;
173 }
174
175 public Processor createProceedProcessor() {
176 if (lastInterceptor == null) {
177 throw new IllegalArgumentException("Cannot proceed() from outside of an interceptor!");
178 } else {
179 return new ProceedProcessor(lastInterceptor);
180 }
181 }
182
183 public List<InterceptStrategy> getInterceptStrategies() {
184 return interceptStrategies;
185 }
186
187 public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
188 this.interceptStrategies = interceptStrategies;
189 }
190
191 public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
192 getInterceptStrategies().add(interceptStrategy);
193 }
194
195 public ErrorHandlerWrappingStrategy getErrorHandlerWrappingStrategy() {
196 return errorHandlerWrappingStrategy;
197 }
198
199 public void setErrorHandlerWrappingStrategy(ErrorHandlerWrappingStrategy strategy) {
200 errorHandlerWrappingStrategy = strategy;
201
202 }
203
204 public boolean isRouteAdded() {
205 return routeAdded;
206 }
207
208 public void setIsRouteAdded(boolean b) {
209 routeAdded = b;
210
211 }
212
213 public DataFormatType getDataFormat(String ref) {
214 Map<String, DataFormatType> dataFormats = getCamelContext().getDataFormats();
215 if (dataFormats != null) {
216 return dataFormats.get(ref);
217 } else {
218 return null;
219 }
220 }
221 }