001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.Map;
020
021 import org.apache.camel.Endpoint;
022 import org.apache.camel.Exchange;
023 import org.apache.camel.ExchangePattern;
024 import org.apache.camel.FailedToCreateProducerException;
025 import org.apache.camel.Processor;
026 import org.apache.camel.Producer;
027 import org.apache.camel.ProducerCallback;
028 import org.apache.camel.ServicePoolAware;
029 import org.apache.camel.spi.ServicePool;
030 import org.apache.camel.util.LRUCache;
031 import org.apache.camel.util.ServiceHelper;
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
035
036 /**
037 * Cache containing created {@link Producer}.
038 *
039 * @version $Revision: 792408 $
040 */
041 public class ProducerCache extends ServiceSupport {
042 private static final transient Log LOG = LogFactory.getLog(ProducerCache.class);
043
044 private final Map<String, Producer> producers;
045 private final ServicePool<Endpoint, Producer> pool;
046
047 // TODO: Have easy configuration of pooling in Camel
048
049 public ProducerCache(ServicePool<Endpoint, Producer> producerServicePool) {
050 this.pool = producerServicePool;
051 this.producers = new LRUCache<String, Producer>(1000);
052 }
053
054 public ProducerCache(ServicePool<Endpoint, Producer> producerServicePool, Map<String, Producer> cache) {
055 this.pool = producerServicePool;
056 this.producers = cache;
057 }
058
059 public Producer getProducer(Endpoint endpoint) {
060 // As the producer is returned outside this method we do not want to return pooled producers
061 // so we pass in false to the method. if we returned pooled producers then the user had
062 // to remember to return it back in the pool.
063 // See method doInProducer that is safe template pattern where we handle the lifecycle and
064 // thus safely can use pooled producers there
065 return doGetProducer(endpoint, false);
066 }
067
068 /**
069 * Sends the exchange to the given endpoint
070 *
071 * @param endpoint the endpoint to send the exchange to
072 * @param exchange the exchange to send
073 */
074 public void send(Endpoint endpoint, Exchange exchange) {
075 try {
076 sendExchange(endpoint, null, null, exchange);
077 } catch (Exception e) {
078 throw wrapRuntimeCamelException(e);
079 }
080 }
081
082 /**
083 * Sends an exchange to an endpoint using a supplied
084 * {@link Processor} to populate the exchange
085 *
086 * @param endpoint the endpoint to send the exchange to
087 * @param processor the transformer used to populate the new exchange
088 * @return the exchange
089 */
090 public Exchange send(Endpoint endpoint, Processor processor) {
091 try {
092 return sendExchange(endpoint, null, processor, null);
093 } catch (Exception e) {
094 throw wrapRuntimeCamelException(e);
095 }
096 }
097
098 /**
099 * Sends an exchange to an endpoint using a supplied
100 * {@link Processor} to populate the exchange
101 *
102 * @param endpoint the endpoint to send the exchange to
103 * @param pattern the message {@link ExchangePattern} such as
104 * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
105 * @param processor the transformer used to populate the new exchange
106 * @return the exchange
107 */
108 public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
109 try {
110 return sendExchange(endpoint, pattern, processor, null);
111 } catch (Exception e) {
112 throw wrapRuntimeCamelException(e);
113 }
114 }
115
116
117 /**
118 * Sends an exchange to an endpoint using a supplied callback
119 *
120 * @param endpoint the endpoint to send the exchange to
121 * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
122 * @param pattern the exchange pattern, can be <tt>null</tt>
123 * @param callback the callback
124 * @return the response from the callback
125 * @throws Exception if an internal processing error has occurred.
126 */
127 public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception {
128 // get the producer and we do not mind if its pooled as we can handle returning it back to the pool
129 Producer producer = doGetProducer(endpoint, true);
130
131 if (producer == null) {
132 if (isStopped()) {
133 LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
134 return null;
135 } else {
136 throw new IllegalStateException("No producer, this processor has not been started: " + this);
137 }
138 }
139
140 try {
141 // invoke the callback
142 return callback.doInProducer(producer, exchange, pattern);
143 } finally {
144 if (producer instanceof ServicePoolAware) {
145 // release back to the pool
146 pool.release(endpoint, producer);
147 } else if (!producer.isSingleton()) {
148 // stop non singleton producers as we should not leak resources
149 producer.stop();
150 }
151 }
152 }
153
154 protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
155 final Processor processor, Exchange exchange) throws Exception {
156 return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
157 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
158 if (exchange == null) {
159 exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
160 }
161
162 if (processor != null) {
163 // lets populate using the processor callback
164 processor.process(exchange);
165 }
166
167 // now lets dispatch
168 if (LOG.isDebugEnabled()) {
169 LOG.debug(">>>> " + endpoint + " " + exchange);
170 }
171 producer.process(exchange);
172 return exchange;
173 }
174 });
175 }
176
177 protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
178 String key = endpoint.getEndpointUri();
179 Producer answer = producers.get(key);
180 if (pooled && answer == null) {
181 // try acquire from connection pool
182 answer = pool.acquire(endpoint);
183 }
184
185 if (answer == null) {
186 // create a new producer
187 try {
188 answer = endpoint.createProducer();
189 answer.start();
190 } catch (Exception e) {
191 throw new FailedToCreateProducerException(endpoint, e);
192 }
193
194 // add producer to cache or pool if applicable
195 if (pooled && answer instanceof ServicePoolAware) {
196 if (LOG.isDebugEnabled()) {
197 LOG.debug("Adding to producer service pool with key: " + endpoint + " for producer: " + answer);
198 }
199 answer = pool.addAndAcquire(endpoint, answer);
200 } else if (answer.isSingleton()) {
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
203 }
204 producers.put(key, answer);
205 }
206 }
207
208 return answer;
209 }
210
211 protected void doStop() throws Exception {
212 // the producers will be stopped from where they are acquired
213 producers.clear();
214 ServiceHelper.stopServices(pool);
215 }
216
217 protected void doStart() throws Exception {
218 ServiceHelper.startServices(pool);
219 }
220
221 /**
222 * Returns the current size of the producer cache
223 *
224 * @return the current size
225 */
226 int size() {
227 return producers.size();
228 }
229
230 }