001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.seda;
018
019 import java.util.ArrayList;
020 import java.util.HashSet;
021 import java.util.List;
022 import java.util.Set;
023 import java.util.concurrent.BlockingQueue;
024 import java.util.concurrent.CopyOnWriteArraySet;
025 import java.util.concurrent.LinkedBlockingQueue;
026
027 import org.apache.camel.Component;
028 import org.apache.camel.Consumer;
029 import org.apache.camel.Exchange;
030 import org.apache.camel.Processor;
031 import org.apache.camel.Producer;
032 import org.apache.camel.WaitForTaskToComplete;
033 import org.apache.camel.impl.DefaultEndpoint;
034 import org.apache.camel.spi.BrowsableEndpoint;
035
036 /**
037 * An implementation of the <a
038 * href="http://camel.apache.org/queue.html">Queue components</a> for
039 * asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
040 *
041 * @version $Revision: 794648 $
042 */
043 public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
044 private BlockingQueue<Exchange> queue;
045 private int size = 1000;
046 private int concurrentConsumers = 1;
047 private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected;
048 private long timeout = 30000;
049 private Set<SedaProducer> producers = new CopyOnWriteArraySet<SedaProducer>();
050 private Set<SedaConsumer> consumers = new CopyOnWriteArraySet<SedaConsumer>();
051
052 public SedaEndpoint() {
053 }
054
055 public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
056 this(endpointUri, component, queue, 1);
057 }
058
059 public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
060 super(endpointUri, component);
061 this.queue = queue;
062 this.concurrentConsumers = concurrentConsumers;
063 }
064
065 public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue) {
066 this(endpointUri, queue, 1);
067 }
068
069 public SedaEndpoint(String endpointUri, BlockingQueue<Exchange> queue, int concurrentConsumers) {
070 super(endpointUri);
071 this.queue = queue;
072 this.concurrentConsumers = concurrentConsumers;
073 }
074
075 public Producer createProducer() throws Exception {
076 return new SedaProducer(this, getQueue(), getWaitForTaskToComplete(), getTimeout());
077 }
078
079 public Consumer createConsumer(Processor processor) throws Exception {
080 return new SedaConsumer(this, processor);
081 }
082
083 public synchronized BlockingQueue<Exchange> getQueue() {
084 if (queue == null) {
085 queue = new LinkedBlockingQueue<Exchange>(size);
086 }
087 return queue;
088 }
089
090 public void setQueue(BlockingQueue<Exchange> queue) {
091 this.queue = queue;
092 }
093
094 public int getSize() {
095 return size;
096 }
097
098 public void setSize(int size) {
099 this.size = size;
100 }
101
102 public void setConcurrentConsumers(int concurrentConsumers) {
103 this.concurrentConsumers = concurrentConsumers;
104 }
105
106 public int getConcurrentConsumers() {
107 return concurrentConsumers;
108 }
109
110 public WaitForTaskToComplete getWaitForTaskToComplete() {
111 return waitForTaskToComplete;
112 }
113
114 public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) {
115 this.waitForTaskToComplete = waitForTaskToComplete;
116 }
117
118 public long getTimeout() {
119 return timeout;
120 }
121
122 public void setTimeout(long timeout) {
123 this.timeout = timeout;
124 }
125
126 public boolean isSingleton() {
127 return true;
128 }
129
130 /**
131 * Returns the current pending exchanges
132 */
133 public List<Exchange> getExchanges() {
134 return new ArrayList<Exchange>(getQueue());
135 }
136
137 /**
138 * Returns the current active consumers on this endpoint
139 */
140 public Set<SedaConsumer> getConsumers() {
141 return new HashSet<SedaConsumer>(consumers);
142 }
143
144 /**
145 * Returns the current active producers on this endpoint
146 */
147 public Set<SedaProducer> getProducers() {
148 return new HashSet<SedaProducer>(producers);
149 }
150
151 void onStarted(SedaProducer producer) {
152 producers.add(producer);
153 }
154
155 void onStopped(SedaProducer producer) {
156 producers.remove(producer);
157 }
158
159 void onStarted(SedaConsumer consumer) {
160 consumers.add(consumer);
161 }
162
163 void onStopped(SedaConsumer consumer) {
164 consumers.remove(consumer);
165 }
166 }