001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019
020 import org.apache.camel.Endpoint;
021 import org.apache.camel.Exchange;
022 import org.apache.camel.ExchangePattern;
023 import org.apache.camel.Message;
024 import org.apache.camel.Processor;
025 import org.apache.camel.Producer;
026 import org.apache.camel.impl.ProducerCache;
027 import org.apache.camel.impl.ServiceSupport;
028 import org.apache.camel.model.RoutingSlipType;
029 import org.apache.camel.util.CollectionStringBuffer;
030 import org.apache.camel.util.ExchangeHelper;
031 import org.apache.commons.logging.Log;
032 import org.apache.commons.logging.LogFactory;
033
034 import static org.apache.camel.util.ObjectHelper.notNull;
035
036 /**
037 * Implements a <a
038 * href="http://activemq.apache.org/camel/routing-slip.html">Routing Slip</a>
039 * pattern where the list of actual endpoints to send a message exchange to are
040 * dependent on the value of a message header.
041 */
042 public class RoutingSlip extends ServiceSupport implements Processor {
043 private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class);
044 private final String header;
045 private final String uriDelimiter;
046
047 private ProducerCache<Exchange> producerCache = new ProducerCache<Exchange>();
048
049 public RoutingSlip(String header) {
050 this(header, RoutingSlipType.DEFAULT_DELIMITER);
051 }
052
053 public RoutingSlip(String header, String uriDelimiter) {
054 notNull(header, "header");
055 notNull(uriDelimiter, "uriDelimiter");
056
057 this.header = header;
058 this.uriDelimiter = uriDelimiter;
059 }
060
061 @Override
062 public String toString() {
063 return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]";
064 }
065
066 public void process(Exchange exchange) throws Exception {
067 Message message = exchange.getIn();
068 String[] recipients = recipients(message);
069 Exchange current = exchange;
070
071 for (String nextRecipient : recipients) {
072 Endpoint<Exchange> endpoint = resolveEndpoint(exchange, nextRecipient);
073 Producer<Exchange> producer = producerCache.getProducer(endpoint);
074 Exchange ex = endpoint.createExchange(ExchangePattern.InOut);
075
076 updateRoutingSlip(current);
077 copyOutToIn(ex, current);
078
079 producer.process(ex);
080
081 current = ex;
082 }
083 ExchangeHelper.copyResults(exchange, current);
084 }
085
086 protected Endpoint<Exchange> resolveEndpoint(Exchange exchange, Object recipient) {
087 return ExchangeHelper.resolveEndpoint(exchange, recipient);
088 }
089
090 protected void doStop() throws Exception {
091 producerCache.stop();
092 }
093
094 protected void doStart() throws Exception {
095 }
096
097 private void updateRoutingSlip(Exchange current) {
098 Message message = getResultMessage(current);
099 message.setHeader(header, removeFirstElement(recipients(message)));
100 }
101
102 /**
103 * Returns the outbound message if available. Otherwise return the inbound
104 * message.
105 */
106 private Message getResultMessage(Exchange exchange) {
107 Message message = exchange.getOut(false);
108 // if this endpoint had no out (like a mock endpoint)
109 // just take the in
110 if (message == null) {
111 message = exchange.getIn();
112 }
113 return message;
114 }
115
116 /**
117 * Return the list of recipients defined in the routing slip in the
118 * specified message.
119 */
120 private String[] recipients(Message message) {
121 Object headerValue = message.getHeader(header);
122 if (headerValue != null && !headerValue.equals("")) {
123 return headerValue.toString().split(uriDelimiter);
124 }
125 return new String[] {};
126 }
127
128 /**
129 * Return a string representation of the element list with the first element
130 * removed.
131 */
132 private String removeFirstElement(String[] elements) {
133 CollectionStringBuffer updatedElements = new CollectionStringBuffer(uriDelimiter);
134 for (int i = 1; i < elements.length; i++) {
135 updatedElements.append(elements[i]);
136 }
137 return updatedElements.toString();
138 }
139
140 /**
141 * Copy the outbound data in 'source' to the inbound data in 'result'.
142 */
143 private void copyOutToIn(Exchange result, Exchange source) {
144 result.setException(source.getException());
145
146 Message fault = source.getFault(false);
147 if (fault != null) {
148 result.getFault(true).copyFrom(fault);
149 }
150
151 result.setIn(getResultMessage(source));
152
153 result.getProperties().clear();
154 result.getProperties().putAll(source.getProperties());
155 }
156 }