001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.util;
018
019 import java.util.HashMap;
020 import java.util.Iterator;
021 import java.util.Map;
022 import java.util.Set;
023 import java.util.SortedSet;
024 import java.util.TreeSet;
025 import java.util.concurrent.ScheduledExecutorService;
026 import java.util.concurrent.TimeUnit;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030
031 /**
032 * Default implementation of the {@link TimeoutMap}.
033 *
034 * @version $Revision: 660275 $
035 */
036 public class DefaultTimeoutMap implements TimeoutMap, Runnable {
037
038 private static final transient Log LOG = LogFactory.getLog(DefaultTimeoutMap.class);
039
040 private final Map map = new HashMap();
041 private SortedSet index = new TreeSet();
042 private ScheduledExecutorService executor;
043 private long purgePollTime;
044
045 public DefaultTimeoutMap() {
046 this(null, 1000L);
047 }
048
049 public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
050 this.executor = executor;
051 this.purgePollTime = requestMapPollTimeMillis;
052 schedulePoll();
053 }
054
055 public Object get(Object key) {
056 TimeoutMapEntry entry = null;
057 synchronized (map) {
058 entry = (TimeoutMapEntry) map.get(key);
059 if (entry == null) {
060 return null;
061 }
062 index.remove(entry);
063 updateExpireTime(entry);
064 index.add(entry);
065 }
066 return entry.getValue();
067 }
068
069 public void put(Object key, Object value, long timeoutMillis) {
070 TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis);
071 synchronized (map) {
072 Object oldValue = map.put(key, entry);
073 if (oldValue != null) {
074 index.remove(oldValue);
075 }
076 updateExpireTime(entry);
077 index.add(entry);
078 }
079 }
080
081 public void remove(Object id) {
082 synchronized (map) {
083 TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id);
084 if (entry != null) {
085 index.remove(entry);
086 }
087 }
088 }
089
090 public Object[] getKeys() {
091 Object[] keys = null;
092 synchronized (map) {
093 Set keySet = map.keySet();
094 keys = new Object[keySet.size()];
095 keySet.toArray(keys);
096 }
097 return keys;
098 }
099
100 public int size() {
101 synchronized (map) {
102 return map.size();
103 }
104 }
105
106 /**
107 * The timer task which purges old requests and schedules another poll
108 */
109 public void run() {
110 purge();
111 schedulePoll();
112 }
113
114 public void purge() {
115 long now = currentTime();
116 synchronized (map) {
117 for (Iterator iter = index.iterator(); iter.hasNext();) {
118 TimeoutMapEntry entry = (TimeoutMapEntry) iter.next();
119 if (entry == null) {
120 break;
121 }
122 if (entry.getExpireTime() < now) {
123 if (isValidForEviction(entry)) {
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("Evicting inactive request for correlationID: " + entry);
126 }
127 map.remove(entry.getKey());
128 iter.remove();
129 }
130 } else {
131 break;
132 }
133 }
134 }
135 }
136
137 // Properties
138 // -------------------------------------------------------------------------
139 public long getPurgePollTime() {
140 return purgePollTime;
141 }
142
143 /**
144 * Sets the next purge poll time in milliseconds
145 */
146 public void setPurgePollTime(long purgePollTime) {
147 this.purgePollTime = purgePollTime;
148 }
149
150 public ScheduledExecutorService getExecutor() {
151 return executor;
152 }
153
154 /**
155 * Sets the executor used to schedule purge events of inactive requests
156 */
157 public void setExecutor(ScheduledExecutorService executor) {
158 this.executor = executor;
159 }
160
161 // Implementation methods
162 // -------------------------------------------------------------------------
163
164 /**
165 * lets schedule each time to allow folks to change the time at runtime
166 */
167 protected void schedulePoll() {
168 if (executor != null) {
169 executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS);
170 }
171 }
172
173 /**
174 * A hook to allow derivations to avoid evicting the current entry
175 */
176 protected boolean isValidForEviction(TimeoutMapEntry entry) {
177 return true;
178 }
179
180 protected void updateExpireTime(TimeoutMapEntry entry) {
181 long now = currentTime();
182 entry.setExpireTime(entry.getTimeout() + now);
183 }
184
185 protected long currentTime() {
186 return System.currentTimeMillis();
187 }
188 }