001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.idempotent;
018
019 import java.util.ArrayList;
020 import java.util.List;
021
022 import org.apache.camel.Exchange;
023 import org.apache.camel.Expression;
024 import org.apache.camel.Navigate;
025 import org.apache.camel.Processor;
026 import org.apache.camel.impl.ServiceSupport;
027 import org.apache.camel.spi.IdempotentRepository;
028 import org.apache.camel.util.ServiceHelper;
029 import org.apache.commons.logging.Log;
030 import org.apache.commons.logging.LogFactory;
031
032 /**
033 * An implementation of the <a
034 * href="http://camel.apache.org/idempotent-consumer.html">Idempotent Consumer</a> pattern.
035 *
036 * @version $Revision: 782534 $
037 */
038 public class IdempotentConsumer extends ServiceSupport implements Processor, Navigate<Processor> {
039 private static final transient Log LOG = LogFactory.getLog(IdempotentConsumer.class);
040 private final Expression messageIdExpression;
041 private final Processor processor;
042 private final IdempotentRepository idempotentRepository;
043 private final boolean eager;
044
045 public IdempotentConsumer(Expression messageIdExpression, IdempotentRepository idempotentRepository,
046 boolean eager, Processor processor) {
047 this.messageIdExpression = messageIdExpression;
048 this.idempotentRepository = idempotentRepository;
049 this.eager = eager;
050 this.processor = processor;
051 }
052
053 @Override
054 public String toString() {
055 return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]";
056 }
057
058 @SuppressWarnings("unchecked")
059 public void process(Exchange exchange) throws Exception {
060 final String messageId = messageIdExpression.evaluate(exchange, String.class);
061 if (messageId == null) {
062 throw new NoMessageIdException(exchange, messageIdExpression);
063 }
064
065 boolean newKey;
066 if (eager) {
067 // add the key to the repository
068 newKey = idempotentRepository.add(messageId);
069 } else {
070 // check if we alrady have the key
071 newKey = !idempotentRepository.contains(messageId);
072 }
073
074 if (!newKey) {
075 // we already have this key so its a duplicate message
076 onDuplicateMessage(exchange, messageId);
077 return;
078 }
079
080 // register our on completion callback
081 exchange.addOnCompletion(new IdempotentOnCompletion(idempotentRepository, messageId, eager));
082
083 // process the exchange
084 processor.process(exchange);
085 }
086
087 public List<Processor> next() {
088 if (!hasNext()) {
089 return null;
090 }
091 List<Processor> answer = new ArrayList<Processor>(1);
092 answer.add(processor);
093 return answer;
094 }
095
096 public boolean hasNext() {
097 return processor != null;
098 }
099
100 // Properties
101 // -------------------------------------------------------------------------
102 public Expression getMessageIdExpression() {
103 return messageIdExpression;
104 }
105
106 public IdempotentRepository getIdempotentRepository() {
107 return idempotentRepository;
108 }
109
110 public Processor getProcessor() {
111 return processor;
112 }
113
114 // Implementation methods
115 // -------------------------------------------------------------------------
116
117 protected void doStart() throws Exception {
118 ServiceHelper.startServices(processor);
119 }
120
121 protected void doStop() throws Exception {
122 ServiceHelper.stopServices(processor);
123 }
124
125 /**
126 * A strategy method to allow derived classes to overload the behaviour of
127 * processing a duplicate message
128 *
129 * @param exchange the exchange
130 * @param messageId the message ID of this exchange
131 */
132 protected void onDuplicateMessage(Exchange exchange, String messageId) {
133 if (LOG.isDebugEnabled()) {
134 LOG.debug("Ignoring duplicate message with id: " + messageId + " for exchange: " + exchange);
135 }
136 }
137
138 }