001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.spring;
018
019 import java.lang.reflect.Field;
020 import java.lang.reflect.Method;
021
022 import javax.xml.bind.annotation.XmlAccessType;
023 import javax.xml.bind.annotation.XmlAccessorType;
024 import javax.xml.bind.annotation.XmlRootElement;
025 import javax.xml.bind.annotation.XmlTransient;
026
027 import org.apache.camel.CamelContextAware;
028 import org.apache.camel.Consumer;
029 import org.apache.camel.Endpoint;
030 import org.apache.camel.EndpointInject;
031 import org.apache.camel.MessageDriven;
032 import org.apache.camel.PollingConsumer;
033 import org.apache.camel.Processor;
034 import org.apache.camel.Producer;
035 import org.apache.camel.RuntimeCamelException;
036 import org.apache.camel.Service;
037 import org.apache.camel.component.bean.BeanProcessor;
038 import org.apache.camel.impl.DefaultProducerTemplate;
039 import org.apache.camel.spring.util.ReflectionUtils;
040 import org.apache.camel.util.ObjectHelper;
041 import org.apache.commons.logging.Log;
042 import org.apache.commons.logging.LogFactory;
043 import org.springframework.beans.BeansException;
044 import org.springframework.beans.factory.NoSuchBeanDefinitionException;
045 import org.springframework.beans.factory.config.BeanPostProcessor;
046 import org.springframework.context.ApplicationContext;
047 import org.springframework.context.ApplicationContextAware;
048
049 import static org.apache.camel.util.ObjectHelper.isNotNullAndNonEmpty;
050 import static org.apache.camel.util.ObjectHelper.isNullOrBlank;
051
052 /**
053 * A post processor to perform injection of {@link Endpoint} and
054 * {@link Producer} instances together with binding methods annotated with
055 * {@link MessageDriven @MessageDriven} to a Camel consumer.
056 *
057 * @version $Revision: 664627 $
058 */
059 @XmlRootElement(name = "beanPostProcessor")
060 @XmlAccessorType(XmlAccessType.FIELD)
061 public class CamelBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
062 private static final transient Log LOG = LogFactory.getLog(CamelBeanPostProcessor.class);
063 @XmlTransient
064 private SpringCamelContext camelContext;
065 @XmlTransient
066 private ApplicationContext applicationContext;
067
068 public CamelBeanPostProcessor() {
069 }
070
071 public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
072 injectFields(bean);
073 injectMethods(bean);
074 if (bean instanceof CamelContextAware) {
075 CamelContextAware contextAware = (CamelContextAware)bean;
076 if (camelContext == null) {
077 LOG.warn("No CamelContext defined yet so cannot inject into: " + bean);
078 } else {
079 contextAware.setCamelContext(camelContext);
080 }
081 }
082 return bean;
083 }
084
085 public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
086 return bean;
087 }
088
089 // Properties
090 // -------------------------------------------------------------------------
091
092 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
093 this.applicationContext = applicationContext;
094 }
095
096 public SpringCamelContext getCamelContext() {
097 return camelContext;
098 }
099
100 public void setCamelContext(SpringCamelContext camelContext) {
101 this.camelContext = camelContext;
102 }
103
104 // Implementation methods
105 // -------------------------------------------------------------------------
106
107 /**
108 * A strategy method to allow implementations to perform some custom JBI
109 * based injection of the POJO
110 *
111 * @param bean the bean to be injected
112 */
113 protected void injectFields(final Object bean) {
114 ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
115 public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
116 EndpointInject annotation = field.getAnnotation(EndpointInject.class);
117 if (annotation != null) {
118 ReflectionUtils.setField(field, bean, getEndpointInjectionValue(annotation, field.getType(), field.getName()));
119 }
120 }
121 });
122 }
123
124 protected void injectMethods(final Object bean) {
125 ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
126 @SuppressWarnings("unchecked")
127 public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
128 setterInjection(method, bean);
129 consumerInjection(method, bean);
130 }
131 });
132 }
133
134 protected void setterInjection(Method method, Object bean) {
135 EndpointInject annoation = method.getAnnotation(EndpointInject.class);
136 if (annoation != null) {
137 Class<?>[] parameterTypes = method.getParameterTypes();
138 if (parameterTypes != null) {
139 if (parameterTypes.length != 1) {
140 LOG.warn("Ignoring badly annotated method for injection due to incorrect number of parameters: " + method);
141 } else {
142 String propertyName = ObjectHelper.getPropertyName(method);
143 Object value = getEndpointInjectionValue(annoation, parameterTypes[0], propertyName);
144 ObjectHelper.invokeMethod(method, bean, value);
145 }
146 }
147 }
148 }
149
150 protected void consumerInjection(final Object bean) {
151 ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
152 @SuppressWarnings("unchecked")
153 public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
154 /*
155 * TODO support callbacks? if
156 * (method.getAnnotation(Callback.class) != null) { try {
157 * Expression e = ExpressionFactory.createExpression(
158 * method.getAnnotation(Callback.class).condition());
159 * JexlContext jc = JexlHelper.createContext();
160 * jc.getVars().put("this", obj); Object r = e.evaluate(jc); if
161 * (!(r instanceof Boolean)) { throw new
162 * RuntimeException("Expression did not returned a boolean value
163 * but: " + r); } Boolean oldVal =
164 * req.getCallbacks().get(method); Boolean newVal = (Boolean) r;
165 * if ((oldVal == null || !oldVal) && newVal) {
166 * req.getCallbacks().put(method, newVal); method.invoke(obj,
167 * new Object[0]); // TODO: handle return value and sent it as
168 * the answer } } catch (Exception e) { throw new
169 * RuntimeException("Unable to invoke callback", e); } }
170 */
171 }
172 });
173 }
174
175 protected void consumerInjection(Method method, Object bean) {
176 MessageDriven annotation = method.getAnnotation(MessageDriven.class);
177 if (annotation != null) {
178 LOG.info("Creating a consumer for: " + annotation);
179
180 // lets bind this method to a listener
181 String injectionPointName = method.getName();
182 Endpoint endpoint = getEndpointInjection(annotation.uri(), annotation.name(), injectionPointName);
183 if (endpoint != null) {
184 try {
185 Processor processor = createConsumerProcessor(bean, method, endpoint);
186 LOG.info("Created processor: " + processor);
187 Consumer consumer = endpoint.createConsumer(processor);
188 startService(consumer);
189 } catch (Exception e) {
190 LOG.warn(e);
191 throw new RuntimeCamelException(e);
192 }
193 }
194 }
195 }
196
197 protected void startService(Service service) throws Exception {
198 camelContext.addService(service);
199 }
200
201 /**
202 * Create a processor which invokes the given method when an incoming
203 * message exchange is received
204 */
205 protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
206 BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
207 answer.setMethodObject(method);
208 return answer;
209 }
210
211
212 /**
213 * Creates the value for the injection point for the given annotation
214 */
215 protected Object getEndpointInjectionValue(EndpointInject annotation, Class<?> type, String injectionPointName) {
216 Endpoint endpoint = getEndpointInjection(annotation.uri(), annotation.name(), injectionPointName);
217 if (endpoint != null) {
218 if (type.isInstance(endpoint)) {
219 return endpoint;
220 } else if (type.isAssignableFrom(Producer.class)) {
221 return createInjectionProducer(endpoint);
222 } else if (type.isAssignableFrom(DefaultProducerTemplate.class)) {
223 return new DefaultProducerTemplate(getCamelContext(), endpoint);
224 } else if (type.isAssignableFrom(PollingConsumer.class)) {
225 return createInjectionPollingConsumer(endpoint);
226 } else {
227 throw new IllegalArgumentException("Invalid type: " + type.getName() + " which cannot be injected via @EndpointInject for " + endpoint);
228 }
229 }
230 return null;
231 }
232
233 /**
234 * Factory method to create a started {@link PollingConsumer} to be injected
235 * into a POJO
236 */
237 protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint) {
238 try {
239 PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
240 startService(pollingConsumer);
241 return pollingConsumer;
242 } catch (Exception e) {
243 throw new RuntimeCamelException(e);
244 }
245 }
246
247 /**
248 * A Factory method to create a started {@link Producer} to be injected into
249 * a POJO
250 */
251 protected Producer createInjectionProducer(Endpoint endpoint) {
252 try {
253 Producer producer = endpoint.createProducer();
254 startService(producer);
255 return producer;
256 } catch (Exception e) {
257 throw new RuntimeCamelException(e);
258 }
259 }
260
261 protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName) {
262 Endpoint endpoint = null;
263 if (isNotNullAndNonEmpty(uri)) {
264 endpoint = camelContext.getEndpoint(uri);
265 } else {
266 if (isNullOrBlank(name)) {
267 name = injectionPointName;
268 }
269 endpoint = (Endpoint) applicationContext.getBean(name);
270 if (endpoint == null) {
271 throw new NoSuchBeanDefinitionException(name);
272 }
273 }
274 return endpoint;
275 }
276
277 }