001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.file;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.lang.reflect.Method;
022 import java.util.Comparator;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import org.apache.camel.CamelContext;
027 import org.apache.camel.Component;
028 import org.apache.camel.Exchange;
029 import org.apache.camel.Expression;
030 import org.apache.camel.Message;
031 import org.apache.camel.Processor;
032 import org.apache.camel.impl.ScheduledPollEndpoint;
033 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
034 import org.apache.camel.spi.FactoryFinder;
035 import org.apache.camel.spi.IdempotentRepository;
036 import org.apache.camel.spi.Language;
037 import org.apache.camel.util.FileUtil;
038 import org.apache.camel.util.ObjectHelper;
039 import org.apache.camel.util.UuidGenerator;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042
043 /**
044 * Generic FileEndpoint
045 */
046 public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint {
047
048 protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
049 protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
050
051 protected final transient Log log = LogFactory.getLog(getClass());
052
053 protected GenericFileProcessStrategy<T> processStrategy;
054 protected GenericFileConfiguration configuration;
055
056 protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository();
057 protected String localWorkDirectory;
058 protected boolean autoCreate = true;
059 protected int bufferSize = 128 * 1024;
060 protected GenericFileExist fileExist = GenericFileExist.Override;
061 protected boolean noop;
062 protected boolean recursive;
063 protected boolean delete;
064 protected boolean flatten;
065 protected int maxMessagesPerPoll;
066 protected String tempPrefix;
067 protected String include;
068 protected String exclude;
069 protected Expression fileName;
070 protected Expression move;
071 protected Expression preMove;
072 protected boolean idempotent;
073 protected IdempotentRepository idempotentRepository;
074 protected GenericFileFilter<T> filter;
075 protected Comparator<GenericFile<T>> sorter;
076 protected Comparator<Exchange> sortBy;
077 protected String readLock = "none";
078 protected long readLockTimeout;
079 protected GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy;
080
081 public GenericFileEndpoint() {
082 }
083
084 public GenericFileEndpoint(String endpointUri, Component component) {
085 super(endpointUri, component);
086 }
087
088 public boolean isSingleton() {
089 return true;
090 }
091
092 public abstract GenericFileConsumer<T> createConsumer(Processor processor) throws Exception;
093
094 public abstract GenericFileProducer<T> createProducer() throws Exception;
095
096 public abstract Exchange createExchange(GenericFile<T> file);
097
098 public abstract String getScheme();
099
100 public abstract char getFileSeparator();
101
102 public abstract boolean isAbsolute(String name);
103
104 /**
105 * Return the file name that will be auto-generated for the given message if
106 * none is provided
107 */
108 public String getGeneratedFileName(Message message) {
109 return UuidGenerator.generateSanitizedId(message.getMessageId());
110 }
111
112 public GenericFileProcessStrategy<T> getGenericFileProcessStrategy() {
113 if (processStrategy == null) {
114 processStrategy = createGenericFileStrategy();
115 if (log.isDebugEnabled()) {
116 log.debug("Using Generic file process strategy: " + processStrategy);
117 }
118 }
119 return processStrategy;
120 }
121
122 /**
123 * A strategy method to lazily create the file strategy
124 */
125 @SuppressWarnings("unchecked")
126 protected GenericFileProcessStrategy<T> createGenericFileStrategy() {
127 Class<?> factory = null;
128 try {
129 FactoryFinder finder = getCamelContext().getFactoryFinder("META-INF/services/org/apache/camel/component/");
130 factory = finder.findClass(getScheme(), "strategy.factory.");
131 } catch (ClassNotFoundException e) {
132 log.debug("'strategy.factory.class' not found", e);
133 } catch (IOException e) {
134 log.debug("No strategy factory defined in 'META-INF/services/org/apache/camel/component/'", e);
135 }
136
137 if (factory == null) {
138 // use default
139 factory = this.getCamelContext().getClassResolver().resolveClass(DEFAULT_STRATEGYFACTORY_CLASS);
140 if (factory == null) {
141 throw new TypeNotPresentException(DEFAULT_STRATEGYFACTORY_CLASS + " class not found", null);
142 }
143 }
144
145 try {
146 Method factoryMethod = factory.getMethod("createGenericFileProcessStrategy", CamelContext.class, Map.class);
147 return (GenericFileProcessStrategy<T>) ObjectHelper.invokeMethod(factoryMethod, null, getCamelContext(), getParamsAsMap());
148 } catch (NoSuchMethodException e) {
149 throw new TypeNotPresentException(factory.getSimpleName() + ".createGenericFileProcessStrategy method not found", e);
150 }
151 }
152
153 public boolean isNoop() {
154 return noop;
155 }
156
157 public void setNoop(boolean noop) {
158 this.noop = noop;
159 }
160
161 public boolean isRecursive() {
162 return recursive;
163 }
164
165 public void setRecursive(boolean recursive) {
166 this.recursive = recursive;
167 }
168
169 public String getInclude() {
170 return include;
171 }
172
173 public void setInclude(String include) {
174 this.include = include;
175 }
176
177 public String getExclude() {
178 return exclude;
179 }
180
181 public void setExclude(String exclude) {
182 this.exclude = exclude;
183 }
184
185 public boolean isDelete() {
186 return delete;
187 }
188
189 public void setDelete(boolean delete) {
190 this.delete = delete;
191 }
192
193 public boolean isFlatten() {
194 return flatten;
195 }
196
197 public void setFlatten(boolean flatten) {
198 this.flatten = flatten;
199 }
200
201 public Expression getMove() {
202 return move;
203 }
204
205 public void setMove(Expression move) {
206 this.move = move;
207 }
208
209 /**
210 * Sets the move expression based on
211 * {@link org.apache.camel.language.simple.FileLanguage}
212 */
213 public void setMove(String fileLanguageExpression) {
214 String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
215 this.move = createFileLangugeExpression(expression);
216 }
217
218 public Expression getPreMove() {
219 return preMove;
220 }
221
222 public void setPreMove(Expression preMove) {
223 this.preMove = preMove;
224 }
225
226 /**
227 * Sets the pre move expression based on
228 * {@link org.apache.camel.language.simple.FileLanguage}
229 */
230 public void setPreMove(String fileLanguageExpression) {
231 String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
232 this.preMove = createFileLangugeExpression(expression);
233 }
234
235 public Expression getFileName() {
236 return fileName;
237 }
238
239 public void setFileName(Expression fileName) {
240 this.fileName = fileName;
241 }
242
243 /**
244 * Sets the file expression based on
245 * {@link org.apache.camel.language.simple.FileLanguage}
246 */
247 public void setFileName(String fileLanguageExpression) {
248 this.fileName = createFileLangugeExpression(fileLanguageExpression);
249 }
250
251 public boolean isIdempotent() {
252 return idempotent;
253 }
254
255 public void setIdempotent(boolean idempotent) {
256 this.idempotent = idempotent;
257 }
258
259 public IdempotentRepository getIdempotentRepository() {
260 return idempotentRepository;
261 }
262
263 public void setIdempotentRepository(IdempotentRepository idempotentRepository) {
264 this.idempotentRepository = idempotentRepository;
265 }
266
267 public GenericFileFilter<T> getFilter() {
268 return filter;
269 }
270
271 public void setFilter(GenericFileFilter<T> filter) {
272 this.filter = filter;
273 }
274
275 public Comparator<GenericFile<T>> getSorter() {
276 return sorter;
277 }
278
279 public void setSorter(Comparator<GenericFile<T>> sorter) {
280 this.sorter = sorter;
281 }
282
283 public Comparator<Exchange> getSortBy() {
284 return sortBy;
285 }
286
287 public void setSortBy(Comparator<Exchange> sortBy) {
288 this.sortBy = sortBy;
289 }
290
291 public void setSortBy(String expression) {
292 setSortBy(expression, false);
293 }
294
295 public void setSortBy(String expression, boolean reverse) {
296 setSortBy(GenericFileDefaultSorter.sortByFileLanguage(getCamelContext(), expression, reverse));
297 }
298
299 public String getTempPrefix() {
300 return tempPrefix;
301 }
302
303 /**
304 * Enables and uses temporary prefix when writing files, after write it will
305 * be renamed to the correct name.
306 */
307 public void setTempPrefix(String tempPrefix) {
308 this.tempPrefix = tempPrefix;
309 }
310
311 public GenericFileConfiguration getConfiguration() {
312 if (configuration == null) {
313 configuration = new GenericFileConfiguration();
314 }
315 return configuration;
316 }
317
318 public void setConfiguration(GenericFileConfiguration configuration) {
319 this.configuration = configuration;
320 }
321
322 public GenericFileExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
323 return exclusiveReadLockStrategy;
324 }
325
326 public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy) {
327 this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
328 }
329
330 public String getReadLock() {
331 return readLock;
332 }
333
334 public void setReadLock(String readLock) {
335 this.readLock = readLock;
336 }
337
338 public long getReadLockTimeout() {
339 return readLockTimeout;
340 }
341
342 public void setReadLockTimeout(long readLockTimeout) {
343 this.readLockTimeout = readLockTimeout;
344 }
345
346 public int getBufferSize() {
347 return bufferSize;
348 }
349
350 public void setBufferSize(int bufferSize) {
351 this.bufferSize = bufferSize;
352 }
353
354 public GenericFileExist getFileExist() {
355 return fileExist;
356 }
357
358 public void setFileExist(GenericFileExist fileExist) {
359 this.fileExist = fileExist;
360 }
361
362 public boolean isAutoCreate() {
363 return autoCreate;
364 }
365
366 public void setAutoCreate(boolean autoCreate) {
367 this.autoCreate = autoCreate;
368 }
369
370 public GenericFileProcessStrategy<T> getProcessStrategy() {
371 return processStrategy;
372 }
373
374 public void setProcessStrategy(GenericFileProcessStrategy<T> processStrategy) {
375 this.processStrategy = processStrategy;
376 }
377
378 public String getLocalWorkDirectory() {
379 return localWorkDirectory;
380 }
381
382 public void setLocalWorkDirectory(String localWorkDirectory) {
383 this.localWorkDirectory = localWorkDirectory;
384 }
385
386 public int getMaxMessagesPerPoll() {
387 return maxMessagesPerPoll;
388 }
389
390 public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
391 this.maxMessagesPerPoll = maxMessagesPerPoll;
392 }
393
394 public IdempotentRepository<String> getInProgressRepository() {
395 return inProgressRepository;
396 }
397
398 public void setInProgressRepository(IdempotentRepository<String> inProgressRepository) {
399 this.inProgressRepository = inProgressRepository;
400 }
401
402 /**
403 * Configures the given message with the file which sets the body to the
404 * file object.
405 */
406 public void configureMessage(GenericFile<T> file, Message message) {
407 message.setBody(file);
408
409 if (flatten) {
410 // when flatten the file name should not contain any paths
411 message.setHeader(Exchange.FILE_NAME, file.getFileNameOnly());
412 } else {
413 // compute name to set on header that should be relative to starting directory
414 String name = file.isAbsolute() ? file.getAbsoluteFilePath() : file.getRelativeFilePath();
415
416 // skip leading endpoint configured directory
417 String endpointPath = getConfiguration().getDirectory();
418 if (ObjectHelper.isNotEmpty(endpointPath) && name.startsWith(endpointPath)) {
419 name = ObjectHelper.after(name, getConfiguration().getDirectory() + File.separator);
420 }
421
422 // adjust filename
423 message.setHeader(Exchange.FILE_NAME, name);
424 }
425 }
426
427 /**
428 * Strategy to configure the move or premove option based on a String input.
429 * <p/>
430 * @param expression the original string input
431 * @return configured string or the original if no modifications is needed
432 */
433 protected String configureMoveOrPreMoveExpression(String expression) {
434 // if the expression already have ${ } placeholders then pass it unmodified
435 if (expression.indexOf("${") != -1) {
436 return expression;
437 }
438
439 // remove trailing slash
440 expression = FileUtil.stripTrailingSeparator(expression);
441
442 StringBuilder sb = new StringBuilder();
443
444 // if relative then insert start with the parent folder
445 if (!isAbsolute(expression)) {
446 sb.append("${file:parent}");
447 sb.append(getFileSeparator());
448 }
449 // insert the directory the end user provided
450 sb.append(expression);
451 // append only the filename (file:name can contain a relative path, so we must use onlyname)
452 sb.append(getFileSeparator());
453 sb.append("${file:onlyname}");
454
455 return sb.toString();
456 }
457
458 protected Map<String, Object> getParamsAsMap() {
459 Map<String, Object> params = new HashMap<String, Object>();
460
461 if (isNoop()) {
462 params.put("noop", Boolean.toString(true));
463 }
464 if (isDelete()) {
465 params.put("delete", Boolean.toString(true));
466 }
467 if (move != null) {
468 params.put("move", move);
469 }
470 if (preMove != null) {
471 params.put("preMove", preMove);
472 }
473 if (exclusiveReadLockStrategy != null) {
474 params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy);
475 }
476 if (readLock != null) {
477 params.put("readLock", readLock);
478 }
479 if (readLockTimeout > 0) {
480 params.put("readLockTimeout", readLockTimeout);
481 }
482
483 return params;
484 }
485
486 private Expression createFileLangugeExpression(String expression) {
487 Language language = getCamelContext().resolveLanguage("file");
488 return language.createExpression(expression);
489 }
490 }