001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import org.apache.camel.Endpoint;
020 import org.apache.camel.Exchange;
021 import org.apache.camel.ExchangePattern;
022 import org.apache.camel.Message;
023 import org.apache.camel.Processor;
024 import org.apache.camel.Producer;
025 import org.apache.camel.ProducerCallback;
026 import org.apache.camel.impl.ProducerCache;
027 import org.apache.camel.impl.ServiceSupport;
028 import org.apache.camel.model.RoutingSlipDefinition;
029 import org.apache.camel.util.ExchangeHelper;
030
031 import static org.apache.camel.util.ObjectHelper.notNull;
032
033 /**
034 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
035 * pattern where the list of actual endpoints to send a message exchange to are
036 * dependent on the value of a message header.
037 */
038 public class RoutingSlip extends ServiceSupport implements Processor, Traceable {
039 private ProducerCache producerCache;
040 private final String header;
041 private final String uriDelimiter;
042
043 public RoutingSlip(String header) {
044 this(header, RoutingSlipDefinition.DEFAULT_DELIMITER);
045 }
046
047 public RoutingSlip(String header, String uriDelimiter) {
048 notNull(header, "header");
049 notNull(uriDelimiter, "uriDelimiter");
050
051 this.header = header;
052 this.uriDelimiter = uriDelimiter;
053 }
054
055 @Override
056 public String toString() {
057 return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]";
058 }
059
060 public String getTraceLabel() {
061 return "RoutingSlip[" + header + "]";
062 }
063
064 public void process(Exchange exchange) throws Exception {
065 Message message = exchange.getIn();
066 String[] recipients = recipients(message);
067 Exchange current = exchange;
068
069 for (String nextRecipient : recipients) {
070 Endpoint endpoint = resolveEndpoint(exchange, nextRecipient);
071
072 Exchange copy = current.newInstance();
073 updateRoutingSlip(current);
074 copyOutToIn(copy, current);
075
076 getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
077 public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
078 producer.process(exchange);
079 return exchange;
080 }
081 });
082
083 current = copy;
084 }
085 ExchangeHelper.copyResults(exchange, current);
086 }
087
088 protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
089 // setup producer cache as we need to use the pluggable service pool defined on camel context
090 if (producerCache == null) {
091 this.producerCache = new ProducerCache(exchange.getContext().getProducerServicePool());
092 this.producerCache.start();
093 }
094 return this.producerCache;
095 }
096
097 protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
098 return ExchangeHelper.resolveEndpoint(exchange, recipient);
099 }
100
101 protected void doStart() throws Exception {
102 if (producerCache != null) {
103 producerCache.start();
104 }
105 }
106
107 protected void doStop() throws Exception {
108 if (producerCache != null) {
109 producerCache.stop();
110 }
111 }
112
113 private void updateRoutingSlip(Exchange current) {
114 Message message = getResultMessage(current);
115 String oldSlip = message.getHeader(header, String.class);
116 if (oldSlip != null) {
117 int delimiterIndex = oldSlip.indexOf(uriDelimiter);
118 String newSlip = delimiterIndex > 0 ? oldSlip.substring(delimiterIndex + 1) : "";
119 message.setHeader(header, newSlip);
120 }
121 }
122
123 /**
124 * Returns the outbound message if available. Otherwise return the inbound
125 * message.
126 */
127 private Message getResultMessage(Exchange exchange) {
128 if (exchange.hasOut()) {
129 return exchange.getOut();
130 } else {
131 // if this endpoint had no out (like a mock endpoint) just take the in
132 return exchange.getIn();
133 }
134 }
135
136 /**
137 * Return the list of recipients defined in the routing slip in the
138 * specified message.
139 */
140 private String[] recipients(Message message) {
141 Object headerValue = message.getHeader(header);
142 if (headerValue != null && !headerValue.equals("")) {
143 return headerValue.toString().split(uriDelimiter);
144 }
145 return new String[] {};
146 }
147
148 /**
149 * Copy the outbound data in 'source' to the inbound data in 'result'.
150 */
151 private void copyOutToIn(Exchange result, Exchange source) {
152 result.setException(source.getException());
153
154 if (source.hasFault()) {
155 result.getFault().copyFrom(source.getFault());
156 }
157
158 result.setIn(getResultMessage(source));
159
160 result.getProperties().clear();
161 result.getProperties().putAll(source.getProperties());
162 }
163 }