001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.ArrayList;
020 import java.util.List;
021 import java.util.concurrent.BlockingQueue;
022 import java.util.concurrent.ThreadPoolExecutor;
023
024 import javax.xml.bind.annotation.XmlAccessType;
025 import javax.xml.bind.annotation.XmlAccessorType;
026 import javax.xml.bind.annotation.XmlAttribute;
027 import javax.xml.bind.annotation.XmlElementRef;
028 import javax.xml.bind.annotation.XmlRootElement;
029 import javax.xml.bind.annotation.XmlTransient;
030
031 import org.apache.camel.Processor;
032 import org.apache.camel.builder.ErrorHandlerBuilder;
033 import org.apache.camel.processor.Pipeline;
034 import org.apache.camel.processor.ThreadProcessor;
035 import org.apache.camel.spi.RouteContext;
036
037 /**
038 * Represents an XML <thread/> element
039 *
040 * @version $Revision: 705880 $
041 */
042 @XmlRootElement(name = "thread")
043 @XmlAccessorType(XmlAccessType.FIELD)
044 public class ThreadType extends ProcessorType<ProcessorType> {
045 @XmlAttribute(required = false)
046 private Integer coreSize = 1;
047 @XmlAttribute(required = false)
048 private Boolean daemon = Boolean.TRUE;
049 @XmlAttribute(required = false)
050 private Long keepAliveTime;
051 @XmlAttribute(required = false)
052 private Integer maxSize = 1;
053 @XmlAttribute(required = false)
054 private String name = "Thread Processor";
055 @XmlAttribute(required = false)
056 private Integer priority = Thread.NORM_PRIORITY;
057 @XmlAttribute(required = false)
058 private Long stackSize;
059 @XmlElementRef
060 private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
061 @XmlTransient
062 private BlockingQueue<Runnable> taskQueue;
063 @XmlTransient
064 private ThreadGroup threadGroup;
065 @XmlTransient
066 private ThreadPoolExecutor executor;
067
068 public ThreadType() {
069 }
070
071 public ThreadType(int coreSize) {
072 this.coreSize = coreSize;
073 this.maxSize = coreSize;
074 }
075
076 public ThreadType(ThreadPoolExecutor executor) {
077 this.executor = executor;
078 }
079
080 @Override
081 public List<ProcessorType<?>> getOutputs() {
082 return outputs;
083 }
084
085 @Override
086 public String toString() {
087 return "Thread[" + name + "]";
088 }
089
090 @Override
091 public String getShortName() {
092 return "thread";
093 }
094
095 @Override
096 public Processor createProcessor(RouteContext routeContext) throws Exception {
097 ThreadProcessor thread = new ThreadProcessor();
098 thread.setExecutor(executor);
099 if (coreSize != null) {
100 thread.setCoreSize(coreSize);
101 }
102 if (daemon != null) {
103 thread.setDaemon(daemon);
104 }
105 if (keepAliveTime != null) {
106 thread.setKeepAliveTime(keepAliveTime);
107 }
108 if (maxSize != null) {
109 thread.setMaxSize(maxSize);
110 }
111 thread.setName(name);
112 thread.setPriority(priority);
113 if (stackSize != null) {
114 thread.setStackSize(stackSize);
115 }
116 thread.setTaskQueue(taskQueue);
117 thread.setThreadGroup(threadGroup);
118
119 // TODO: see if we can avoid creating so many nested pipelines
120 ArrayList<Processor> pipe = new ArrayList<Processor>(2);
121 pipe.add(thread);
122 pipe.add(createOutputsProcessor(routeContext, outputs));
123 return new Pipeline(pipe);
124 }
125
126 @Override
127 protected void configureChild(ProcessorType output) {
128 super.configureChild(output);
129 if (isInheritErrorHandler()) {
130 output.setErrorHandlerBuilder(getErrorHandlerBuilder());
131 }
132 }
133
134 // Fluent methods
135 // -----------------------------------------------------------------------
136 @Override
137 public ProcessorType errorHandler(ErrorHandlerBuilder errorHandlerBuilder) {
138 // do not support setting error handling on thread type as its confusing and will not be used
139 throw new IllegalArgumentException("Setting errorHandler on ThreadType is not supported."
140 + " Instead set the errorHandler on the parent.");
141 }
142
143 public ThreadType coreSize(int coreSize) {
144 setCoreSize(coreSize);
145 return this;
146 }
147
148 public ThreadType daemon(boolean daemon) {
149 setDaemon(daemon);
150 return this;
151 }
152
153 public ThreadType keepAliveTime(long keepAliveTime) {
154 setKeepAliveTime(keepAliveTime);
155 return this;
156 }
157
158 public ThreadType maxSize(int maxSize) {
159 setMaxSize(maxSize);
160 return this;
161 }
162
163 public ThreadType name(String name) {
164 setName(name);
165 return this;
166 }
167
168 public ThreadType priority(int priority) {
169 setPriority(priority);
170 return this;
171 }
172
173 public ThreadType stackSize(long stackSize) {
174 setStackSize(stackSize);
175 return this;
176 }
177
178 public ThreadType taskQueue(BlockingQueue<Runnable> taskQueue) {
179 setTaskQueue(taskQueue);
180 return this;
181 }
182
183 public ThreadType threadGroup(ThreadGroup threadGroup) {
184 setThreadGroup(threadGroup);
185 return this;
186 }
187
188 public ThreadType executor(ThreadPoolExecutor executor) {
189 setExecutor(executor);
190 return this;
191 }
192
193 ///////////////////////////////////////////////////////////////////
194 //
195 // Property Accessors
196 //
197 ///////////////////////////////////////////////////////////////////
198
199 public void setCoreSize(int coreSize) {
200 this.coreSize = coreSize;
201 }
202
203 public void setDaemon(boolean daemon) {
204 this.daemon = daemon;
205 }
206
207 public void setKeepAliveTime(long keepAliveTime) {
208 this.keepAliveTime = keepAliveTime;
209 }
210
211 public void setMaxSize(int maxSize) {
212 this.maxSize = maxSize;
213 }
214
215 public void setName(String name) {
216 this.name = name;
217 }
218
219 public void setPriority(int priority) {
220 this.priority = priority;
221 }
222
223 public void setStackSize(long stackSize) {
224 this.stackSize = stackSize;
225 }
226
227 public void setTaskQueue(BlockingQueue<Runnable> taskQueue) {
228 this.taskQueue = taskQueue;
229 }
230
231 public void setThreadGroup(ThreadGroup threadGroup) {
232 this.threadGroup = threadGroup;
233 }
234
235 public ThreadPoolExecutor getExecutor() {
236 return executor;
237 }
238
239 public void setExecutor(ThreadPoolExecutor executor) {
240 this.executor = executor;
241 }
242 }