001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.impl;
018
019 import java.util.ArrayList;
020 import java.util.Collection;
021 import java.util.concurrent.ArrayBlockingQueue;
022 import java.util.concurrent.BlockingQueue;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import org.apache.camel.spi.ServicePool;
026 import org.apache.camel.util.ServiceHelper;
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029
030 /**
031 * Default implementation to inherit for a basic service pool.
032 *
033 * @version $Revision: 779038 $
034 */
035 public abstract class DefaultServicePool<Key, Service> extends ServiceSupport implements ServicePool<Key, Service> {
036 protected final Log log = LogFactory.getLog(getClass());
037 protected final ConcurrentHashMap<Key, BlockingQueue<Service>> pool = new ConcurrentHashMap<Key, BlockingQueue<Service>>();
038 protected final int capacity;
039
040 /**
041 * The capacity, note this is per key.
042 *
043 * @param capacity the capacity per key.
044 */
045 public DefaultServicePool(int capacity) {
046 this.capacity = capacity;
047 }
048
049 public synchronized Service addAndAcquire(Key key, Service service) {
050 BlockingQueue<Service> entry = pool.get(key);
051 if (entry == null) {
052 entry = new ArrayBlockingQueue<Service>(capacity);
053 pool.put(key, entry);
054 }
055 if (log.isTraceEnabled()) {
056 log.trace("AddAndAcquire key: " + key + " service: " + service);
057 }
058
059 // test if queue will be full
060 if (entry.size() >= capacity) {
061 throw new IllegalStateException("Queue full");
062 }
063 return service;
064 }
065
066 public synchronized Service acquire(Key key) {
067 BlockingQueue<Service> services = pool.get(key);
068 if (services == null || services.isEmpty()) {
069 if (log.isTraceEnabled()) {
070 log.trace("No free services in pool to acquire for key: " + key);
071 }
072 return null;
073 }
074
075 Service answer = services.poll();
076 if (log.isTraceEnabled()) {
077 log.trace("Acquire: " + key + " service: " + answer);
078 }
079 return answer;
080 }
081
082 public synchronized void release(Key key, Service service) {
083 if (log.isTraceEnabled()) {
084 log.trace("Release: " + key + " service: " + service);
085 }
086 BlockingQueue<Service> services = pool.get(key);
087 if (services != null) {
088 services.add(service);
089 }
090 }
091
092 protected void doStart() throws Exception {
093 log.debug("Starting service pool: " + this);
094 }
095
096 protected void doStop() throws Exception {
097 log.debug("Stopping service pool: " + this);
098 for (BlockingQueue<Service> entry : pool.values()) {
099 Collection<Service> values = new ArrayList<Service>();
100 entry.drainTo(values);
101 ServiceHelper.stopServices(values);
102 entry.clear();
103 }
104 pool.clear();
105 }
106
107 }