001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.concurrent.Callable;
020 import java.util.concurrent.ExecutorService;
021
022 import org.apache.camel.Exchange;
023 import org.apache.camel.ExchangePattern;
024 import org.apache.camel.Predicate;
025 import org.apache.camel.Processor;
026 import org.apache.camel.impl.ServiceSupport;
027 import org.apache.camel.impl.SynchronizationAdapter;
028 import org.apache.camel.util.ServiceHelper;
029 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
030 import org.apache.commons.logging.Log;
031 import org.apache.commons.logging.LogFactory;
032
033 /**
034 * @version $Revision: 792977 $
035 */
036 public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable {
037
038 private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class);
039 private ExecutorService executorService;
040 private Processor processor;
041 private boolean onCompleteOnly;
042 private boolean onFailureOnly;
043 private Predicate onWhen;
044
045 public OnCompletionProcessor(Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) {
046 this.processor = processor;
047 this.onCompleteOnly = onCompleteOnly;
048 this.onFailureOnly = onFailureOnly;
049 this.onWhen = onWhen;
050 }
051
052 protected void doStart() throws Exception {
053 ServiceHelper.startService(processor);
054 }
055
056 @Override
057 protected void doStop() throws Exception {
058 if (executorService != null) {
059 executorService.shutdown();
060 }
061 ServiceHelper.stopService(processor);
062 }
063
064 public void process(Exchange exchange) throws Exception {
065 if (processor == null) {
066 return;
067 }
068
069 // register callback
070 exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() {
071 @Override
072 public void onComplete(Exchange exchange) {
073 if (onFailureOnly) {
074 return;
075 }
076
077 if (onWhen != null && !onWhen.matches(exchange)) {
078 // predicate did not match so do not route the onComplete
079 return;
080 }
081
082 // must use a copy as we dont want it to cause side effects of the original exchange
083 final Exchange copy = prepareExchange(exchange);
084
085 getExecutorService().submit(new Callable<Exchange>() {
086 public Exchange call() throws Exception {
087 if (LOG.isDebugEnabled()) {
088 LOG.debug("Processing onComplete: " + copy);
089 }
090 processor.process(copy);
091 return copy;
092 }
093 });
094 }
095
096 public void onFailure(Exchange exchange) {
097 if (onCompleteOnly) {
098 return;
099 }
100
101 if (onWhen != null && !onWhen.matches(exchange)) {
102 // predicate did not match so do not route the onComplete
103 return;
104 }
105
106 // must use a copy as we dont want it to cause side effects of the original exchange
107 final Exchange copy = prepareExchange(exchange);
108 // must remove exception otherwise onFaulure routing will fail as well
109 // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
110 copy.setException(null);
111
112 getExecutorService().submit(new Callable<Exchange>() {
113 public Exchange call() throws Exception {
114 if (LOG.isDebugEnabled()) {
115 LOG.debug("Processing onFailure: " + copy);
116 }
117
118 processor.process(copy);
119 return copy;
120 }
121 });
122 }
123
124 @Override
125 public String toString() {
126 if (!onCompleteOnly && !onFailureOnly) {
127 return "onCompleteOrFailure";
128 } else if (onCompleteOnly) {
129 return "onCompleteOnly";
130 } else {
131 return "onFailureOnly";
132 }
133 }
134 });
135 }
136
137 /**
138 * Prepares the {@link Exchange} to send as onCompletion.
139 *
140 * @param exchange the current exchange
141 * @return the exchange to be routed in onComplete
142 */
143 protected Exchange prepareExchange(Exchange exchange) {
144 // must use a copy as we dont want it to cause side effects of the original exchange
145 final Exchange copy = exchange.copy(false);
146 // set MEP to InOnly as this wire tap is a fire and forget
147 copy.setPattern(ExchangePattern.InOnly);
148 // add a header flag to indicate its a on completion exchange
149 copy.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE);
150 return copy;
151 }
152
153 public ExecutorService getExecutorService() {
154 if (executorService == null) {
155 executorService = createExecutorService();
156 }
157 return executorService;
158 }
159
160 private ExecutorService createExecutorService() {
161 return ExecutorServiceHelper.newScheduledThreadPool(5, this.toString(), true);
162 }
163
164 public void setExecutorService(ExecutorService executorService) {
165 this.executorService = executorService;
166 }
167
168 @Override
169 public String toString() {
170 return "OnCompletionProcessor[" + processor + "]";
171 }
172
173 public String getTraceLabel() {
174 return "OnCompletion";
175 }
176 }