001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.ArrayList;
020 import java.util.List;
021
022 import javax.xml.bind.annotation.XmlAccessType;
023 import javax.xml.bind.annotation.XmlAccessorType;
024 import javax.xml.bind.annotation.XmlAttribute;
025 import javax.xml.bind.annotation.XmlElement;
026 import javax.xml.bind.annotation.XmlElementRef;
027 import javax.xml.bind.annotation.XmlRootElement;
028 import javax.xml.bind.annotation.XmlTransient;
029
030 import org.apache.camel.Expression;
031 import org.apache.camel.Predicate;
032 import org.apache.camel.Processor;
033 import org.apache.camel.builder.ExpressionClause;
034 import org.apache.camel.model.language.ExpressionDefinition;
035 import org.apache.camel.processor.Aggregator;
036 import org.apache.camel.processor.aggregate.AggregationCollection;
037 import org.apache.camel.processor.aggregate.AggregationStrategy;
038 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
039 import org.apache.camel.spi.RouteContext;
040
041 /**
042 * Represents an XML <aggregate/> element
043 *
044 * @version $Revision: 791088 $
045 */
046 @XmlRootElement(name = "aggregate")
047 @XmlAccessorType(XmlAccessType.FIELD)
048 public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> {
049 @XmlElement(name = "correlationExpression", required = false)
050 private ExpressionSubElementDefinition correlationExpression;
051 @XmlTransient
052 private ExpressionDefinition expression;
053 @XmlElementRef
054 private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
055 @XmlTransient
056 private AggregationStrategy aggregationStrategy;
057 @XmlTransient
058 private AggregationCollection aggregationCollection;
059 @XmlAttribute(required = false)
060 private Integer batchSize;
061 @XmlAttribute(required = false)
062 private Integer outBatchSize;
063 @XmlAttribute(required = false)
064 private Long batchTimeout;
065 @XmlAttribute(required = false)
066 private String strategyRef;
067 @XmlAttribute(required = false)
068 private String collectionRef;
069 @XmlAttribute(required = false)
070 private Boolean groupExchanges;
071 @XmlAttribute(required = false)
072 private Boolean batchSizeFromConsumer;
073 @XmlElement(name = "completionPredicate", required = false)
074 private ExpressionSubElementDefinition completionPredicate;
075
076 public AggregateDefinition() {
077 }
078
079 public AggregateDefinition(Predicate predicate) {
080 if (predicate != null) {
081 setExpression(new ExpressionDefinition(predicate));
082 }
083 }
084
085 public AggregateDefinition(Expression correlationExpression) {
086 if (correlationExpression != null) {
087 setExpression(new ExpressionDefinition(correlationExpression));
088 }
089 }
090
091 public AggregateDefinition(ExpressionDefinition correlationExpression) {
092 this.expression = correlationExpression;
093 }
094
095 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
096 this(correlationExpression);
097 this.aggregationStrategy = aggregationStrategy;
098 }
099
100 @Override
101 public String toString() {
102 String expressionString = (getExpression() != null) ? getExpression().getLabel() : "";
103 return "Aggregate[" + expressionString + " -> " + getOutputs() + "]";
104 }
105
106 @Override
107 public String getShortName() {
108 return "aggregate";
109 }
110
111 @Override
112 public Processor createProcessor(RouteContext routeContext) throws Exception {
113 return createAggregator(routeContext);
114 }
115
116 public ExpressionClause<AggregateDefinition> createAndSetExpression() {
117 ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(this);
118 this.setExpression(clause);
119 return clause;
120 }
121
122 protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
123 final Processor processor = routeContext.createProcessor(this);
124
125 final Aggregator aggregator;
126 if (getAggregationCollection() == null) {
127 setAggregationCollection(createAggregationCollection(routeContext));
128 }
129
130 if (aggregationCollection != null) {
131 // create the aggregator using the collection
132 // pre configure the collection if its expression and strategy is not set, then
133 // use the ones that is pre configured with this type
134 if (aggregationCollection.getCorrelationExpression() == null) {
135 aggregationCollection.setCorrelationExpression(getExpression());
136 }
137 if (aggregationCollection.getAggregationStrategy() == null) {
138 AggregationStrategy strategy = createAggregationStrategy(routeContext);
139 aggregationCollection.setAggregationStrategy(strategy);
140 }
141 aggregator = new Aggregator(processor, aggregationCollection);
142 } else {
143 // create the aggregator using a default collection
144 AggregationStrategy strategy = createAggregationStrategy(routeContext);
145
146 if (getExpression() == null) {
147 throw new IllegalArgumentException("You need to specify an expression or "
148 + "aggregation collection for this aggregator: " + this);
149 }
150
151 Expression aggregateExpression = getExpression().createExpression(routeContext);
152
153 Predicate predicate = null;
154 if (getCompletionPredicate() != null) {
155 predicate = getCompletionPredicate().createPredicate(routeContext);
156 }
157 if (predicate != null) {
158 aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
159 } else {
160 aggregator = new Aggregator(processor, aggregateExpression, strategy);
161 }
162 }
163
164 if (batchSize != null) {
165 aggregator.setBatchSize(batchSize);
166 }
167 if (batchTimeout != null) {
168 aggregator.setBatchTimeout(batchTimeout);
169 }
170 if (outBatchSize != null) {
171 aggregator.setOutBatchSize(outBatchSize);
172 }
173 if (groupExchanges != null) {
174 aggregator.setGroupExchanges(groupExchanges);
175 }
176 if (batchSizeFromConsumer != null) {
177 aggregator.setBatchConsumer(batchSizeFromConsumer);
178 }
179
180 return aggregator;
181 }
182
183 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
184 AggregationStrategy strategy = getAggregationStrategy();
185 if (strategy == null && strategyRef != null) {
186 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
187 }
188 if (strategy == null) {
189 // fallback to use latest
190 strategy = new UseLatestAggregationStrategy();
191 }
192 return strategy;
193 }
194
195 private AggregationCollection createAggregationCollection(RouteContext routeContext) {
196 AggregationCollection collection = getAggregationCollection();
197 if (collection == null && collectionRef != null) {
198 collection = routeContext.lookup(collectionRef, AggregationCollection.class);
199 }
200 return collection;
201 }
202
203 public AggregationCollection getAggregationCollection() {
204 return aggregationCollection;
205 }
206
207 public void setAggregationCollection(AggregationCollection aggregationCollection) {
208 this.aggregationCollection = aggregationCollection;
209 }
210
211 public AggregationStrategy getAggregationStrategy() {
212 return aggregationStrategy;
213 }
214
215 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
216 this.aggregationStrategy = aggregationStrategy;
217 }
218
219 public Integer getBatchSize() {
220 return batchSize;
221 }
222
223 public void setBatchSize(Integer batchSize) {
224 this.batchSize = batchSize;
225 }
226
227 public Integer getOutBatchSize() {
228 return outBatchSize;
229 }
230
231 public void setOutBatchSize(Integer outBatchSize) {
232 this.outBatchSize = outBatchSize;
233 }
234
235 public Long getBatchTimeout() {
236 return batchTimeout;
237 }
238
239 public void setBatchTimeout(Long batchTimeout) {
240 this.batchTimeout = batchTimeout;
241 }
242
243 public String getStrategyRef() {
244 return strategyRef;
245 }
246
247 public void setStrategyRef(String strategyRef) {
248 this.strategyRef = strategyRef;
249 }
250
251 public String getCollectionRef() {
252 return collectionRef;
253 }
254
255 public void setCollectionRef(String collectionRef) {
256 this.collectionRef = collectionRef;
257 }
258
259 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
260 this.completionPredicate = completionPredicate;
261 }
262
263 public ExpressionSubElementDefinition getCompletionPredicate() {
264 return completionPredicate;
265 }
266
267 public Boolean getGroupExchanges() {
268 return groupExchanges;
269 }
270
271 public void setGroupExchanges(Boolean groupExchanges) {
272 this.groupExchanges = groupExchanges;
273 }
274
275 public Boolean getBatchSizeFromConsumer() {
276 return batchSizeFromConsumer;
277 }
278
279 public void setBatchSizeFromConsumer(Boolean batchSizeFromConsumer) {
280 this.batchSizeFromConsumer = batchSizeFromConsumer;
281 }
282
283 // Fluent API
284 //-------------------------------------------------------------------------
285
286 /**
287 * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
288 * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
289 * as total by setting the exchange property {@link org.apache.camel.Exchange#BATCH_SIZE}.
290 *
291 * @return builder
292 */
293 public AggregateDefinition batchSizeFromConsumer() {
294 setBatchSizeFromConsumer(true);
295 return this;
296 }
297
298 /**
299 * Sets the in batch size for number of exchanges received
300 *
301 * @param batchSize the batch size
302 * @return builder
303 */
304 public AggregateDefinition batchSize(int batchSize) {
305 setBatchSize(batchSize);
306 return this;
307 }
308
309 /**
310 * Sets the out batch size for number of exchanges sent
311 *
312 * @param batchSize the batch size
313 * @return builder
314 */
315 public AggregateDefinition outBatchSize(int batchSize) {
316 setOutBatchSize(batchSize);
317 return this;
318 }
319
320 /**
321 * Sets the batch timeout
322 *
323 * @param batchTimeout the timeout in millis
324 * @return the builder
325 */
326 public AggregateDefinition batchTimeout(long batchTimeout) {
327 setBatchTimeout(batchTimeout);
328 return this;
329 }
330
331 /**
332 * Sets the aggregate collection to use
333 *
334 * @param aggregationCollection the aggregate collection to use
335 * @return the builder
336 */
337 public AggregateDefinition aggregationCollection(AggregationCollection aggregationCollection) {
338 setAggregationCollection(aggregationCollection);
339 return this;
340 }
341
342 /**
343 * Sets the aggregate strategy to use
344 *
345 * @param aggregationStrategy the aggregate strategy to use
346 * @return the builder
347 */
348 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
349 setAggregationStrategy(aggregationStrategy);
350 return this;
351 }
352
353 /**
354 * Sets the aggregate collection to use
355 *
356 * @param collectionRef reference to the aggregate collection to lookup in the registry
357 * @return the builder
358 */
359 public AggregateDefinition collectionRef(String collectionRef) {
360 setCollectionRef(collectionRef);
361 return this;
362 }
363
364 /**
365 * Sets the aggregate strategy to use
366 *
367 * @param strategyRef reference to the strategy to lookup in the registry
368 * @return the builder
369 */
370 public AggregateDefinition strategyRef(String strategyRef) {
371 setStrategyRef(strategyRef);
372 return this;
373 }
374
375 /**
376 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
377 * combined Exchange holding all the aggregated exchanges in a {@link java.util.List} as a exchange
378 * property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}.
379 *
380 * @return the builder
381 */
382 public AggregateDefinition groupExchanges() {
383 setGroupExchanges(true);
384 return this;
385 }
386
387 /**
388 * Sets the predicate used to determine if the aggregation is completed
389 *
390 * @return the clause used to create the predicate
391 */
392 public ExpressionClause<AggregateDefinition> completionPredicate() {
393 checkNoCompletedPredicate();
394 ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(this);
395 setCompletionPredicate(new ExpressionSubElementDefinition((Expression)clause));
396 return clause;
397 }
398
399 /**
400 * Sets the predicate used to determine if the aggregation is completed
401 *
402 * @param predicate the predicate
403 */
404 public AggregateDefinition completionPredicate(Predicate predicate) {
405 checkNoCompletedPredicate();
406 setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
407 return this;
408 }
409
410 protected void checkNoCompletedPredicate() {
411 if (getCompletionPredicate() != null) {
412 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
413 }
414 }
415
416 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
417 this.correlationExpression = correlationExpression;
418 }
419
420 public ExpressionSubElementDefinition getCorrelationExpression() {
421 return correlationExpression;
422 }
423
424 // Section - Methods from ExpressionNode
425 // Needed to copy methods from ExpressionNode here so that I could specify the
426 // correlation expression as optional in JAXB
427
428 public ExpressionDefinition getExpression() {
429 if (expression == null && correlationExpression != null) {
430 expression = correlationExpression.getExpressionType();
431 }
432 return expression;
433 }
434
435 public void setExpression(ExpressionDefinition expression) {
436 this.expression = expression;
437 }
438
439 public List<ProcessorDefinition> getOutputs() {
440 return outputs;
441 }
442
443 public void setOutputs(List<ProcessorDefinition> outputs) {
444 this.outputs = outputs;
445 }
446
447 }