001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import java.util.Collection;
020 import java.util.Iterator;
021 import java.util.List;
022
023 import org.apache.camel.AsyncCallback;
024 import org.apache.camel.AsyncProcessor;
025 import org.apache.camel.Exchange;
026 import org.apache.camel.Message;
027 import org.apache.camel.Processor;
028 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
029 import org.apache.camel.util.AsyncProcessorHelper;
030 import org.apache.camel.util.ExchangeHelper;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 /**
035 * Creates a Pipeline pattern where the output of the previous step is sent as
036 * input to the next step, reusing the same message exchanges
037 *
038 * @version $Revision: 676606 $
039 */
040 public class Pipeline extends MulticastProcessor implements AsyncProcessor {
041 private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
042
043 public Pipeline(Collection<Processor> processors) {
044 super(processors);
045 }
046
047 public static Processor newInstance(List<Processor> processors) {
048 if (processors.isEmpty()) {
049 return null;
050 } else if (processors.size() == 1) {
051 return processors.get(0);
052 }
053 return new Pipeline(processors);
054 }
055
056 public void process(Exchange exchange) throws Exception {
057 AsyncProcessorHelper.process(this, exchange);
058 }
059
060 public boolean process(Exchange original, AsyncCallback callback) {
061 Iterator<Processor> processors = getProcessors().iterator();
062 Exchange nextExchange = original;
063 boolean first = true;
064 while (true) {
065 if (nextExchange.isFailed()) {
066 if (LOG.isDebugEnabled()) {
067 LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange
068 + " exception: " + nextExchange.getException() + " fault: "
069 + nextExchange.getFault(false));
070 }
071 break;
072 }
073 if (!processors.hasNext()) {
074 break;
075 }
076
077 AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
078
079 if (first) {
080 first = false;
081 } else {
082 nextExchange = createNextExchange(processor, nextExchange);
083 }
084
085 boolean sync = process(original, nextExchange, callback, processors, processor);
086 // Continue processing the pipeline synchronously ...
087 if (!sync) {
088 // The pipeline will be completed async...
089 return false;
090 }
091 }
092
093 // If we get here then the pipeline was processed entirely
094 // synchronously.
095 ExchangeHelper.copyResults(original, nextExchange);
096 callback.done(true);
097 return true;
098 }
099
100 private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
101 return processor.process(exchange, new AsyncCallback() {
102 public void done(boolean sync) {
103
104 // We only have to handle async completion of
105 // the pipeline..
106 if (sync) {
107 return;
108 }
109
110 // Continue processing the pipeline...
111 Exchange nextExchange = exchange;
112 while (processors.hasNext()) {
113 AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
114
115 if (nextExchange.isFailed()) {
116 if (LOG.isDebugEnabled()) {
117 LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: "
118 + nextExchange.getFault(false));
119 }
120 break;
121 }
122
123 nextExchange = createNextExchange(processor, nextExchange);
124 sync = process(original, nextExchange, callback, processors, processor);
125 if (!sync) {
126 return;
127 }
128 }
129
130 ExchangeHelper.copyResults(original, nextExchange);
131 callback.done(false);
132 }
133 });
134 }
135
136 /**
137 * Strategy method to create the next exchange from the previous exchange.
138 *
139 * @param producer the producer used to send to the endpoint
140 * @param previousExchange the previous exchange
141 * @return a new exchange
142 */
143 protected Exchange createNextExchange(Processor producer, Exchange previousExchange) {
144 Exchange answer = previousExchange.newInstance();
145
146 answer.getProperties().putAll(previousExchange.getProperties());
147
148 // now lets set the input of the next exchange to the output of the
149 // previous message if it is not null
150 Message previousOut = previousExchange.getOut(false);
151 Message in = answer.getIn();
152 if (previousOut != null) {
153 in.copyFrom(previousOut);
154 } else {
155 in.copyFrom(previousExchange.getIn());
156 }
157 return answer;
158 }
159
160 @Override
161 public String toString() {
162 return "Pipeline" + getProcessors();
163 }
164 }