001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.aggregate;
018
019 import java.util.AbstractCollection;
020 import java.util.Iterator;
021 import java.util.LinkedHashMap;
022 import java.util.Map;
023 import java.util.concurrent.atomic.AtomicInteger;
024
025 import org.apache.camel.Exchange;
026 import org.apache.camel.Expression;
027 import org.apache.camel.util.ExchangeHelper;
028 import org.apache.camel.util.ObjectHelper;
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031
032 /**
033 * A {@link java.util.Collection} which aggregates exchanges together using a correlation
034 * expression so that there is only a single message exchange sent for a single
035 * correlation key.
036 *
037 * @version $Revision: 787563 $
038 */
039 public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {
040
041 private static final transient Log LOG = LogFactory.getLog(DefaultAggregationCollection.class);
042 private Expression correlationExpression;
043 private AggregationStrategy aggregationStrategy;
044 private final Map<Object, Exchange> aggregated = new LinkedHashMap<Object, Exchange>();
045 private final AtomicInteger counter = new AtomicInteger();
046
047 public DefaultAggregationCollection() {
048 }
049
050 public DefaultAggregationCollection(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
051 this.correlationExpression = correlationExpression;
052 this.aggregationStrategy = aggregationStrategy;
053 }
054
055 protected Map<Object, Exchange> getAggregated() {
056 return aggregated;
057 }
058
059 @Override
060 public boolean add(Exchange exchange) {
061 // do not add exchange if it was filtered
062 Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
063 if (filtered != null && filtered) {
064 if (LOG.isTraceEnabled()) {
065 LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
066 }
067 return false;
068 }
069
070 Object correlationKey = correlationExpression.evaluate(exchange, Object.class);
071 if (LOG.isTraceEnabled()) {
072 LOG.trace("Evaluated expression: " + correlationExpression + " as correlation key: " + correlationKey);
073 }
074
075 // TODO: correlationKey evalutated to null should be skipped by default
076
077 Exchange oldExchange = aggregated.get(correlationKey);
078 Exchange newExchange = exchange;
079
080 Integer size = 1;
081 if (oldExchange != null) {
082 size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class);
083 ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange);
084 size++;
085 }
086
087 // prepare the exchanges for aggregation
088 ExchangeHelper.prepareAggregation(oldExchange, newExchange);
089 newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
090 newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
091
092 // update the index counter
093 newExchange.setProperty(Exchange.AGGREGATED_INDEX, counter.getAndIncrement());
094
095 // the strategy may just update the old exchange and return it
096 if (!newExchange.equals(oldExchange)) {
097 if (LOG.isTraceEnabled()) {
098 LOG.trace("Put exchange:" + newExchange + " with correlation key:" + correlationKey);
099 }
100 aggregated.put(correlationKey, newExchange);
101 }
102
103 onAggregation(correlationKey, newExchange);
104
105 return true;
106 }
107
108 public Iterator<Exchange> iterator() {
109 return aggregated.values().iterator();
110 }
111
112 public int size() {
113 return aggregated.size();
114 }
115
116 @Override
117 public void clear() {
118 aggregated.clear();
119 counter.set(0);
120 }
121
122 public void onAggregation(Object correlationKey, Exchange exchange) {
123 }
124
125 public Expression getCorrelationExpression() {
126 return correlationExpression;
127 }
128
129 public void setCorrelationExpression(Expression correlationExpression) {
130 this.correlationExpression = correlationExpression;
131 }
132
133 public AggregationStrategy getAggregationStrategy() {
134 return aggregationStrategy;
135 }
136
137 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
138 this.aggregationStrategy = aggregationStrategy;
139 }
140 }