001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.idempotent;
018
019 import org.apache.camel.Exchange;
020 import org.apache.camel.spi.IdempotentRepository;
021 import org.apache.camel.spi.Synchronization;
022 import org.apache.camel.util.ExchangeHelper;
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026 /**
027 * On completion strategy for {@link org.apache.camel.processor.idempotent.IdempotentConsumer}.
028 * <p/>
029 * This strategy adds the message id to the idempotent repository in case the exchange
030 * was processed successfully. In case of failure the message id is <b>not</b> added.
031 *
032 * @version $Revision: 784697 $
033 */
034 public class IdempotentOnCompletion implements Synchronization {
035 private static final transient Log LOG = LogFactory.getLog(IdempotentOnCompletion.class);
036 private final IdempotentRepository idempotentRepository;
037 private final String messageId;
038 private final boolean eager;
039
040 public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId, boolean eager) {
041 this.idempotentRepository = idempotentRepository;
042 this.messageId = messageId;
043 this.eager = eager;
044 }
045
046 public void onComplete(Exchange exchange) {
047 if (ExchangeHelper.isFailureHandled(exchange)) {
048 // the exchange did not process successfully but was failure handled by the dead letter channel
049 // and thus moved to the dead letter queue. We should thus not consider it as complete.
050 onFailedMessage(exchange, messageId);
051 } else {
052 onCompletedMessage(exchange, messageId);
053 }
054 }
055
056 public void onFailure(Exchange exchange) {
057 onFailedMessage(exchange, messageId);
058 }
059
060 /**
061 * A strategy method to allow derived classes to overload the behavior of
062 * processing a completed message
063 *
064 * @param exchange the exchange
065 * @param messageId the message ID of this exchange
066 */
067 @SuppressWarnings("unchecked")
068 protected void onCompletedMessage(Exchange exchange, String messageId) {
069 if (!eager) {
070 // if not eager we should add the key when its complete
071 idempotentRepository.add(messageId);
072 }
073 idempotentRepository.confirm(messageId);
074 }
075
076 /**
077 * A strategy method to allow derived classes to overload the behavior of
078 * processing a failed message
079 *
080 * @param exchange the exchange
081 * @param messageId the message ID of this exchange
082 */
083 @SuppressWarnings("unchecked")
084 protected void onFailedMessage(Exchange exchange, String messageId) {
085 idempotentRepository.remove(messageId);
086 if (LOG.isDebugEnabled()) {
087 LOG.debug("Removed from repository as exchange failed: " + exchange + " with id: " + messageId);
088 }
089 }
090
091 @Override
092 public String toString() {
093 return "IdempotentOnCompletion[" + messageId + ']';
094 }
095
096 }