001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.Collection;
020 import java.util.Iterator;
021
022 import org.apache.camel.Endpoint;
023 import org.apache.camel.Exchange;
024 import org.apache.camel.PollingConsumer;
025 import org.apache.camel.Processor;
026 import org.apache.camel.impl.LoggingExceptionHandler;
027 import org.apache.camel.impl.ServiceSupport;
028 import org.apache.camel.spi.ExceptionHandler;
029 import org.apache.camel.util.ServiceHelper;
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032
033 /**
034 * A base class for any kind of {@link Processor} which implements some kind of
035 * batch processing.
036 *
037 * @version $Revision: 669756 $
038 */
039 public class BatchProcessor extends ServiceSupport implements Runnable, Processor {
040 public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
041 public static final int DEFAULT_BATCH_SIZE = 100;
042
043 private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
044 private Endpoint endpoint;
045 private Processor processor;
046 private Collection<Exchange> collection;
047 private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
048 private int batchSize = DEFAULT_BATCH_SIZE;
049 private PollingConsumer consumer;
050 private ExceptionHandler exceptionHandler;
051
052 public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) {
053 this.endpoint = endpoint;
054 this.processor = processor;
055 this.collection = collection;
056 }
057
058 @Override
059 public String toString() {
060 return "BatchProcessor[to: " + processor + "]";
061 }
062
063 public void run() {
064 LOG.debug("Starting thread for " + this);
065 while (isRunAllowed()) {
066 try {
067 processBatch();
068 } catch (Exception e) {
069 getExceptionHandler().handleException(e);
070 }
071 }
072 collection.clear();
073 }
074
075 // Properties
076 // -------------------------------------------------------------------------
077 public ExceptionHandler getExceptionHandler() {
078 if (exceptionHandler == null) {
079 exceptionHandler = new LoggingExceptionHandler(getClass());
080 }
081 return exceptionHandler;
082 }
083
084 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
085 this.exceptionHandler = exceptionHandler;
086 }
087
088 public int getBatchSize() {
089 return batchSize;
090 }
091
092 public void setBatchSize(int batchSize) {
093 this.batchSize = batchSize;
094 }
095
096 public long getBatchTimeout() {
097 return batchTimeout;
098 }
099
100 public void setBatchTimeout(long batchTimeout) {
101 this.batchTimeout = batchTimeout;
102 }
103
104 public Endpoint getEndpoint() {
105 return endpoint;
106 }
107
108 public Processor getProcessor() {
109 return processor;
110 }
111
112 /**
113 * A transactional method to process a batch of messages up to a timeout
114 * period or number of messages reached.
115 */
116 protected synchronized void processBatch() throws Exception {
117 long start = System.currentTimeMillis();
118 long end = start + batchTimeout;
119 for (int i = 0; !isBatchCompleted(i); i++) {
120 long timeout = end - System.currentTimeMillis();
121 if (timeout < 0L) {
122 LOG.debug("batch timeout expired at batch index:" + i);
123 break;
124 }
125 Exchange exchange = consumer.receive(timeout);
126 if (exchange == null) {
127 LOG.debug("receive with timeout: " + timeout + " expired at batch index:" + i);
128 break;
129 }
130 collection.add(exchange);
131 }
132
133 if (LOG.isDebugEnabled()) {
134 LOG.debug("Finished batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: "
135 + collection);
136 }
137
138 // lets send the batch
139 Iterator<Exchange> iter = collection.iterator();
140 while (iter.hasNext()) {
141 Exchange exchange = iter.next();
142 iter.remove();
143 processExchange(exchange);
144 }
145 }
146
147 /**
148 * A strategy method to decide if the batch is completed the resulting exchanges should be sent
149 */
150 protected boolean isBatchCompleted(int index) {
151 return index >= batchSize;
152 }
153
154 /**
155 * Strategy Method to process an exchange in the batch. This method allows
156 * derived classes to perform custom processing before or after an
157 * individual exchange is processed
158 */
159 protected void processExchange(Exchange exchange) throws Exception {
160 processor.process(exchange);
161 }
162
163 protected void doStart() throws Exception {
164 consumer = endpoint.createPollingConsumer();
165
166 ServiceHelper.startServices(processor, consumer);
167
168 Thread thread = new Thread(this, this + " Polling Thread");
169 thread.start();
170 }
171
172 protected void doStop() throws Exception {
173 ServiceHelper.stopServices(consumer, processor);
174 collection.clear();
175 }
176
177 protected Collection<Exchange> getCollection() {
178 return collection;
179 }
180
181 public void process(Exchange exchange) throws Exception {
182 // empty since exchanges come from endpoint's polling consumer
183 }
184 }