001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.concurrent.Callable;
020 import java.util.concurrent.ExecutorService;
021
022 import org.apache.camel.Endpoint;
023 import org.apache.camel.Exchange;
024 import org.apache.camel.ExchangePattern;
025 import org.apache.camel.Expression;
026 import org.apache.camel.Processor;
027 import org.apache.camel.Producer;
028 import org.apache.camel.ProducerCallback;
029 import org.apache.camel.impl.DefaultExchange;
030 import org.apache.camel.util.ObjectHelper;
031 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
032
033 /**
034 * Processor for wire tapping exchanges to an endpoint destination.
035 *
036 * @version $Revision: 792977 $
037 */
038 public class WireTapProcessor extends SendProcessor {
039
040 private ExecutorService executorService;
041
042 // expression or processor used for populating a new exchange to send
043 // as opposed to traditional wiretap that sends a copy of the original exchange
044 private Expression newExchangeExpression;
045 private Processor newExchangeProcessor;
046
047 public WireTapProcessor(Endpoint destination) {
048 super(destination);
049 }
050
051 public WireTapProcessor(Endpoint destination, ExchangePattern pattern) {
052 super(destination, pattern);
053 }
054
055 @Override
056 protected void doStart() throws Exception {
057 super.doStart();
058 }
059
060 @Override
061 protected void doStop() throws Exception {
062 if (executorService != null) {
063 executorService.shutdown();
064 }
065 super.doStop();
066 }
067
068 @Override
069 public String toString() {
070 return "WireTap[" + destination.getEndpointUri() + "]";
071 }
072
073 public void process(Exchange exchange) throws Exception {
074 getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
075 public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
076 Exchange wireTapExchange = configureExchange(exchange, pattern);
077 procesWireTap(producer, wireTapExchange);
078 return wireTapExchange;
079 }
080 });
081 }
082
083 /**
084 * Wiretaps the exchange.
085 *
086 * @param exchange the exchange to wire tap
087 */
088 protected void procesWireTap(final Producer producer, final Exchange exchange) {
089 // use submit instead of execute to force it to use a new thread, execute might
090 // decide to use current thread, so we must submit a new task
091 // as we dont care for the response we dont hold the future object and wait for the result
092 getExecutorService().submit(new Callable<Exchange>() {
093 public Exchange call() throws Exception {
094 if (LOG.isDebugEnabled()) {
095 LOG.debug("Processing wiretap: " + exchange);
096 }
097 producer.process(exchange);
098 return exchange;
099 }
100 });
101 }
102
103 @Override
104 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
105 if (newExchangeProcessor == null && newExchangeExpression == null) {
106 // use a copy of the original exchange
107 return configureCopyExchange(exchange);
108 } else {
109 // use a new exchange
110 return configureNewExchange(exchange);
111 }
112 }
113
114 private Exchange configureCopyExchange(Exchange exchange) {
115 // must use a copy as we dont want it to cause side effects of the original exchange
116 Exchange copy = exchange.copy(false);
117 // set MEP to InOnly as this wire tap is a fire and forget
118 copy.setPattern(ExchangePattern.InOnly);
119 return copy;
120 }
121
122 private Exchange configureNewExchange(Exchange exchange) {
123 Exchange answer = new DefaultExchange(exchange.getContext(), ExchangePattern.InOnly);
124 // use destination os origin of this new exchange
125 answer.setFromEndpoint(getDestination());
126
127 // prepare the exchange
128 if (newExchangeProcessor != null) {
129 try {
130 newExchangeProcessor.process(answer);
131 } catch (Exception e) {
132 throw ObjectHelper.wrapRuntimeCamelException(e);
133 }
134 } else {
135 Object body = newExchangeExpression.evaluate(answer, Object.class);
136 if (body != null) {
137 answer.getIn().setBody(body);
138 }
139 }
140
141 return answer;
142 }
143
144 public ExecutorService getExecutorService() {
145 if (executorService == null) {
146 executorService = createExecutorService();
147 }
148 return executorService;
149 }
150
151 private ExecutorService createExecutorService() {
152 return ExecutorServiceHelper.newScheduledThreadPool(5, this.toString(), true);
153 }
154
155 public void setExecutorService(ExecutorService executorService) {
156 this.executorService = executorService;
157 }
158
159 public Processor getNewExchangeProcessor() {
160 return newExchangeProcessor;
161 }
162
163 public void setNewExchangeProcessor(Processor newExchangeProcessor) {
164 this.newExchangeProcessor = newExchangeProcessor;
165 }
166
167 public Expression getNewExchangeExpression() {
168 return newExchangeExpression;
169 }
170
171 public void setNewExchangeExpression(Expression newExchangeExpression) {
172 this.newExchangeExpression = newExchangeExpression;
173 }
174 }