001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.lang.reflect.Method;
020
021 import javax.xml.bind.annotation.XmlTransient;
022
023 import org.apache.camel.CamelContext;
024 import org.apache.camel.CamelContextAware;
025 import org.apache.camel.Consume;
026 import org.apache.camel.Consumer;
027 import org.apache.camel.Endpoint;
028 import org.apache.camel.PollingConsumer;
029 import org.apache.camel.Processor;
030 import org.apache.camel.Producer;
031 import org.apache.camel.ProducerTemplate;
032 import org.apache.camel.Service;
033 import org.apache.camel.component.bean.BeanProcessor;
034 import org.apache.camel.component.bean.ProxyHelper;
035 import org.apache.camel.util.CamelContextHelper;
036 import org.apache.camel.util.ObjectHelper;
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039
040 /**
041 * A helper class for Camel based injector or post processing hooks which can be reused by
042 * both the <a href="http://camel.apache.org/spring.html">Spring</a>
043 * and <a href="http://camel.apache.org/guice.html">Guice</a> support.
044 *
045 * @version $Revision: 788297 $
046 */
047 public class CamelPostProcessorHelper implements CamelContextAware {
048 private static final transient Log LOG = LogFactory.getLog(CamelPostProcessorHelper.class);
049
050 @XmlTransient
051 private CamelContext camelContext;
052
053 public CamelPostProcessorHelper() {
054 }
055
056 public CamelPostProcessorHelper(CamelContext camelContext) {
057 this.setCamelContext(camelContext);
058 }
059
060 public CamelContext getCamelContext() {
061 return camelContext;
062 }
063
064 public void setCamelContext(CamelContext camelContext) {
065 this.camelContext = camelContext;
066 }
067
068 /**
069 * Does the given context match this camel context
070 */
071 public boolean matchContext(String context) {
072 if (ObjectHelper.isNotEmpty(context)) {
073 if (!camelContext.getName().equals(context)) {
074 return false;
075 }
076 }
077 return true;
078 }
079
080 public void consumerInjection(Method method, Object bean) {
081 Consume consume = method.getAnnotation(Consume.class);
082 if (consume != null && matchContext(consume.context())) {
083 LOG.info("Creating a consumer for: " + consume);
084 subscribeMethod(method, bean, consume.uri(), consume.ref());
085 }
086 }
087
088 public void subscribeMethod(Method method, Object bean, String endpointUri, String endpointName) {
089 // lets bind this method to a listener
090 String injectionPointName = method.getName();
091 Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true);
092 if (endpoint != null) {
093 try {
094 Processor processor = createConsumerProcessor(bean, method, endpoint);
095 Consumer consumer = endpoint.createConsumer(processor);
096 if (LOG.isDebugEnabled()) {
097 LOG.debug("Created processor: " + processor + " for consumer: " + consumer);
098 }
099 startService(consumer);
100 } catch (Exception e) {
101 throw ObjectHelper.wrapRuntimeCamelException(e);
102 }
103 }
104 }
105
106 public void startService(Service service) throws Exception {
107 CamelContext camelContext = getCamelContext();
108 if (camelContext instanceof DefaultCamelContext) {
109 DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
110 defaultCamelContext.addService(service);
111 } else {
112 service.start();
113 }
114 }
115
116 /**
117 * Create a processor which invokes the given method when an incoming
118 * message exchange is received
119 */
120 protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
121 BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
122 answer.setMethodObject(method);
123 return answer;
124 }
125
126 protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
127 return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
128 }
129
130 /**
131 * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
132 */
133 public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName) {
134 if (type.isAssignableFrom(ProducerTemplate.class)) {
135 return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName);
136 } else {
137 Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true);
138 if (endpoint != null) {
139 if (type.isInstance(endpoint)) {
140 return endpoint;
141 } else if (type.isAssignableFrom(Producer.class)) {
142 return createInjectionProducer(endpoint);
143 } else if (type.isAssignableFrom(PollingConsumer.class)) {
144 return createInjectionPollingConsumer(endpoint);
145 } else if (type.isInterface()) {
146 // lets create a proxy
147 try {
148 return ProxyHelper.createProxy(endpoint, type);
149 } catch (Exception e) {
150 throw createProxyInstantiationRuntimeException(type, endpoint, e);
151 }
152 } else {
153 throw new IllegalArgumentException("Invalid type: " + type.getName()
154 + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
155 }
156 }
157 return null;
158 }
159 }
160
161 /**
162 * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
163 */
164 protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
165 // endpoint is optional for this injection point
166 Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
167 return new DefaultProducerTemplate(getCamelContext(), endpoint);
168 }
169
170 /**
171 * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
172 */
173 protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint) {
174 try {
175 PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
176 startService(pollingConsumer);
177 return pollingConsumer;
178 } catch (Exception e) {
179 throw ObjectHelper.wrapRuntimeCamelException(e);
180 }
181 }
182
183 /**
184 * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
185 */
186 protected Producer createInjectionProducer(Endpoint endpoint) {
187 try {
188 Producer producer = endpoint.createProducer();
189 startService(producer);
190 return producer;
191 } catch (Exception e) {
192 throw ObjectHelper.wrapRuntimeCamelException(e);
193 }
194 }
195
196 protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
197 return new ProxyInstantiationException(type, endpoint, e);
198 }
199 }