001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.ArrayList;
020 import java.util.List;
021 import java.util.concurrent.BlockingQueue;
022 import java.util.concurrent.ThreadPoolExecutor;
023
024 import javax.xml.bind.annotation.XmlAccessType;
025 import javax.xml.bind.annotation.XmlAccessorType;
026 import javax.xml.bind.annotation.XmlAttribute;
027 import javax.xml.bind.annotation.XmlElementRef;
028 import javax.xml.bind.annotation.XmlRootElement;
029 import javax.xml.bind.annotation.XmlTransient;
030
031 import org.apache.camel.Processor;
032 import org.apache.camel.processor.Pipeline;
033 import org.apache.camel.processor.ThreadProcessor;
034 import org.apache.camel.spi.RouteContext;
035
036 /**
037 * Represents an XML <thread/> element
038 *
039 * @version $Revision: 671918 $
040 */
041 @XmlRootElement(name = "thread")
042 @XmlAccessorType(XmlAccessType.FIELD)
043 public class ThreadType extends ProcessorType<ProcessorType> {
044 @XmlAttribute(required = false)
045 private Integer coreSize = 1;
046 @XmlAttribute(required = false)
047 private Boolean daemon = Boolean.TRUE;
048 @XmlAttribute(required = false)
049 private Long keepAliveTime;
050 @XmlAttribute(required = false)
051 private Integer maxSize = 1;
052 @XmlAttribute(required = false)
053 private String name = "Thread Processor";
054 @XmlAttribute(required = false)
055 private Integer priority = Thread.NORM_PRIORITY;
056 @XmlAttribute(required = false)
057 private Long stackSize;
058 @XmlElementRef
059 private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
060 @XmlTransient
061 private BlockingQueue<Runnable> taskQueue;
062 @XmlTransient
063 private ThreadGroup threadGroup;
064 @XmlTransient
065 private ThreadPoolExecutor executor;
066
067 public ThreadType() {
068 }
069
070 public ThreadType(int coreSize) {
071 this.coreSize = coreSize;
072 this.maxSize = coreSize;
073 }
074
075 public ThreadType(ThreadPoolExecutor executor) {
076 this.executor = executor;
077 }
078
079 @Override
080 public List<ProcessorType<?>> getOutputs() {
081 return outputs;
082 }
083
084 @Override
085 public String toString() {
086 return "Thread[" + getLabel() + "]";
087 }
088
089 @Override
090 public String getShortName() {
091 return "thread";
092 }
093
094 @Override
095 public String getLabel() {
096 return "coreSize=" + coreSize;
097 }
098
099 @Override
100 public Processor createProcessor(RouteContext routeContext) throws Exception {
101
102 ThreadProcessor thread = new ThreadProcessor();
103 thread.setExecutor(executor);
104 if (coreSize != null) {
105 thread.setCoreSize(coreSize);
106 }
107 if (daemon != null) {
108 thread.setDaemon(daemon);
109 }
110 if (keepAliveTime != null) {
111 thread.setKeepAliveTime(keepAliveTime);
112 }
113 if (maxSize != null) {
114 thread.setMaxSize(maxSize);
115 }
116 thread.setName(name);
117 thread.setPriority(priority);
118 if (stackSize != null) {
119 thread.setStackSize(stackSize);
120 }
121 thread.setTaskQueue(taskQueue);
122 thread.setThreadGroup(threadGroup);
123
124 // TODO: see if we can avoid creating so many nested pipelines
125
126 ArrayList<Processor> pipe = new ArrayList<Processor>(2);
127 pipe.add(thread);
128 pipe.add(createOutputsProcessor(routeContext, outputs));
129 return new Pipeline(pipe);
130 }
131
132 ///////////////////////////////////////////////////////////////////
133 //
134 // Fluent Methods
135 //
136 ///////////////////////////////////////////////////////////////////
137 public ThreadType coreSize(int coreSize) {
138 setCoreSize(coreSize);
139 return this;
140 }
141
142 public ThreadType daemon(boolean daemon) {
143 setDaemon(daemon);
144 return this;
145 }
146
147 public ThreadType keepAliveTime(long keepAliveTime) {
148 setKeepAliveTime(keepAliveTime);
149 return this;
150 }
151
152 public ThreadType maxSize(int maxSize) {
153 setMaxSize(maxSize);
154 return this;
155 }
156
157 public ThreadType name(String name) {
158 setName(name);
159 return this;
160 }
161
162 public ThreadType priority(int priority) {
163 setPriority(priority);
164 return this;
165 }
166
167 public ThreadType stackSize(long stackSize) {
168 setStackSize(stackSize);
169 return this;
170 }
171
172 public ThreadType taskQueue(BlockingQueue<Runnable> taskQueue) {
173 setTaskQueue(taskQueue);
174 return this;
175 }
176
177 public ThreadType threadGroup(ThreadGroup threadGroup) {
178 setThreadGroup(threadGroup);
179 return this;
180 }
181
182 public ThreadType executor(ThreadPoolExecutor executor) {
183 setExecutor(executor);
184 return this;
185 }
186
187 ///////////////////////////////////////////////////////////////////
188 //
189 // Property Accessors
190 //
191 ///////////////////////////////////////////////////////////////////
192
193 public void setCoreSize(int coreSize) {
194 this.coreSize = coreSize;
195 }
196
197 public void setDaemon(boolean daemon) {
198 this.daemon = daemon;
199 }
200
201 public void setKeepAliveTime(long keepAliveTime) {
202 this.keepAliveTime = keepAliveTime;
203 }
204
205 public void setMaxSize(int maxSize) {
206 this.maxSize = maxSize;
207 }
208
209 public void setName(String name) {
210 this.name = name;
211 }
212
213 public void setPriority(int priority) {
214 this.priority = priority;
215 }
216
217 public void setStackSize(long stackSize) {
218 this.stackSize = stackSize;
219 }
220
221 public void setTaskQueue(BlockingQueue<Runnable> taskQueue) {
222 this.taskQueue = taskQueue;
223 }
224
225 public void setThreadGroup(ThreadGroup threadGroup) {
226 this.threadGroup = threadGroup;
227 }
228
229 public ThreadPoolExecutor getExecutor() {
230 return executor;
231 }
232
233 public void setExecutor(ThreadPoolExecutor executor) {
234 this.executor = executor;
235 }
236 }