001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.idempotent;
018
019 import java.io.File;
020 import java.io.FileOutputStream;
021 import java.io.IOException;
022 import java.util.Map;
023 import java.util.Scanner;
024 import java.util.concurrent.atomic.AtomicBoolean;
025
026 import org.apache.camel.spi.IdempotentRepository;
027 import org.apache.camel.util.LRUCache;
028 import org.apache.camel.util.ObjectHelper;
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031
032 /**
033 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
034 * <p/>
035 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
036 * memory leak.
037 *
038 * @version $Revision: 782534 $
039 */
040 public class FileIdempotentRepository implements IdempotentRepository<String> {
041 private static final transient Log LOG = LogFactory.getLog(FileIdempotentRepository.class);
042 private static final String STORE_DELIMITER = "\n";
043 private Map<String, Object> cache;
044 private File fileStore;
045 private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
046 private AtomicBoolean init = new AtomicBoolean();
047
048 public FileIdempotentRepository() {
049 // default use a 1st level cache
050 this.cache = new LRUCache<String, Object>(1000);
051 }
052
053 public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
054 this.fileStore = fileStore;
055 this.cache = set;
056 }
057
058 /**
059 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
060 * as 1st level cache with a default of 1000 entries in the cache.
061 *
062 * @param fileStore the file store
063 */
064 public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
065 return fileIdempotentRepository(fileStore, 1000);
066 }
067
068 /**
069 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
070 * as 1st level cache.
071 *
072 * @param fileStore the file store
073 * @param cacheSize the cache size
074 */
075 public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
076 return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
077 }
078
079 /**
080 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
081 * as 1st level cache.
082 *
083 * @param fileStore the file store
084 * @param cacheSize the cache size
085 * @param maxFileStoreSize the max size in bytes for the filestore file
086 */
087 public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
088 FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
089 repository.setMaxFileStoreSize(maxFileStoreSize);
090 return repository;
091 }
092
093 /**
094 * Creates a new file based repository using the given {@link java.util.Map}
095 * as 1st level cache.
096 * <p/>
097 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
098 * memory leak.
099 *
100 * @param store the file store
101 * @param cache the cache to use as 1st level cache
102 */
103 public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
104 return new FileIdempotentRepository(store, cache);
105 }
106
107 public boolean add(String messageId) {
108 synchronized (cache) {
109 // init store if not loaded before
110 if (init.compareAndSet(false, true)) {
111 loadStore();
112 }
113
114 if (cache.containsKey(messageId)) {
115 return false;
116 } else {
117 cache.put(messageId, messageId);
118 if (fileStore.length() < maxFileStoreSize) {
119 // just append to store
120 appendToStore(messageId);
121 } else {
122 // trunk store and flush the cache
123 trunkStore();
124 }
125
126 return true;
127 }
128 }
129 }
130
131 public boolean contains(String key) {
132 synchronized (cache) {
133 // init store if not loaded before
134 if (init.compareAndSet(false, true)) {
135 loadStore();
136 }
137 return cache.containsKey(key);
138 }
139 }
140
141 public boolean remove(String key) {
142 synchronized (cache) {
143 // init store if not loaded before
144 if (init.compareAndSet(false, true)) {
145 loadStore();
146 }
147 return cache.remove(key) != null;
148 }
149 }
150
151 public boolean confirm(String key) {
152 // noop
153 return true;
154 }
155
156 public File getFileStore() {
157 return fileStore;
158 }
159
160 public void setFileStore(File fileStore) {
161 this.fileStore = fileStore;
162 }
163
164 public Map<String, Object> getCache() {
165 return cache;
166 }
167
168 public void setCache(Map<String, Object> cache) {
169 this.cache = cache;
170 }
171
172 public long getMaxFileStoreSize() {
173 return maxFileStoreSize;
174 }
175
176 /**
177 * Sets the maximum filesize for the file store in bytes.
178 * <p/>
179 * The default is 1mb.
180 */
181 public void setMaxFileStoreSize(long maxFileStoreSize) {
182 this.maxFileStoreSize = maxFileStoreSize;
183 }
184
185 /**
186 * Sets the cache size
187 */
188 public void setCacheSize(int size) {
189 if (cache != null) {
190 cache.clear();
191 }
192 cache = new LRUCache<String, Object>(size);
193 }
194
195 /**
196 * Appends the given message id to the file store
197 *
198 * @param messageId the message id
199 */
200 protected void appendToStore(final String messageId) {
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("Appending " + messageId + " to idempotent filestore: " + fileStore);
203 }
204 FileOutputStream fos = null;
205 try {
206 // create store if missing
207 if (!fileStore.exists()) {
208 fileStore.createNewFile();
209 }
210 // append to store
211 fos = new FileOutputStream(fileStore, true);
212 fos.write(messageId.getBytes());
213 fos.write(STORE_DELIMITER.getBytes());
214 } catch (IOException e) {
215 throw ObjectHelper.wrapRuntimeCamelException(e);
216 } finally {
217 ObjectHelper.close(fos, "Appending to file idempotent repository", LOG);
218 }
219 }
220
221 /**
222 * Trunks the file store when the max store size is hit by rewriting the 1st level cache
223 * to the file store.
224 */
225 protected void trunkStore() {
226 if (LOG.isInfoEnabled()) {
227 LOG.info("Trunking idempotent filestore: " + fileStore);
228 }
229 FileOutputStream fos = null;
230 try {
231 fos = new FileOutputStream(fileStore);
232 for (String key : cache.keySet()) {
233 fos.write(key.getBytes());
234 fos.write(STORE_DELIMITER.getBytes());
235 }
236 } catch (IOException e) {
237 throw ObjectHelper.wrapRuntimeCamelException(e);
238 } finally {
239 ObjectHelper.close(fos, "Trunking file idempotent repository", LOG);
240 }
241 }
242
243 /**
244 * Loads the given file store into the 1st level cache
245 */
246 protected void loadStore() {
247 if (LOG.isTraceEnabled()) {
248 LOG.trace("Loading to 1st level cache from idempotent filestore: " + fileStore);
249 }
250
251 if (!fileStore.exists()) {
252 return;
253 }
254
255 cache.clear();
256 Scanner scanner = null;
257 try {
258 scanner = new Scanner(fileStore);
259 scanner.useDelimiter(STORE_DELIMITER);
260 while (scanner.hasNextLine()) {
261 String line = scanner.nextLine();
262 cache.put(line, line);
263 }
264 } catch (IOException e) {
265 throw ObjectHelper.wrapRuntimeCamelException(e);
266 } finally {
267 if (scanner != null) {
268 scanner.close();
269 }
270 }
271
272 if (LOG.isDebugEnabled()) {
273 LOG.debug("Loaded " + cache.size() + " to the 1st level cache from idempotent filestore: " + fileStore);
274 }
275 }
276
277 }