001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.file;
018
019 import java.io.File;
020 import java.util.concurrent.ConcurrentHashMap;
021
022 import org.apache.camel.AsyncCallback;
023 import org.apache.camel.Processor;
024 import org.apache.camel.impl.ScheduledPollConsumer;
025 import org.apache.camel.processor.DeadLetterChannel;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028
029 /**
030 * For consuming files.
031 *
032 * @version $Revision: 662314 $
033 */
034 public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
035 private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
036
037 private FileEndpoint endpoint;
038 private ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
039 private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File, Long>();
040 private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File, Long>();
041
042 private boolean generateEmptyExchangeWhenIdle;
043 private boolean recursive = true;
044 private String regexPattern = "";
045
046 private long lastPollTime;
047 private int unchangedDelay;
048 private boolean unchangedSize;
049
050
051 public FileConsumer(final FileEndpoint endpoint, Processor processor) {
052 super(endpoint, processor);
053 this.endpoint = endpoint;
054 }
055
056 protected synchronized void poll() throws Exception {
057 int rc = pollFileOrDirectory(endpoint.getFile(), isRecursive());
058 if (rc == 0 && generateEmptyExchangeWhenIdle) {
059 final FileExchange exchange = endpoint.createExchange((File)null);
060 getAsyncProcessor().process(exchange, new AsyncCallback() {
061 public void done(boolean sync) {
062 }
063 });
064 }
065 lastPollTime = System.currentTimeMillis();
066 }
067
068 /**
069 * Pools the given file or directory for files to process.
070 *
071 * @param fileOrDirectory file or directory
072 * @param processDir recursive
073 * @return the number of files processed or being processed async.
074 */
075 protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
076 if (!fileOrDirectory.isDirectory()) {
077 return pollFile(fileOrDirectory); // process the file
078 } else if (processDir) {
079 int rc = 0;
080 if (isValidFile(fileOrDirectory)) {
081 LOG.debug("Polling directory " + fileOrDirectory);
082 File[] files = fileOrDirectory.listFiles();
083 for (File file : files) {
084 rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
085 }
086 }
087 return rc;
088 } else {
089 LOG.debug("Skipping directory " + fileOrDirectory);
090 return 0;
091 }
092 }
093
094 /**
095 * Polls the given file
096 *
097 * @param file the file
098 * @return returns 1 if the file was processed, 0 otherwise.
099 */
100 protected int pollFile(final File file) {
101
102 if (!file.exists()) {
103 return 0;
104 }
105 if (!isValidFile(file)) {
106 return 0;
107 }
108 // we only care about file modified times if we are not deleting/moving files
109 if (!endpoint.isNoop()) {
110 if (filesBeingProcessed.contains(file)) {
111 return 1;
112 }
113 filesBeingProcessed.put(file, file);
114 }
115
116 final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
117 final FileExchange exchange = endpoint.createExchange(file);
118
119 endpoint.configureMessage(file, exchange.getIn());
120 try {
121 if (LOG.isDebugEnabled()) {
122 LOG.debug("About to process file: " + file + " using exchange: " + exchange);
123 }
124 if (processStrategy.begin(endpoint, exchange, file)) {
125
126 // Use the async processor interface so that processing of
127 // the exchange can happen asynchronously
128 getAsyncProcessor().process(exchange, new AsyncCallback() {
129 public void done(boolean sync) {
130 boolean failed = exchange.isFailed();
131 boolean handled = DeadLetterChannel.isFailureHandled(exchange);
132
133 if (LOG.isDebugEnabled()) {
134 LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "OK"));
135 }
136
137 if (!failed || handled) {
138 // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
139 processStrategyCommit(processStrategy, exchange, file, handled);
140 } else if (failed && !handled) {
141 // there was an exception but it was not handled by the DeadLetterChannel
142 handleException(exchange.getException());
143 }
144
145 filesBeingProcessed.remove(file);
146 }
147 });
148
149 } else {
150 if (LOG.isDebugEnabled()) {
151 LOG.debug(endpoint + " cannot process file: " + file);
152 }
153 }
154 } catch (Throwable e) {
155 handleException(e);
156 }
157
158 return 1;
159 }
160
161 /**
162 * Strategy when the file was processed and a commit should be executed.
163 *
164 * @param processStrategy the strategy to perform the commit
165 * @param exchange the exchange
166 * @param file the file processed
167 * @param failureHandled is <tt>false</tt> if the exchange was processed succesfully, <tt>true</tt> if
168 * an exception occured during processing but it was handled by the failure processor (usually the
169 * DeadLetterChannel).
170 */
171 protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
172 File file, boolean failureHandled) {
173 try {
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("Committing file strategy: " + processStrategy + " for file: " + file + (failureHandled ? " that was handled by the failure processor." : ""));
176 }
177 processStrategy.commit(endpoint, exchange, file);
178 } catch (Exception e) {
179 LOG.warn("Error committing file strategy: " + processStrategy, e);
180 handleException(e);
181 }
182 }
183
184 protected boolean isValidFile(File file) {
185 boolean result = false;
186 if (file != null && file.exists()) {
187 // TODO: maybe use a configurable strategy instead of the hardcoded one based on last file change
188 if (isMatched(file) && isChanged(file)) {
189 result = true;
190 }
191 }
192 return result;
193 }
194
195 protected boolean isChanged(File file) {
196 if (file == null) {
197 // Sanity check
198 return false;
199 } else if (file.isDirectory()) {
200 // Allow recursive polling to descend into this directory
201 return true;
202 } else {
203 boolean lastModifiedCheck = false;
204 long modifiedDuration = 0;
205 if (getUnchangedDelay() > 0) {
206 modifiedDuration = System.currentTimeMillis() - file.lastModified();
207 lastModifiedCheck = modifiedDuration >= getUnchangedDelay();
208 }
209
210 long fileModified = file.lastModified();
211 Long previousModified = noopMap.get(file);
212 noopMap.put(file, fileModified);
213 if (previousModified == null || fileModified > previousModified) {
214 lastModifiedCheck = true;
215 }
216
217 boolean sizeCheck = false;
218 long sizeDifference = 0;
219 if (isUnchangedSize()) {
220 Long value = fileSizes.get(file);
221 if (value == null) {
222 sizeCheck = true;
223 } else {
224 sizeCheck = file.length() != value;
225 }
226 }
227
228 boolean answer = lastModifiedCheck || sizeCheck;
229
230 if (LOG.isDebugEnabled()) {
231 LOG.debug("file:" + file + " isChanged:" + answer + " " + "sizeCheck:" + sizeCheck + "("
232 + sizeDifference + ") " + "lastModifiedCheck:" + lastModifiedCheck + "("
233 + modifiedDuration + ")");
234 }
235
236 if (isUnchangedSize()) {
237 if (answer) {
238 fileSizes.put(file, file.length());
239 } else {
240 fileSizes.remove(file);
241 }
242 }
243
244 return answer;
245 }
246 }
247
248 protected boolean isMatched(File file) {
249 String name = file.getName();
250 if (regexPattern != null && regexPattern.length() > 0) {
251 if (!name.matches(getRegexPattern())) {
252 return false;
253 }
254 }
255 String[] prefixes = endpoint.getExcludedNamePrefixes();
256 if (prefixes != null) {
257 for (String prefix : prefixes) {
258 if (name.startsWith(prefix)) {
259 return false;
260 }
261 }
262 }
263 String[] postfixes = endpoint.getExcludedNamePostfixes();
264 if (postfixes != null) {
265 for (String postfix : postfixes) {
266 if (name.endsWith(postfix)) {
267 return false;
268 }
269 }
270 }
271 return true;
272 }
273
274 public boolean isRecursive() {
275 return this.recursive;
276 }
277
278 public void setRecursive(boolean recursive) {
279 this.recursive = recursive;
280 }
281
282 public String getRegexPattern() {
283 return this.regexPattern;
284 }
285
286 public void setRegexPattern(String regexPattern) {
287 this.regexPattern = regexPattern;
288 }
289
290 public boolean isGenerateEmptyExchangeWhenIdle() {
291 return generateEmptyExchangeWhenIdle;
292 }
293
294 public void setGenerateEmptyExchangeWhenIdle(boolean generateEmptyExchangeWhenIdle) {
295 this.generateEmptyExchangeWhenIdle = generateEmptyExchangeWhenIdle;
296 }
297
298 public int getUnchangedDelay() {
299 return unchangedDelay;
300 }
301
302 public void setUnchangedDelay(int unchangedDelay) {
303 this.unchangedDelay = unchangedDelay;
304 }
305
306 public boolean isUnchangedSize() {
307 return unchangedSize;
308 }
309
310 public void setUnchangedSize(boolean unchangedSize) {
311 this.unchangedSize = unchangedSize;
312 }
313
314 }