001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.concurrent.ArrayBlockingQueue;
020 import java.util.concurrent.BlockingQueue;
021 import java.util.concurrent.RejectedExecutionException;
022 import java.util.concurrent.ThreadFactory;
023 import java.util.concurrent.ThreadPoolExecutor;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026
027 import org.apache.camel.AsyncCallback;
028 import org.apache.camel.AsyncProcessor;
029 import org.apache.camel.Exchange;
030 import org.apache.camel.Service;
031 import org.apache.camel.util.AsyncProcessorHelper;
032
033 /**
034 * A processor that forces async processing of the exchange using a thread pool.
035 *
036 * @version $Revision: 659491 $
037 */
038 public class ThreadProcessor implements AsyncProcessor, Service {
039
040 private ThreadPoolExecutor executor;
041 private long stackSize;
042 private ThreadGroup threadGroup;
043 private int priority = Thread.NORM_PRIORITY;
044 private boolean daemon = true;
045 private String name = "Thread Processor";
046 private BlockingQueue<Runnable> taskQueue;
047 private long keepAliveTime;
048 private int maxSize = 1;
049 private int coreSize = 1;
050 private final AtomicBoolean shutdown = new AtomicBoolean(true);
051 private boolean callerRunsWhenRejected = true;
052
053 class ProcessCall implements Runnable {
054 private final Exchange exchange;
055 private final AsyncCallback callback;
056
057 public ProcessCall(Exchange exchange, AsyncCallback callback) {
058 this.exchange = exchange;
059 this.callback = callback;
060 }
061
062 public void run() {
063 if (shutdown.get()) {
064 exchange.setException(new RejectedExecutionException());
065 }
066 callback.done(false);
067 }
068 }
069
070 public void process(Exchange exchange) throws Exception {
071 AsyncProcessorHelper.process(this, exchange);
072 }
073
074 public boolean process(final Exchange exchange, final AsyncCallback callback) {
075 if (shutdown.get()) {
076 throw new IllegalStateException("ThreadProcessor is not running.");
077 }
078 ProcessCall call = new ProcessCall(exchange, callback);
079 try {
080 executor.execute(call);
081 return false;
082 } catch (RejectedExecutionException e) {
083 if (callerRunsWhenRejected) {
084 if (shutdown.get()) {
085 exchange.setException(new RejectedExecutionException());
086 } else {
087 callback.done(true);
088 }
089 } else {
090 exchange.setException(e);
091 }
092 return true;
093 }
094 }
095
096 public void start() throws Exception {
097 shutdown.set(false);
098 getExecutor();
099 }
100
101 public void stop() throws Exception {
102 shutdown.set(true);
103 executor.shutdown();
104 executor.awaitTermination(0, TimeUnit.SECONDS);
105 }
106
107 public long getStackSize() {
108 return stackSize;
109 }
110
111 public void setStackSize(long stackSize) {
112 this.stackSize = stackSize;
113 }
114
115 public ThreadGroup getThreadGroup() {
116 return threadGroup;
117 }
118
119 public void setThreadGroup(ThreadGroup threadGroup) {
120 this.threadGroup = threadGroup;
121 }
122
123 public int getPriority() {
124 return priority;
125 }
126
127 public void setPriority(int priority) {
128 this.priority = priority;
129 }
130
131 public boolean isDaemon() {
132 return daemon;
133 }
134
135 public void setDaemon(boolean daemon) {
136 this.daemon = daemon;
137 }
138
139 public String getName() {
140 return name;
141 }
142
143 public void setName(String name) {
144 this.name = name;
145 }
146
147 public long getKeepAliveTime() {
148 return keepAliveTime;
149 }
150
151 public void setKeepAliveTime(long keepAliveTime) {
152 this.keepAliveTime = keepAliveTime;
153 }
154
155 public int getMaxSize() {
156 return maxSize;
157 }
158
159 public void setMaxSize(int maxSize) {
160 this.maxSize = maxSize;
161 }
162
163 public int getCoreSize() {
164 return coreSize;
165 }
166
167 public void setCoreSize(int coreSize) {
168 this.coreSize = coreSize;
169 }
170
171 public BlockingQueue<Runnable> getTaskQueue() {
172 if (taskQueue == null) {
173 taskQueue = new ArrayBlockingQueue<Runnable>(1000);
174 }
175 return taskQueue;
176 }
177
178 public void setTaskQueue(BlockingQueue<Runnable> taskQueue) {
179 this.taskQueue = taskQueue;
180 }
181
182 public ThreadPoolExecutor getExecutor() {
183 if (executor == null) {
184 executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(), getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() {
185 public Thread newThread(Runnable runnable) {
186 Thread thread;
187 if (getStackSize() > 0) {
188 thread = new Thread(getThreadGroup(), runnable, getName(), getStackSize());
189 } else {
190 thread = new Thread(getThreadGroup(), runnable, getName());
191 }
192 thread.setDaemon(isDaemon());
193 thread.setPriority(getPriority());
194 return thread;
195 }
196 });
197 }
198 return executor;
199 }
200
201 public void setExecutor(ThreadPoolExecutor executor) {
202 this.executor = executor;
203 }
204
205 public boolean isCallerRunsWhenRejected() {
206 return callerRunsWhenRejected;
207 }
208
209 public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
210 this.callerRunsWhenRejected = callerRunsWhenRejected;
211 }
212
213 }