001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019
020
021 import java.util.ArrayList;
022 import java.util.Collection;
023 import java.util.List;
024 import java.util.concurrent.ArrayBlockingQueue;
025 import java.util.concurrent.CountDownLatch;
026 import java.util.concurrent.RejectedExecutionException;
027 import java.util.concurrent.RejectedExecutionHandler;
028 import java.util.concurrent.ThreadPoolExecutor;
029 import java.util.concurrent.TimeUnit;
030 import java.util.concurrent.atomic.AtomicBoolean;
031
032 import org.apache.camel.AsyncCallback;
033 import org.apache.camel.Endpoint;
034 import org.apache.camel.Exchange;
035 import org.apache.camel.Processor;
036 import org.apache.camel.impl.ServiceSupport;
037 import org.apache.camel.processor.aggregate.AggregationStrategy;
038 import org.apache.camel.util.ExchangeHelper;
039 import org.apache.camel.util.ServiceHelper;
040 import static org.apache.camel.util.ObjectHelper.notNull;
041
042 /**
043 * Implements the Multicast pattern to send a message exchange to a number of
044 * endpoints, each endpoint receiving a copy of the message exchange.
045 *
046 * @see Pipeline
047 * @version $Revision: 662940 $
048 */
049 public class MulticastProcessor extends ServiceSupport implements Processor {
050 static class ProcessorExchangePair {
051 private final Processor processor;
052 private final Exchange exchange;
053
054 public ProcessorExchangePair(Processor processor, Exchange exchange) {
055 this.processor = processor;
056 this.exchange = exchange;
057 }
058
059 public Processor getProcessor() {
060 return processor;
061 }
062
063 public Exchange getExchange() {
064 return exchange;
065 }
066
067
068 }
069
070 private Collection<Processor> processors;
071 private AggregationStrategy aggregationStrategy;
072 private boolean isParallelProcessing;
073 private ThreadPoolExecutor executor;
074 private final AtomicBoolean shutdown = new AtomicBoolean(true);
075
076 public MulticastProcessor(Collection<Processor> processors) {
077 this(processors, null);
078 }
079
080 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
081 this(processors, aggregationStrategy, false, null);
082 }
083
084 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) {
085 notNull(processors, "processors");
086 this.processors = processors;
087 this.aggregationStrategy = aggregationStrategy;
088 this.isParallelProcessing = parallelProcessing;
089 if (isParallelProcessing) {
090 if (executor != null) {
091 this.executor = executor;
092 } else { // setup default Executor
093 this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
094 }
095
096 }
097
098 }
099
100 /**
101 * A helper method to convert a list of endpoints into a list of processors
102 */
103 public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints)
104 throws Exception {
105 Collection<Processor> answer = new ArrayList<Processor>();
106 for (Endpoint endpoint : endpoints) {
107 answer.add(endpoint.createProducer());
108 }
109 return answer;
110 }
111
112 @Override
113 public String toString() {
114 return "Multicast" + getProcessors();
115 }
116
117 class ProcessCall implements Runnable {
118 private final Exchange exchange;
119 private final AsyncCallback callback;
120 private final Processor processor;
121
122 public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) {
123 this.exchange = exchange;
124 this.callback = callback;
125 this.processor = processor;
126 }
127
128 public void run() {
129 if (shutdown.get()) {
130 exchange.setException(new RejectedExecutionException());
131 callback.done(false);
132 } else {
133 try {
134 processor.process(exchange);
135 } catch (Exception ex) {
136 exchange.setException(ex);
137 }
138 callback.done(false);
139 }
140 }
141 }
142
143 public void process(Exchange exchange) throws Exception {
144 Exchange result = null;
145
146 List<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
147
148 // Parallel Processing the producer
149 if (isParallelProcessing) {
150 Exchange[] exchanges = new Exchange[pairs.size()];
151 final CountDownLatch completedExchanges = new CountDownLatch(pairs.size());
152 int i = 0;
153 for (ProcessorExchangePair pair : pairs) {
154 Processor producer = pair.getProcessor();
155 exchanges[i] = pair.getExchange();
156 updateNewExchange(exchanges[i], i, pairs);
157 ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback() {
158 public void done(boolean doneSynchronously) {
159 completedExchanges.countDown();
160 }
161
162 });
163 executor.execute(call);
164 i++;
165 }
166 completedExchanges.await();
167 if (aggregationStrategy != null) {
168 for (Exchange resultExchange : exchanges) {
169 if (result == null) {
170 result = resultExchange;
171 } else {
172 result = aggregationStrategy.aggregate(result, resultExchange);
173 }
174 }
175 }
176
177 } else {
178 // we call the producer one by one sequentially
179 int i = 0;
180 for (ProcessorExchangePair pair : pairs) {
181 Processor producer = pair.getProcessor();
182 Exchange subExchange = pair.getExchange();
183 updateNewExchange(subExchange, i, pairs);
184
185 producer.process(subExchange);
186 if (aggregationStrategy != null) {
187 if (result == null) {
188 result = subExchange;
189 } else {
190 result = aggregationStrategy.aggregate(result, subExchange);
191 }
192 }
193 i++;
194 }
195 }
196 if (result != null) {
197 ExchangeHelper.copyResults(exchange, result);
198 }
199 }
200
201 protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) {
202 // No updates needed
203 }
204
205 protected List<ProcessorExchangePair> createProcessorExchangePairs(
206 Exchange exchange) {
207 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
208 Processor[] processorsArray = processors.toArray(new Processor[processors.size()]);
209 for (int i = 0; i < processorsArray.length; i++) {
210 result.add(new ProcessorExchangePair(processorsArray[i], exchange.copy()));
211 }
212 return result;
213 }
214
215 protected void doStop() throws Exception {
216 shutdown.set(true);
217 if (executor != null) {
218 executor.shutdown();
219 executor.awaitTermination(0, TimeUnit.SECONDS);
220 }
221 ServiceHelper.stopServices(processors);
222 }
223
224 protected void doStart() throws Exception {
225 shutdown.set(false);
226 if (executor != null) {
227 executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
228 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
229 ProcessCall call = (ProcessCall)runnable;
230 call.exchange.setException(new RejectedExecutionException());
231 call.callback.done(false);
232 }
233 });
234 }
235 ServiceHelper.startServices(processors);
236 }
237
238 /**
239 * Returns the producers to multicast to
240 */
241 public Collection<Processor> getProcessors() {
242 return processors;
243 }
244
245 public AggregationStrategy getAggregationStrategy() {
246 return aggregationStrategy;
247 }
248 }