001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.model;
018
019 import java.util.concurrent.ExecutorService;
020 import javax.xml.bind.annotation.XmlAccessType;
021 import javax.xml.bind.annotation.XmlAccessorType;
022 import javax.xml.bind.annotation.XmlAttribute;
023 import javax.xml.bind.annotation.XmlRootElement;
024 import javax.xml.bind.annotation.XmlTransient;
025
026 import org.apache.camel.Processor;
027 import org.apache.camel.WaitForTaskToComplete;
028 import org.apache.camel.processor.ThreadsProcessor;
029 import org.apache.camel.processor.UnitOfWorkProcessor;
030 import org.apache.camel.spi.RouteContext;
031 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
032
033 /**
034 * Represents an XML <threads/> element
035 *
036 * @version $Revision: 779564 $
037 */
038 @XmlRootElement(name = "threads")
039 @XmlAccessorType(XmlAccessType.FIELD)
040 public class ThreadsDefinition extends OutputDefinition<ProcessorDefinition> {
041
042 @XmlTransient
043 private ExecutorService executorService;
044 @XmlAttribute(required = false)
045 private String executorServiceRef;
046 @XmlAttribute(required = false)
047 private Integer poolSize;
048 @XmlAttribute(required = false)
049 private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
050
051 @Override
052 public Processor createProcessor(RouteContext routeContext) throws Exception {
053 if (executorServiceRef != null) {
054 executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
055 }
056 if (executorService == null && poolSize != null) {
057 executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "AsyncProcessor", true);
058 }
059 Processor childProcessor = routeContext.createProcessor(this);
060
061 // wrap it in a unit of work so the route that comes next is also done in a unit of work
062 UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor);
063
064 return new ThreadsProcessor(uow, executorService, waitForTaskToComplete);
065 }
066
067 @Override
068 public String getLabel() {
069 return "threads";
070 }
071
072 @Override
073 public String getShortName() {
074 return "threads";
075 }
076
077 @Override
078 public String toString() {
079 return "Threads[" + getOutputs() + "]";
080 }
081
082 /**
083 * Setting the executor service for executing the multicasting action.
084 *
085 * @return the builder
086 */
087 public ThreadsDefinition executorService(ExecutorService executorService) {
088 setExecutorService(executorService);
089 return this;
090 }
091
092 /**
093 * Setting the core pool size for the underlying {@link java.util.concurrent.ExecutorService}.
094 *
095 * @return the builder
096 */
097 public ThreadsDefinition poolSize(int poolSize) {
098 setPoolSize(poolSize);
099 return this;
100 }
101
102 /**
103 * Setting to whether to wait for async tasks to be complete before continuing original route.
104 * <p/>
105 * Is default <tt>IfReplyExpected</tt>
106 *
107 * @param wait the wait option
108 * @return the builder
109 */
110 public ThreadsDefinition waitForTaskToComplete(WaitForTaskToComplete wait) {
111 setWaitForTaskToComplete(wait);
112 return this;
113 }
114
115 public ExecutorService getExecutorService() {
116 return executorService;
117 }
118
119 public void setExecutorService(ExecutorService executorService) {
120 this.executorService = executorService;
121 }
122
123 public Integer getPoolSize() {
124 return poolSize;
125 }
126
127 public void setPoolSize(Integer poolSize) {
128 this.poolSize = poolSize;
129 }
130
131 public WaitForTaskToComplete getWaitForTaskToComplete() {
132 return waitForTaskToComplete;
133 }
134
135 public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
136 this.waitForTaskToComplete = waitForTaskToComplete;
137 }
138 }