001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.Iterator;
022 import java.util.LinkedList;
023 import java.util.List;
024 import java.util.Queue;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.Lock;
028 import java.util.concurrent.locks.ReentrantLock;
029
030 import org.apache.camel.Exchange;
031 import org.apache.camel.Navigate;
032 import org.apache.camel.Processor;
033 import org.apache.camel.impl.DefaultExchange;
034 import org.apache.camel.impl.LoggingExceptionHandler;
035 import org.apache.camel.impl.ServiceSupport;
036 import org.apache.camel.spi.ExceptionHandler;
037 import org.apache.camel.util.ObjectHelper;
038 import org.apache.camel.util.ServiceHelper;
039 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042
043 /**
044 * A base class for any kind of {@link Processor} which implements some kind of batch processing.
045 *
046 * @version $Revision: 791088 $
047 */
048 public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
049
050 public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
051 public static final int DEFAULT_BATCH_SIZE = 100;
052
053 private static final Log LOG = LogFactory.getLog(BatchProcessor.class);
054
055 private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
056 private int batchSize = DEFAULT_BATCH_SIZE;
057 private int outBatchSize;
058 private boolean groupExchanges;
059 private boolean batchConsumer;
060
061 private final Processor processor;
062 private final Collection<Exchange> collection;
063 private ExceptionHandler exceptionHandler;
064
065 private final BatchSender sender;
066
067 public BatchProcessor(Processor processor, Collection<Exchange> collection) {
068 ObjectHelper.notNull(processor, "processor");
069 ObjectHelper.notNull(collection, "collection");
070 this.processor = processor;
071 this.collection = collection;
072 this.sender = new BatchSender();
073 }
074
075 @Override
076 public String toString() {
077 return "BatchProcessor[to: " + processor + "]";
078 }
079
080 // Properties
081 // -------------------------------------------------------------------------
082 public ExceptionHandler getExceptionHandler() {
083 if (exceptionHandler == null) {
084 exceptionHandler = new LoggingExceptionHandler(getClass());
085 }
086 return exceptionHandler;
087 }
088
089 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
090 this.exceptionHandler = exceptionHandler;
091 }
092
093 public int getBatchSize() {
094 return batchSize;
095 }
096
097 /**
098 * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
099 * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
100 *
101 * @param batchSize the size
102 */
103 public void setBatchSize(int batchSize) {
104 // setting batch size to 0 or negative is like disabling it, so we set it as the max value
105 // as the code logic is dependt on a batch size having 1..n value
106 if (batchSize <= 0) {
107 LOG.debug("Disabling batch size, will only be triggered by timeout");
108 this.batchSize = Integer.MAX_VALUE;
109 } else {
110 this.batchSize = batchSize;
111 }
112 }
113
114 public int getOutBatchSize() {
115 return outBatchSize;
116 }
117
118 /**
119 * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
120 * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
121 * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
122 *
123 * @param outBatchSize the size
124 */
125 public void setOutBatchSize(int outBatchSize) {
126 this.outBatchSize = outBatchSize;
127 }
128
129 public long getBatchTimeout() {
130 return batchTimeout;
131 }
132
133 public void setBatchTimeout(long batchTimeout) {
134 this.batchTimeout = batchTimeout;
135 }
136
137 public boolean isGroupExchanges() {
138 return groupExchanges;
139 }
140
141 public void setGroupExchanges(boolean groupExchanges) {
142 this.groupExchanges = groupExchanges;
143 }
144
145 public boolean isBatchConsumer() {
146 return batchConsumer;
147 }
148
149 public void setBatchConsumer(boolean batchConsumer) {
150 this.batchConsumer = batchConsumer;
151 }
152
153 public Processor getProcessor() {
154 return processor;
155 }
156
157 public List<Processor> next() {
158 if (!hasNext()) {
159 return null;
160 }
161 List<Processor> answer = new ArrayList<Processor>(1);
162 answer.add(processor);
163 return answer;
164 }
165
166 public boolean hasNext() {
167 return processor != null;
168 }
169
170 /**
171 * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
172 * the in queue should be drained to the "out" collection.
173 */
174 private boolean isInBatchCompleted(int num) {
175 return num >= batchSize;
176 }
177
178 /**
179 * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
180 * the out collection should be sent.
181 */
182 private boolean isOutBatchCompleted() {
183 if (outBatchSize == 0) {
184 // out batch is disabled, so go ahead and send.
185 return true;
186 }
187 return collection.size() > 0 && collection.size() >= outBatchSize;
188 }
189
190 /**
191 * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
192 * custom processing before or after an individual exchange is processed
193 */
194 protected void processExchange(Exchange exchange) throws Exception {
195 processor.process(exchange);
196 }
197
198 protected void doStart() throws Exception {
199 ServiceHelper.startServices(processor);
200 sender.start();
201 }
202
203 protected void doStop() throws Exception {
204 sender.cancel();
205 ServiceHelper.stopServices(sender);
206 ServiceHelper.stopServices(processor);
207 collection.clear();
208 }
209
210 /**
211 * Enqueues an exchange for later batch processing.
212 */
213 public void process(Exchange exchange) throws Exception {
214
215 // if batch consumer is enabled then we need to adjust the batch size
216 // with the size from the batch consumer
217 if (isBatchConsumer()) {
218 int size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
219 if (batchSize != size) {
220 batchSize = size;
221 if (LOG.isTraceEnabled()) {
222 LOG.trace("Using batch consumer completion, so setting batch size to: " + batchSize);
223 }
224 }
225 }
226
227 sender.enqueueExchange(exchange);
228 }
229
230 /**
231 * Sender thread for queued-up exchanges.
232 */
233 private class BatchSender extends Thread {
234
235 private Queue<Exchange> queue;
236 private Lock queueLock = new ReentrantLock();
237 private boolean exchangeEnqueued;
238 private Condition exchangeEnqueuedCondition = queueLock.newCondition();
239
240 public BatchSender() {
241 super(ExecutorServiceHelper.getThreadName("Batch Sender"));
242 this.queue = new LinkedList<Exchange>();
243 }
244
245 @Override
246 public void run() {
247 // Wait until one of either:
248 // * an exchange being queued;
249 // * the batch timeout expiring; or
250 // * the thread being cancelled.
251 //
252 // If an exchange is queued then we need to determine whether the
253 // batch is complete. If it is complete then we send out the batched
254 // exchanges. Otherwise we move back into our wait state.
255 //
256 // If the batch times out then we send out the batched exchanges
257 // collected so far.
258 //
259 // If we receive an interrupt then all blocking operations are
260 // interrupted and our thread terminates.
261 //
262 // The goal of the following algorithm in terms of synchronisation
263 // is to provide fine grained locking i.e. retaining the lock only
264 // when required. Special consideration is given to releasing the
265 // lock when calling an overloaded method i.e. sendExchanges.
266 // Unlocking is important as the process of sending out the exchanges
267 // would otherwise block new exchanges from being queued.
268
269 queueLock.lock();
270 try {
271 do {
272 try {
273 if (!exchangeEnqueued) {
274 exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
275 }
276
277 if (!exchangeEnqueued) {
278 drainQueueTo(collection, batchSize);
279 } else {
280 exchangeEnqueued = false;
281 while (isInBatchCompleted(queue.size())) {
282 drainQueueTo(collection, batchSize);
283 }
284
285 if (!isOutBatchCompleted()) {
286 continue;
287 }
288 }
289
290 queueLock.unlock();
291 try {
292 try {
293 sendExchanges();
294 } catch (Exception e) {
295 getExceptionHandler().handleException(e);
296 }
297 } finally {
298 queueLock.lock();
299 }
300
301 } catch (InterruptedException e) {
302 break;
303 }
304
305 } while (isRunAllowed());
306
307 } finally {
308 queueLock.unlock();
309 }
310 }
311
312 /**
313 * This method should be called with queueLock held
314 */
315 private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
316 for (int i = 0; i < batchSize; ++i) {
317 Exchange e = queue.poll();
318 if (e != null) {
319 collection.add(e);
320 } else {
321 break;
322 }
323 }
324 }
325
326 public void cancel() {
327 interrupt();
328 }
329
330 public void enqueueExchange(Exchange exchange) {
331 queueLock.lock();
332 try {
333 queue.add(exchange);
334 exchangeEnqueued = true;
335 exchangeEnqueuedCondition.signal();
336 } finally {
337 queueLock.unlock();
338 }
339 }
340
341 @SuppressWarnings("unchecked")
342 private void sendExchanges() throws Exception {
343 Exchange grouped = null;
344
345 Iterator<Exchange> iter = collection.iterator();
346 while (iter.hasNext()) {
347 Exchange exchange = iter.next();
348 iter.remove();
349 if (!groupExchanges) {
350 // non grouped so process the exchange one at a time
351 processExchange(exchange);
352 } else {
353 // grouped so add all exchanges into one group
354 if (grouped == null) {
355 grouped = new DefaultExchange(exchange);
356 }
357 List<Exchange> list = grouped.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
358 if (list == null) {
359 list = new ArrayList<Exchange>();
360 grouped.setProperty(Exchange.GROUPED_EXCHANGE, list);
361 }
362 list.add(exchange);
363 }
364 }
365
366 // and after adding process the single grouped exchange
367 if (grouped != null) {
368 processExchange(grouped);
369 }
370 }
371 }
372
373 }