001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.Collection;
020
021 import javax.xml.bind.annotation.XmlAccessType;
022 import javax.xml.bind.annotation.XmlAccessorType;
023 import javax.xml.bind.annotation.XmlAttribute;
024 import javax.xml.bind.annotation.XmlElement;
025 import javax.xml.bind.annotation.XmlRootElement;
026 import javax.xml.bind.annotation.XmlTransient;
027
028 import org.apache.camel.Endpoint;
029 import org.apache.camel.Exchange;
030 import org.apache.camel.Expression;
031 import org.apache.camel.Predicate;
032 import org.apache.camel.Processor;
033 import org.apache.camel.Route;
034 import org.apache.camel.builder.ExpressionClause;
035 import org.apache.camel.model.language.ExpressionType;
036 import org.apache.camel.processor.Aggregator;
037 import org.apache.camel.processor.aggregate.AggregationCollection;
038 import org.apache.camel.processor.aggregate.AggregationStrategy;
039 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
040 import org.apache.camel.spi.RouteContext;
041
042 /**
043 * Represents an XML <aggregator/> element
044 *
045 * @version $Revision: 673335 $
046 */
047 @XmlRootElement(name = "aggregator")
048 @XmlAccessorType(XmlAccessType.FIELD)
049 public class AggregatorType extends ExpressionNode {
050 @XmlTransient
051 private AggregationStrategy aggregationStrategy;
052 @XmlTransient
053 private AggregationCollection aggregationCollection;
054 @XmlAttribute(required = false)
055 private Integer batchSize;
056 @XmlAttribute(required = false)
057 private Long batchTimeout;
058 @XmlAttribute(required = false)
059 private String strategyRef;
060 @XmlElement(name = "completedPredicate", required = false)
061 private CompletedPredicate completedPredicate;
062
063 public AggregatorType() {
064 }
065
066 public AggregatorType(Expression correlationExpression) {
067 super(correlationExpression);
068 }
069
070 public AggregatorType(ExpressionType correlationExpression) {
071 super(correlationExpression);
072 }
073
074 public AggregatorType(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
075 super(correlationExpression);
076 this.aggregationStrategy = aggregationStrategy;
077 }
078
079 @Override
080 public String toString() {
081 return "Aggregator[ " + getExpression() + " -> " + getOutputs() + "]";
082 }
083
084 @Override
085 public String getShortName() {
086 return "aggregator";
087 }
088
089 @SuppressWarnings("unchecked")
090 @Override
091 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
092 final Aggregator aggregator = createAggregator(routeContext);
093 doAddRoute(routeContext, routes, aggregator);
094 }
095
096 private void doAddRoute(RouteContext routeContext, Collection<Route> routes, final Aggregator aggregator)
097 throws Exception {
098 Route route = new Route<Exchange>(aggregator.getEndpoint(), aggregator) {
099 @Override
100 public String toString() {
101 return "AggregatorRoute[" + getEndpoint() + " -> " + aggregator.getProcessor() + "]";
102 }
103 };
104
105 routes.add(route);
106 }
107
108 @Override
109 public Processor createProcessor(RouteContext routeContext) throws Exception {
110 final Aggregator aggregator = createAggregator(routeContext);
111
112 doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(), aggregator);
113 routeContext.setIsRouteAdded(true);
114 return aggregator;
115 }
116
117 protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
118 Endpoint from = routeContext.getEndpoint();
119 final Processor processor = routeContext.createProcessor(this);
120
121 final Aggregator aggregator;
122 if (aggregationCollection != null) {
123 aggregator = new Aggregator(from, processor, aggregationCollection);
124 } else {
125 AggregationStrategy strategy = getAggregationStrategy();
126 if (strategy == null && strategyRef != null) {
127 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
128 }
129 if (strategy == null) {
130 strategy = new UseLatestAggregationStrategy();
131 }
132 Expression aggregateExpression = getExpression().createExpression(routeContext);
133
134 Predicate predicate = null;
135 if (completedPredicate != null) {
136 predicate = completedPredicate.createPredicate(routeContext);
137 }
138 if (predicate != null) {
139 aggregator = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
140 } else {
141 aggregator = new Aggregator(from, processor, aggregateExpression, strategy);
142 }
143 }
144
145 if (batchSize != null) {
146 aggregator.setBatchSize(batchSize);
147 }
148
149 if (batchTimeout != null) {
150 aggregator.setBatchTimeout(batchTimeout);
151 }
152
153 return aggregator;
154 }
155 public AggregationCollection getAggregationCollection() {
156 return aggregationCollection;
157 }
158
159 public void setAggregationCollection(AggregationCollection aggregationCollection) {
160 this.aggregationCollection = aggregationCollection;
161 }
162
163 public AggregationStrategy getAggregationStrategy() {
164 return aggregationStrategy;
165 }
166
167 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
168 this.aggregationStrategy = aggregationStrategy;
169 }
170
171 public Integer getBatchSize() {
172 return batchSize;
173 }
174
175 public void setBatchSize(Integer batchSize) {
176 this.batchSize = batchSize;
177 }
178
179 public Long getBatchTimeout() {
180 return batchTimeout;
181 }
182
183 public void setBatchTimeout(Long batchTimeout) {
184 this.batchTimeout = batchTimeout;
185 }
186
187 public String getStrategyRef() {
188 return strategyRef;
189 }
190
191 public void setStrategyRef(String strategyRef) {
192 this.strategyRef = strategyRef;
193 }
194
195 public CompletedPredicate getCompletePredicate() {
196 return completedPredicate;
197 }
198
199 public void setCompletePredicate(CompletedPredicate completedPredicate) {
200 this.completedPredicate = completedPredicate;
201 }
202
203 // Fluent API
204 //-------------------------------------------------------------------------
205 public AggregatorType batchSize(int batchSize) {
206 setBatchSize(batchSize);
207 return this;
208 }
209
210 public AggregatorType batchTimeout(long batchTimeout) {
211 setBatchTimeout(batchTimeout);
212 return this;
213 }
214
215 /**
216 * Sets the predicate used to determine if the aggregation is completed
217 *
218 * @return the clause used to create the predicate
219 */
220 public ExpressionClause<AggregatorType> completedPredicate() {
221 checkNoCompletedPredicate();
222 ExpressionClause<AggregatorType> clause = new ExpressionClause<AggregatorType>(this);
223 completedPredicate = new CompletedPredicate(clause);
224 return clause;
225 }
226
227 /**
228 * Sets the predicate used to determine if the aggregation is completed
229 */
230 public AggregatorType completedPredicate(Predicate predicate) {
231 checkNoCompletedPredicate();
232 completedPredicate = new CompletedPredicate(predicate);
233 return this;
234 }
235
236 protected void checkNoCompletedPredicate() {
237 if (completedPredicate != null) {
238 throw new IllegalArgumentException("There already is a completedPredicate defined for this aggregator: " + this);
239 }
240 }
241 }