001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.List;
022
023 import javax.xml.bind.annotation.XmlElement;
024 import javax.xml.bind.annotation.XmlElementRef;
025 import javax.xml.bind.annotation.XmlRootElement;
026 import javax.xml.bind.annotation.XmlTransient;
027
028 import org.apache.camel.Exchange;
029 import org.apache.camel.Expression;
030 import org.apache.camel.Processor;
031 import org.apache.camel.model.config.BatchResequencerConfig;
032 import org.apache.camel.model.config.StreamResequencerConfig;
033 import org.apache.camel.model.language.ExpressionType;
034 import org.apache.camel.processor.Resequencer;
035 import org.apache.camel.processor.StreamResequencer;
036 import org.apache.camel.processor.resequencer.ExpressionResultComparator;
037 import org.apache.camel.spi.RouteContext;
038
039 /**
040 * Represents an XML <resequencer/> element
041 *
042 * @version $Revision: 724629 $
043 */
044 @XmlRootElement(name = "resequencer")
045 public class ResequencerType extends ProcessorType<ProcessorType> {
046 @XmlElementRef
047 private List<ExpressionType> expressions = new ArrayList<ExpressionType>();
048 @XmlElementRef
049 private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
050 // Binding annotation at setter
051 private BatchResequencerConfig batchConfig;
052 // Binding annotation at setter
053 private StreamResequencerConfig streamConfig;
054 @XmlTransient
055 private List<Expression> expressionList;
056
057 public ResequencerType() {
058 this(null);
059 }
060
061 public ResequencerType(List<Expression> expressions) {
062 this.expressionList = expressions;
063 this.batch();
064 }
065
066 @Override
067 public String getShortName() {
068 return "resequencer";
069 }
070
071 /**
072 * Configures the stream-based resequencing algorithm using the default
073 * configuration.
074 *
075 * @return <code>this</code> instance.
076 */
077 public ResequencerType stream() {
078 return stream(StreamResequencerConfig.getDefault());
079 }
080
081 /**
082 * Configures the batch-based resequencing algorithm using the default
083 * configuration.
084 *
085 * @return <code>this</code> instance.
086 */
087 public ResequencerType batch() {
088 return batch(BatchResequencerConfig.getDefault());
089 }
090
091 /**
092 * Configures the stream-based resequencing algorithm using the given
093 * {@link StreamResequencerConfig}.
094 *
095 * @return <code>this</code> instance.
096 */
097 public ResequencerType stream(StreamResequencerConfig config) {
098 this.streamConfig = config;
099 this.batchConfig = null;
100 return this;
101 }
102
103 /**
104 * Configures the batch-based resequencing algorithm using the given
105 * {@link BatchResequencerConfig}.
106 *
107 * @return <code>this</code> instance.
108 */
109 public ResequencerType batch(BatchResequencerConfig config) {
110 this.batchConfig = config;
111 this.streamConfig = null;
112 return this;
113 }
114
115 public ResequencerType expression(ExpressionType expression) {
116 expressions.add(expression);
117 return this;
118 }
119
120 @Override
121 public String toString() {
122 return "Resequencer[" + getExpressions() + " -> " + getOutputs() + "]";
123 }
124
125 @Override
126 public String getLabel() {
127 return ExpressionType.getLabel(getExpressions());
128 }
129
130 public List<ExpressionType> getExpressions() {
131 return expressions;
132 }
133
134 public List<ProcessorType<?>> getOutputs() {
135 return outputs;
136 }
137
138 public void setOutputs(List<ProcessorType<?>> outputs) {
139 this.outputs = outputs;
140 }
141
142 public BatchResequencerConfig getBatchConfig() {
143 return batchConfig;
144 }
145
146 public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
147 return batchConfig;
148 }
149
150 public StreamResequencerConfig getStreamConfig() {
151 return streamConfig;
152 }
153
154 @XmlElement(name = "batch-config", required = false)
155 public void setBatchConfig(BatchResequencerConfig batchConfig) {
156 // TODO: find out how to have these two within an <xsd:choice>
157 batch(batchConfig);
158 }
159
160 @XmlElement(name = "stream-config", required = false)
161 public void setStreamConfig(StreamResequencerConfig streamConfig) {
162 // TODO: find out how to have these two within an <xsd:choice>
163 stream(streamConfig);
164 }
165
166 public ResequencerType timeout(long timeout) {
167 if (batchConfig != null) {
168 batchConfig.setBatchTimeout(timeout);
169 } else {
170 streamConfig.setTimeout(timeout);
171 }
172 return this;
173 }
174
175 public ResequencerType size(int batchSize) {
176 if (batchConfig == null) {
177 throw new IllegalStateException("size() only supported for batch resequencer");
178 }
179 batchConfig.setBatchSize(batchSize);
180 return this;
181 }
182
183 public ResequencerType capacity(int capacity) {
184 if (streamConfig == null) {
185 throw new IllegalStateException("capacity() only supported for stream resequencer");
186 }
187 streamConfig.setCapacity(capacity);
188 return this;
189
190 }
191
192 public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
193 if (streamConfig == null) {
194 throw new IllegalStateException("comparator() only supported for stream resequencer");
195 }
196 streamConfig.setComparator(comparator);
197 return this;
198
199 }
200
201 @Override
202 public Processor createProcessor(RouteContext routeContext) throws Exception {
203 if (batchConfig != null) {
204 return createBatchResequencer(routeContext, batchConfig);
205 } else {
206 // streamConfig should be non-null if batchConfig is null
207 return createStreamResequencer(routeContext, streamConfig);
208 }
209 }
210
211 /**
212 * Creates a batch {@link Resequencer} instance applying the given
213 * <code>config</code>.
214 *
215 * @param routeContext
216 * route context.
217 * @param config
218 * batch resequencer configuration.
219 * @return the configured batch resequencer.
220 * @throws Exception
221 */
222 protected Resequencer createBatchResequencer(RouteContext routeContext,
223 BatchResequencerConfig config) throws Exception {
224 Processor processor = routeContext.createProcessor(this);
225 Resequencer resequencer = new Resequencer(processor, resolveExpressionList(routeContext));
226 resequencer.setBatchSize(config.getBatchSize());
227 resequencer.setBatchTimeout(config.getBatchTimeout());
228 return resequencer;
229 }
230
231 /**
232 * Creates a {@link StreamResequencer} instance applying the given
233 * <code>config</code>.
234 *
235 * @param routeContext
236 * route context.
237 * @param config
238 * stream resequencer configuration.
239 * @return the configured stream resequencer.
240 * @throws Exception
241 */
242 protected StreamResequencer createStreamResequencer(RouteContext routeContext,
243 StreamResequencerConfig config) throws Exception {
244 config.getComparator().setExpressions(resolveExpressionList(routeContext));
245 Processor processor = routeContext.createProcessor(this);
246 StreamResequencer resequencer = new StreamResequencer(processor, config.getComparator());
247 resequencer.setTimeout(config.getTimeout());
248 resequencer.setCapacity(config.getCapacity());
249 return resequencer;
250
251 }
252
253 private List<Expression> resolveExpressionList(RouteContext routeContext) {
254 if (expressionList == null) {
255 expressionList = new ArrayList<Expression>();
256 for (ExpressionType expression : expressions) {
257 expressionList.add(expression.createExpression(routeContext));
258 }
259 }
260 if (expressionList.isEmpty()) {
261 throw new IllegalArgumentException("No expressions configured for: " + this);
262 }
263 return expressionList;
264 }
265 }