001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019
020 import org.apache.camel.Endpoint;
021 import org.apache.camel.Exchange;
022 import org.apache.camel.ExchangePattern;
023 import org.apache.camel.Message;
024 import org.apache.camel.Processor;
025 import org.apache.camel.Producer;
026 import org.apache.camel.impl.ProducerCache;
027 import org.apache.camel.impl.ServiceSupport;
028 import org.apache.camel.model.RoutingSlipType;
029 import org.apache.camel.util.CollectionStringBuffer;
030 import org.apache.camel.util.ExchangeHelper;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 import static org.apache.camel.util.ObjectHelper.notNull;
035
036 /**
037 * Implements a <a href="http://activemq.apache.org/camel/routing-slip.html">Routing Slip</a>
038 * pattern where the list of actual endpoints to send a message exchange to are
039 * dependent on the value of a message header.
040 */
041 public class RoutingSlip extends ServiceSupport implements Processor {
042 private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class);
043 private final String header;
044 private final String uriDelimiter;
045
046 private ProducerCache<Exchange> producerCache = new ProducerCache<Exchange>();
047
048 public RoutingSlip(String header) {
049 this(header, RoutingSlipType.DEFAULT_DELIMITER);
050 }
051
052 public RoutingSlip(String header, String uriDelimiter) {
053 notNull(header, "header");
054 notNull(uriDelimiter, "uriDelimiter");
055
056 this.header = header;
057 this.uriDelimiter = uriDelimiter;
058 }
059
060 @Override
061 public String toString() {
062 return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]";
063 }
064
065 public void process(Exchange exchange) throws Exception {
066 Message message = exchange.getIn();
067 String[] recipients = recipients(message);
068 Exchange current = exchange;
069
070 for (String nextRecipient : recipients) {
071 Endpoint<Exchange> endpoint = resolveEndpoint(exchange, nextRecipient);
072 Producer<Exchange> producer = producerCache.getProducer(endpoint);
073 Exchange ex = current.newInstance();
074
075 updateRoutingSlip(current);
076 copyOutToIn(ex, current);
077
078 producer.process(ex);
079
080 current = ex;
081 }
082 ExchangeHelper.copyResults(exchange, current);
083 }
084
085 protected Endpoint<Exchange> resolveEndpoint(Exchange exchange, Object recipient) {
086 return ExchangeHelper.resolveEndpoint(exchange, recipient);
087 }
088
089 protected void doStop() throws Exception {
090 producerCache.stop();
091 }
092
093 protected void doStart() throws Exception {
094 }
095
096 private void updateRoutingSlip(Exchange current) {
097 Message message = getResultMessage(current);
098 message.setHeader(header, removeFirstElement(recipients(message)));
099 }
100
101 /**
102 * Returns the outbound message if available. Otherwise return the inbound
103 * message.
104 */
105 private Message getResultMessage(Exchange exchange) {
106 Message message = exchange.getOut(false);
107 // if this endpoint had no out (like a mock endpoint)
108 // just take the in
109 if (message == null) {
110 message = exchange.getIn();
111 }
112 return message;
113 }
114
115 /**
116 * Return the list of recipients defined in the routing slip in the
117 * specified message.
118 */
119 private String[] recipients(Message message) {
120 Object headerValue = message.getHeader(header);
121 if (headerValue != null && !headerValue.equals("")) {
122 return headerValue.toString().split(uriDelimiter);
123 }
124 return new String[] {};
125 }
126
127 /**
128 * Return a string representation of the element list with the first element
129 * removed.
130 */
131 private String removeFirstElement(String[] elements) {
132 CollectionStringBuffer updatedElements = new CollectionStringBuffer(uriDelimiter);
133 for (int i = 1; i < elements.length; i++) {
134 updatedElements.append(elements[i]);
135 }
136 return updatedElements.toString();
137 }
138
139 /**
140 * Copy the outbound data in 'source' to the inbound data in 'result'.
141 */
142 private void copyOutToIn(Exchange result, Exchange source) {
143 result.setException(source.getException());
144
145 Message fault = source.getFault(false);
146 if (fault != null) {
147 result.getFault(true).copyFrom(fault);
148 }
149
150 result.setIn(getResultMessage(source));
151
152 result.getProperties().clear();
153 result.getProperties().putAll(source.getProperties());
154 }
155 }