001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.List;
022 import java.util.concurrent.Callable;
023 import java.util.concurrent.CompletionService;
024 import java.util.concurrent.ExecutionException;
025 import java.util.concurrent.ExecutorCompletionService;
026 import java.util.concurrent.ExecutorService;
027 import java.util.concurrent.Future;
028 import java.util.concurrent.TimeUnit;
029
030 import org.apache.camel.Exchange;
031 import org.apache.camel.Navigate;
032 import org.apache.camel.Processor;
033 import org.apache.camel.impl.ServiceSupport;
034 import org.apache.camel.processor.aggregate.AggregationStrategy;
035 import org.apache.camel.util.ExchangeHelper;
036 import org.apache.camel.util.ServiceHelper;
037 import org.apache.camel.util.concurrent.AtomicExchange;
038 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
039 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042
043 import static org.apache.camel.util.ObjectHelper.notNull;
044
045 /**
046 * Implements the Multicast pattern to send a message exchange to a number of
047 * endpoints, each endpoint receiving a copy of the message exchange.
048 *
049 * @see Pipeline
050 * @version $Revision: 789593 $
051 */
052 public class MulticastProcessor extends ServiceSupport implements Processor, Navigate, Traceable {
053
054 private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
055
056 // TODO: Add option to stop if an exception was thrown during processing to break asap (future task cancel)
057
058 /**
059 * Class that represent each step in the multicast route to do
060 */
061 static class ProcessorExchangePair {
062 private final Processor processor;
063 private final Exchange exchange;
064
065 public ProcessorExchangePair(Processor processor, Exchange exchange) {
066 this.processor = processor;
067 this.exchange = exchange;
068 }
069
070 public Processor getProcessor() {
071 return processor;
072 }
073
074 public Exchange getExchange() {
075 return exchange;
076 }
077 }
078
079 private final Collection<Processor> processors;
080 private final AggregationStrategy aggregationStrategy;
081 private final boolean isParallelProcessing;
082 private final boolean streaming;
083 private ExecutorService executorService;
084
085 public MulticastProcessor(Collection<Processor> processors) {
086 this(processors, null);
087 }
088
089 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
090 this(processors, aggregationStrategy, false, null, false);
091 }
092
093 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean streaming) {
094 notNull(processors, "processors");
095 this.processors = processors;
096 this.aggregationStrategy = aggregationStrategy;
097 this.isParallelProcessing = parallelProcessing;
098 this.executorService = executorService;
099 this.streaming = streaming;
100
101 if (isParallelProcessing()) {
102 if (this.executorService == null) {
103 // setup default executor as parallel processing requires an executor
104 this.executorService = ExecutorServiceHelper.newScheduledThreadPool(5, "Multicast", true);
105 }
106 }
107 }
108
109 @Override
110 public String toString() {
111 return "Multicast[" + getProcessors() + "]";
112 }
113
114 public String getTraceLabel() {
115 return "Multicast";
116 }
117
118 public void process(Exchange exchange) throws Exception {
119 final AtomicExchange result = new AtomicExchange();
120 final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
121
122 if (isParallelProcessing()) {
123 doProcessParallel(result, pairs, isStreaming());
124 } else {
125 doProcessSequntiel(result, pairs);
126 }
127
128 if (result.get() != null) {
129 ExchangeHelper.copyResults(exchange, result.get());
130 }
131 }
132
133 protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs, boolean streaming) throws InterruptedException, ExecutionException {
134 CompletionService<Exchange> completion;
135 if (streaming) {
136 // execute tasks in paralle+streaming and aggregate in the order they are finished (out of order sequence)
137 completion = new ExecutorCompletionService<Exchange>(executorService);
138 } else {
139 // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
140 completion = new SubmitOrderedCompletionService<Exchange>(executorService);
141 }
142 int total = 0;
143
144 for (ProcessorExchangePair pair : pairs) {
145 final Processor producer = pair.getProcessor();
146 final Exchange subExchange = pair.getExchange();
147 updateNewExchange(subExchange, total, pairs);
148
149 completion.submit(new Callable<Exchange>() {
150 public Exchange call() throws Exception {
151 try {
152 producer.process(subExchange);
153 } catch (Exception e) {
154 subExchange.setException(e);
155 }
156 if (LOG.isTraceEnabled()) {
157 LOG.trace("Parallel processing complete for exchange: " + subExchange);
158 }
159 return subExchange;
160 }
161 });
162
163 total++;
164 }
165
166 for (int i = 0; i < total; i++) {
167 Future<Exchange> future = completion.take();
168 Exchange subExchange = future.get();
169 if (aggregationStrategy != null) {
170 doAggregate(result, subExchange);
171 }
172 }
173
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("Done parallel processing " + total + " exchanges");
176 }
177 }
178
179 protected void doProcessSequntiel(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
180 int total = 0;
181
182 for (ProcessorExchangePair pair : pairs) {
183 Processor producer = pair.getProcessor();
184 Exchange subExchange = pair.getExchange();
185 updateNewExchange(subExchange, total, pairs);
186
187 // process it sequentially
188 producer.process(subExchange);
189 if (LOG.isTraceEnabled()) {
190 LOG.trace("Sequientel processing complete for number " + total + " exchange: " + subExchange);
191 }
192
193 if (aggregationStrategy != null) {
194 doAggregate(result, subExchange);
195 }
196 total++;
197 }
198
199 if (LOG.isDebugEnabled()) {
200 LOG.debug("Done sequientel processing " + total + " exchanges");
201 }
202 }
203
204 /**
205 * Aggregate the {@link Exchange} with the current result
206 *
207 * @param result the current result
208 * @param exchange the exchange to be added to the result
209 */
210 protected synchronized void doAggregate(AtomicExchange result, Exchange exchange) {
211 // only aggregate if the exchange is not filtered (eg by the FilterProcessor)
212 Boolean filtered = exchange.getProperty(Exchange.FILTERED, Boolean.class);
213 if (aggregationStrategy != null && (filtered == null || !filtered)) {
214 // prepare the exchanges for aggregation
215 Exchange oldExchange = result.get();
216 ExchangeHelper.prepareAggregation(oldExchange, exchange);
217 result.set(aggregationStrategy.aggregate(oldExchange, exchange));
218 } else {
219 if (LOG.isTraceEnabled()) {
220 LOG.trace("Cannot aggregate exchange as its filtered: " + exchange);
221 }
222 }
223 }
224
225 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs) {
226 exchange.setProperty(Exchange.MULTICAST_INDEX, index);
227 }
228
229 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) {
230 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
231
232 for (Processor processor : processors) {
233 Exchange copy = exchange.copy();
234 result.add(new ProcessorExchangePair(processor, copy));
235 }
236 return result;
237 }
238
239 protected void doStop() throws Exception {
240 if (executorService != null) {
241 executorService.shutdown();
242 executorService.awaitTermination(0, TimeUnit.SECONDS);
243 }
244 ServiceHelper.stopServices(processors);
245 }
246
247 protected void doStart() throws Exception {
248 ServiceHelper.startServices(processors);
249 }
250
251 /**
252 * Is the multicast processor working in streaming mode?
253 *
254 * In streaming mode:
255 * <ul>
256 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
257 * <li>for parallel processing, we start aggregating responses as they get send back to the processor;
258 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
259 * </ul>
260 */
261 public boolean isStreaming() {
262 return streaming;
263 }
264
265 /**
266 * Returns the producers to multicast to
267 */
268 public Collection<Processor> getProcessors() {
269 return processors;
270 }
271
272 public AggregationStrategy getAggregationStrategy() {
273 return aggregationStrategy;
274 }
275
276 public boolean isParallelProcessing() {
277 return isParallelProcessing;
278 }
279
280 public ExecutorService getExecutorService() {
281 return executorService;
282 }
283
284 public void setExecutorService(ExecutorService executorService) {
285 this.executorService = executorService;
286 }
287
288 public List<Processor> next() {
289 if (!hasNext()) {
290 return null;
291 }
292 return new ArrayList<Processor>(processors);
293 }
294
295 public boolean hasNext() {
296 return processors != null && !processors.isEmpty();
297 }
298 }