001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.Collection;
020 import java.util.Iterator;
021 import java.util.List;
022
023 import org.apache.camel.Exchange;
024 import org.apache.camel.Processor;
025 import org.apache.camel.util.ExchangeHelper;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028
029 /**
030 * Creates a Pipeline pattern where the output of the previous step is sent as
031 * input to the next step, reusing the same message exchanges
032 *
033 * @version $Revision: 793373 $
034 */
035 public class Pipeline extends MulticastProcessor implements Processor, Traceable {
036 private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
037
038 public Pipeline(Collection<Processor> processors) {
039 super(processors);
040 }
041
042 public static Processor newInstance(List<Processor> processors) {
043 if (processors.isEmpty()) {
044 return null;
045 } else if (processors.size() == 1) {
046 return processors.get(0);
047 }
048 return new Pipeline(processors);
049 }
050
051 public void process(Exchange exchange) throws Exception {
052 Iterator<Processor> processors = getProcessors().iterator();
053 Exchange nextExchange = exchange;
054 boolean first = true;
055
056 while (continueRouting(processors, nextExchange)) {
057 if (first) {
058 first = false;
059 } else {
060 // prepare for next run
061 nextExchange = createNextExchange(nextExchange);
062 }
063
064 // get the next processor
065 Processor processor = processors.next();
066
067 // process the next exchange
068 try {
069 if (LOG.isTraceEnabled()) {
070 // this does the actual processing so log at trace level
071 LOG.trace("Processing exchangeId: " + nextExchange.getExchangeId() + " >>> " + nextExchange);
072 }
073 processor.process(nextExchange);
074 } catch (Exception e) {
075 nextExchange.setException(e);
076 }
077
078 // check for error if so we should break out
079 boolean exceptionHandled = hasExceptionBeenHandledByErrorHandler(nextExchange);
080 if (nextExchange.isFailed() || nextExchange.isRollbackOnly() || exceptionHandled) {
081 // The Exchange.ERRORHANDLED_HANDLED property is only set if satisfactory handling was done
082 // by the error handler. It's still an exception, the exchange still failed.
083 if (LOG.isDebugEnabled()) {
084 StringBuilder sb = new StringBuilder();
085 sb.append("Message exchange has failed so breaking out of pipeline: ").append(nextExchange);
086 if (nextExchange.isRollbackOnly()) {
087 sb.append(" Marked as rollback only.");
088 }
089 if (nextExchange.getException() != null) {
090 sb.append(" Exception: ").append(nextExchange.getException());
091 }
092 if (nextExchange.hasFault()) {
093 sb.append(" Fault: ").append(nextExchange.getFault());
094 }
095 if (exceptionHandled) {
096 sb.append(" Handled by the error handler.");
097 }
098 LOG.debug(sb.toString());
099 }
100 break;
101 }
102 }
103
104 if (LOG.isTraceEnabled()) {
105 // logging nextExchange as it contains the exchange that might have altered the payload and since
106 // we are logging the completion if will be confusing if we log the original instead
107 // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
108 LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
109 }
110
111 // copy results back to the original exchange
112 ExchangeHelper.copyResults(exchange, nextExchange);
113 }
114
115 private static boolean hasExceptionBeenHandledByErrorHandler(Exchange nextExchange) {
116 return Boolean.TRUE.equals(nextExchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
117 }
118
119 /**
120 * Strategy method to create the next exchange from the previous exchange.
121 * <p/>
122 * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
123 *
124 * @param previousExchange the previous exchange
125 * @return a new exchange
126 */
127 protected Exchange createNextExchange(Exchange previousExchange) {
128 Exchange answer = previousExchange.newInstance();
129 // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
130 // before processing the next step in the pipeline, so we have a snapshot of the exchange
131 // just before. This snapshot is used if Camel should do redeliveries (re try) using
132 // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
133 // exchange being routed.
134 answer.setExchangeId(previousExchange.getExchangeId());
135
136 answer.getProperties().putAll(previousExchange.getProperties());
137
138 // now lets set the input of the next exchange to the output of the
139 // previous message if it is not null
140 answer.setIn(previousExchange.hasOut()
141 ? previousExchange.getOut().copy() : previousExchange.getIn().copy());
142 return answer;
143 }
144
145 protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
146 Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
147 if (stop != null) {
148 boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
149 if (doStop) {
150 if (LOG.isDebugEnabled()) {
151 LOG.debug("Exchange is marked to stop routing: " + exchange);
152 }
153 return false;
154 }
155 }
156
157 // continue if there are more processors to route
158 return it.hasNext();
159 }
160
161 @Override
162 public String toString() {
163 return "Pipeline" + getProcessors();
164 }
165
166 @Override
167 public String getTraceLabel() {
168 return "Pipeline";
169 }
170 }