001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.file;
018
019 import java.util.ArrayList;
020 import java.util.Collections;
021 import java.util.LinkedList;
022 import java.util.List;
023 import java.util.Queue;
024
025 import org.apache.camel.BatchConsumer;
026 import org.apache.camel.Exchange;
027 import org.apache.camel.Processor;
028 import org.apache.camel.impl.DefaultExchange;
029 import org.apache.camel.impl.ScheduledPollConsumer;
030 import org.apache.camel.util.ObjectHelper;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 /**
035 * Base class for remote file consumers.
036 */
037 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer {
038 protected final transient Log log = LogFactory.getLog(getClass());
039 protected GenericFileEndpoint<T> endpoint;
040 protected GenericFileOperations<T> operations;
041 protected boolean loggedIn;
042 protected String fileExpressionResult;
043 protected int maxMessagesPerPoll;
044
045 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
046 super(endpoint, processor);
047 this.endpoint = endpoint;
048 this.operations = operations;
049 }
050
051 /**
052 * Poll for files
053 */
054 protected void poll() throws Exception {
055 // must reset for each poll
056 fileExpressionResult = null;
057
058 // before we poll is there anything we need to check ? Such as are we
059 // connected to the FTP Server Still ?
060 if (!prePollCheck()) {
061 log.debug("Skipping pool as pre poll check returned false");
062 }
063
064 // gather list of files to process
065 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
066
067 String name = endpoint.getConfiguration().getDirectory();
068 pollDirectory(name, files);
069
070 // sort files using file comparator if provided
071 if (endpoint.getSorter() != null) {
072 Collections.sort(files, endpoint.getSorter());
073 }
074
075 // sort using build in sorters so we can use expressions
076 LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
077 for (GenericFile<T> file : files) {
078 Exchange exchange = endpoint.createExchange(file);
079 endpoint.configureMessage(file, exchange.getIn());
080 exchanges.add(exchange);
081 }
082 // sort files using exchange comparator if provided
083 if (endpoint.getSortBy() != null) {
084 Collections.sort(exchanges, endpoint.getSortBy());
085 }
086
087 // consume files one by one
088 int total = exchanges.size();
089 if (total > 0 && log.isDebugEnabled()) {
090 log.debug("Total " + total + " files to consume");
091 }
092
093 processBatch(exchanges);
094 }
095
096 public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
097 this.maxMessagesPerPoll = maxMessagesPerPoll;
098 }
099
100 @SuppressWarnings("unchecked")
101 public void processBatch(Queue exchanges) {
102 int total = exchanges.size();
103
104 // limit if needed
105 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
106 log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
107 total = maxMessagesPerPoll;
108 }
109
110 for (int index = 0; index < total && isRunAllowed(); index++) {
111 // only loop if we are started (allowed to run)
112 // use poll to remove the head so it does not consume memory even after we have processed it
113 Exchange exchange = (Exchange) exchanges.poll();
114 // add current index and total as properties
115 exchange.setProperty(Exchange.BATCH_INDEX, index);
116 exchange.setProperty(Exchange.BATCH_SIZE, total);
117 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
118
119 // process the current exchange
120 processExchange(exchange);
121 }
122
123 // remove the file from the in progress list in case the batch was limited by max messages per poll
124 while (exchanges.size() > 0) {
125 Exchange exchange = (Exchange) exchanges.poll();
126 GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
127 String key = file.getFileName();
128 endpoint.getInProgressRepository().remove(key);
129 }
130 }
131
132 /**
133 * Override if required. Perform some checks (and perhaps actions) before we
134 * poll.
135 *
136 * @return true to poll, false to skip this poll.
137 */
138 protected boolean prePollCheck() throws Exception {
139 return true;
140 }
141
142 /**
143 * Polls the given directory for files to process
144 *
145 * @param fileName current directory or file
146 * @param fileList current list of files gathered
147 */
148 protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList);
149
150 /**
151 * Processes the exchange
152 *
153 * @param exchange the exchange
154 */
155 protected void processExchange(final Exchange exchange) {
156 GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
157 if (log.isTraceEnabled()) {
158 log.trace("Processing remote file: " + file);
159 }
160
161 try {
162 final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
163
164 boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
165 if (!begin) {
166 log.debug(endpoint + " cannot begin processing file: " + file);
167 // remove file from the in progress list as its no longer in progress
168 endpoint.getInProgressRepository().remove(file.getFileName());
169 return;
170 }
171
172 // must use file from exchange as it can be updated due the
173 // preMoveNamePrefix/preMoveNamePostfix options
174 final GenericFile<T> target = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
175 // must use full name when downloading so we have the correct path
176 final String name = target.getAbsoluteFilePath();
177
178 // retrieve the file using the stream
179 if (log.isTraceEnabled()) {
180 log.trace("Retreiving file: " + name + " from: " + endpoint);
181 }
182
183 operations.retrieveFile(name, exchange);
184
185 if (log.isTraceEnabled()) {
186 log.trace("Retrieved file: " + name + " from: " + endpoint);
187 }
188
189 if (log.isDebugEnabled()) {
190 log.debug("About to process file: " + target + " using exchange: " + exchange);
191 }
192
193 // register on completion callback that does the completiom stategies
194 // (for instance to move the file after we have processed it)
195 exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations));
196
197 // process the exchange
198 getProcessor().process(exchange);
199
200 } catch (Exception e) {
201 handleException(e);
202 }
203 }
204
205 /**
206 * Strategy for validating if the given remote file should be included or
207 * not
208 *
209 * @param file the remote file
210 * @param isDirectory wether the file is a directory or a file
211 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
212 */
213 @SuppressWarnings("unchecked")
214 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
215 if (!isMatched(file, isDirectory)) {
216 if (log.isTraceEnabled()) {
217 log.trace("File did not match. Will skip this file: " + file);
218 }
219 return false;
220 } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
221 // only use the filename as the key as the file could be moved into a done folder
222 if (log.isTraceEnabled()) {
223 log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
224 }
225 return false;
226 }
227
228 // file matched
229 return true;
230 }
231
232 /**
233 * Strategy to perform file matching based on endpoint configuration.
234 * <p/>
235 * Will always return <tt>false</tt> for certain files/folders:
236 * <ul>
237 * <li>Starting with a dot</li>
238 * <li>lock files</li>
239 * </ul>
240 * And then <tt>true</tt> for directories.
241 *
242 * @param file the file
243 * @param isDirectory wether the file is a directory or a file
244 * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
245 */
246 protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
247 String name = file.getFileNameOnly();
248
249 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
250 if (name.startsWith(".")) {
251 return false;
252 }
253
254 // lock files should be skipped
255 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
256 return false;
257 }
258
259 // directories so far is always regarded as matched (matching on the name is only for files)
260 if (isDirectory) {
261 return true;
262 }
263
264 if (endpoint.getFilter() != null) {
265 if (!endpoint.getFilter().accept(file)) {
266 return false;
267 }
268 }
269
270 if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
271 if (name.matches(endpoint.getExclude())) {
272 return false;
273 }
274 }
275
276 if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
277 if (!name.matches(endpoint.getInclude())) {
278 return false;
279 }
280 }
281
282 // use file expression for a simple dynamic file filter
283 if (endpoint.getFileName() != null) {
284 evaluteFileExpression();
285 if (fileExpressionResult != null) {
286 if (!name.equals(fileExpressionResult)) {
287 return false;
288 }
289 }
290 }
291
292 return true;
293 }
294
295 /**
296 * Is the given file already in progress.
297 *
298 * @param file the file
299 * @return <tt>true</tt> if the file is already in progress
300 */
301 protected boolean isInProgress(GenericFile<T> file) {
302 String key = file.getFileName();
303 return !endpoint.getInProgressRepository().add(key);
304 }
305
306 private void evaluteFileExpression() {
307 if (fileExpressionResult == null) {
308 // create a dummy exchange as Exchange is needed for expression evaluation
309 Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
310 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
311 }
312 }
313
314 }