001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.concurrent.ExecutorService;
020 import java.util.concurrent.Executors;
021 import javax.xml.bind.annotation.XmlAccessType;
022 import javax.xml.bind.annotation.XmlAccessorType;
023 import javax.xml.bind.annotation.XmlAttribute;
024 import javax.xml.bind.annotation.XmlRootElement;
025 import javax.xml.bind.annotation.XmlTransient;
026
027 import org.apache.camel.Expression;
028 import org.apache.camel.Processor;
029 import org.apache.camel.builder.ExpressionClause;
030 import org.apache.camel.model.language.ExpressionDefinition;
031 import org.apache.camel.processor.Splitter;
032 import org.apache.camel.processor.aggregate.AggregationStrategy;
033 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
034 import org.apache.camel.spi.RouteContext;
035
036 /**
037 * Represents an XML <split/> element
038 *
039 * @version $Revision: 772172 $
040 */
041 @XmlRootElement(name = "split")
042 @XmlAccessorType(XmlAccessType.FIELD)
043 public class SplitDefinition extends ExpressionNode {
044 @XmlTransient
045 private AggregationStrategy aggregationStrategy;
046 @XmlTransient
047 private ExecutorService executorService;
048 @XmlAttribute(required = false)
049 private Boolean parallelProcessing;
050 @XmlAttribute(required = false)
051 private String strategyRef;
052 @XmlAttribute(required = false)
053 private String executorServiceRef;
054 @XmlAttribute(required = false)
055 private Boolean streaming = false;
056
057 public SplitDefinition() {
058 }
059
060 public SplitDefinition(Expression expression) {
061 super(expression);
062 }
063
064 public SplitDefinition(ExpressionDefinition expression) {
065 super(expression);
066 }
067
068 @Override
069 public String toString() {
070 return "Split[" + getExpression() + " -> " + getOutputs() + "]";
071 }
072
073 @Override
074 public String getShortName() {
075 return "split";
076 }
077
078 @Override
079 public Processor createProcessor(RouteContext routeContext) throws Exception {
080 Processor childProcessor = routeContext.createProcessor(this);
081 aggregationStrategy = createAggregationStrategy(routeContext);
082 executorService = createExecutorService(routeContext);
083 return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
084 isParallelProcessing(), executorService, streaming);
085 }
086
087
088 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
089 AggregationStrategy strategy = getAggregationStrategy();
090 if (strategy == null && strategyRef != null) {
091 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
092 }
093 if (strategy == null) {
094 // fallback to use latest
095 strategy = new UseLatestAggregationStrategy();
096 }
097 return strategy;
098 }
099
100 private ExecutorService createExecutorService(RouteContext routeContext) {
101 if (executorServiceRef != null) {
102 executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
103 }
104 if (executorService == null) {
105 // fall back and use default
106 executorService = Executors.newScheduledThreadPool(5);
107 }
108 return executorService;
109 }
110
111 // Fluent API
112 // -------------------------------------------------------------------------
113
114 /**
115 * Set the expression that the splitter will use
116 *
117 * @return the builder
118 */
119 public ExpressionClause<SplitDefinition> expression() {
120 return ExpressionClause.createAndSetExpression(this);
121 }
122 /**
123 * Set the aggregationStrategy
124 *
125 * @return the builder
126 */
127 public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
128 setAggregationStrategy(aggregationStrategy);
129 return this;
130 }
131
132 /**
133 * Doing the splitting work in parallel
134 *
135 * @return the builder
136 */
137 public SplitDefinition parallelProcessing() {
138 setParallelProcessing(true);
139 return this;
140 }
141
142 /**
143 * Set the splitting action's thread model
144 *
145 * @param parallelProcessing <tt>true</tt> to use a thread pool, if <tt>false</tt> then work is done in the
146 * calling thread.
147 *
148 * @return the builder
149 */
150 public SplitDefinition parallelProcessing(boolean parallelProcessing) {
151 setParallelProcessing(parallelProcessing);
152 return this;
153 }
154
155 /**
156 * Enables streaming.
157 * See {@link SplitDefinition#setStreaming(boolean)} for more information
158 *
159 * @return the builder
160 */
161 public SplitDefinition streaming() {
162 setStreaming(true);
163 return this;
164 }
165
166 /**
167 * Setting the executor service for executing the splitting action.
168 *
169 * @param executorService the executor service
170 * @return the builder
171 */
172 public SplitDefinition executorService(ExecutorService executorService) {
173 setExecutorService(executorService);
174 return this;
175 }
176
177 public AggregationStrategy getAggregationStrategy() {
178 return aggregationStrategy;
179 }
180
181 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
182 this.aggregationStrategy = aggregationStrategy;
183 }
184
185 public boolean isParallelProcessing() {
186 return parallelProcessing != null ? parallelProcessing : false;
187 }
188
189 public void setParallelProcessing(boolean parallelProcessing) {
190 this.parallelProcessing = parallelProcessing;
191 }
192
193 /**
194 * The splitter should use streaming -- exchanges are being sent as the data for them becomes available.
195 * This improves throughput and memory usage, but it has a drawback:
196 * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property
197 *
198 * @return whether or not streaming should be used
199 */
200 public boolean isStreaming() {
201 return streaming != null ? streaming : false;
202 }
203
204 public void setStreaming(boolean streaming) {
205 this.streaming = streaming;
206 }
207
208 public ExecutorService getExecutorService() {
209 return executorService;
210 }
211
212 public void setExecutorService(ExecutorService executorService) {
213 this.executorService = executorService;
214 }
215 }