001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.List;
020 import java.util.concurrent.ExecutorService;
021
022 import javax.xml.bind.annotation.XmlAccessType;
023 import javax.xml.bind.annotation.XmlAccessorType;
024 import javax.xml.bind.annotation.XmlAttribute;
025 import javax.xml.bind.annotation.XmlRootElement;
026 import javax.xml.bind.annotation.XmlTransient;
027
028 import org.apache.camel.Processor;
029 import org.apache.camel.processor.MulticastProcessor;
030 import org.apache.camel.processor.aggregate.AggregationStrategy;
031 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
032 import org.apache.camel.spi.RouteContext;
033
034 /**
035 * Represents an XML <multicast/> element
036 *
037 * @version $Revision: 772172 $
038 */
039 @XmlRootElement(name = "multicast")
040 @XmlAccessorType(XmlAccessType.FIELD)
041 public class MulticastDefinition extends OutputDefinition<ProcessorDefinition> {
042 @XmlAttribute(required = false)
043 private Boolean parallelProcessing;
044 @XmlAttribute(required = false)
045 private String strategyRef;
046 @XmlTransient
047 private ExecutorService executorService;
048 @XmlAttribute(required = false)
049 private String executorServiceRef;
050 @XmlAttribute(required = false)
051 private Boolean streaming;
052 @XmlTransient
053 private AggregationStrategy aggregationStrategy;
054
055
056 @Override
057 public String toString() {
058 return "Multicast[" + getOutputs() + "]";
059 }
060
061 @Override
062 public String getShortName() {
063 return "multicast";
064 }
065
066 @Override
067 public Processor createProcessor(RouteContext routeContext) throws Exception {
068 return createOutputsProcessor(routeContext);
069 }
070
071 // Fluent API
072 // -------------------------------------------------------------------------
073
074 /**
075 * Set the multicasting aggregationStrategy
076 *
077 * @return the builder
078 */
079 public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
080 setAggregationStrategy(aggregationStrategy);
081 return this;
082 }
083
084 /**
085 * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work
086 *
087 * @return the builder
088 */
089 public MulticastDefinition parallelProcessing() {
090 setParallelProcessing(true);
091 return this;
092 }
093
094 /**
095 * Aggregates the responses as the are done (e.g. out of order sequence)
096 *
097 * @return the builder
098 */
099 public MulticastDefinition streaming() {
100 setStreaming(true);
101 return this;
102 }
103
104 /**
105 * Setting the executor service for executing the multicasting action.
106 *
107 * @return the builder
108 */
109 public MulticastDefinition executorService(ExecutorService executorService) {
110 setExecutorService(executorService);
111 return this;
112 }
113
114 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
115 if (strategyRef != null) {
116 aggregationStrategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
117 }
118 if (aggregationStrategy == null) {
119 // default to use latest aggregation strategy
120 aggregationStrategy = new UseLatestAggregationStrategy();
121 }
122 if (executorServiceRef != null) {
123 executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
124 }
125 return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService, isStreaming());
126 }
127
128 public AggregationStrategy getAggregationStrategy() {
129 return aggregationStrategy;
130 }
131
132 public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) {
133 this.aggregationStrategy = aggregationStrategy;
134 return this;
135 }
136
137 public boolean isParallelProcessing() {
138 return parallelProcessing != null ? parallelProcessing : false;
139 }
140
141 public void setParallelProcessing(boolean parallelProcessing) {
142 this.parallelProcessing = parallelProcessing;
143 }
144
145 public boolean isStreaming() {
146 return streaming != null ? streaming : false;
147 }
148
149 public void setStreaming(boolean streaming) {
150 this.streaming = streaming;
151 }
152
153 public ExecutorService getExecutorService() {
154 return executorService;
155 }
156
157 public void setExecutorService(ExecutorService executorService) {
158 this.executorService = executorService;
159 }
160
161 }