001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.resequencer;
018
019 import java.util.Queue;
020 import java.util.Timer;
021
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024
025 /**
026 * Resequences elements based on a given {@link SequenceElementComparator}.
027 * This resequencer is designed for resequencing element streams. Resequenced
028 * elements are added to an output {@link Queue}. The resequencer is configured
029 * via the <code>timeout</code> and <code>capacity</code> properties.
030 *
031 * <ul>
032 * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
033 * given element managed by this resequencer. An out-of-sequence element can
034 * only be marked as <i>ready-for-delivery</i> if it either times out or if it
035 * has an immediate predecessor (in that case it is in-sequence). If an
036 * immediate predecessor of a waiting element arrives the timeout task for the
037 * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
038 * <p>
039 * If the maximum out-of-sequence time between elements within a stream is
040 * known, the <code>timeout</code> value should be set to this value. In this
041 * case it is guaranteed that all elements of a stream will be delivered in
042 * sequence to the output queue. However, large <code>timeout</code> values
043 * might require a very high resequencer <code>capacity</code> which might be
044 * in conflict with available memory resources. The lower the
045 * <code>timeout</code> value is compared to the out-of-sequence time between
046 * elements within a stream the higher the probability is for out-of-sequence
047 * elements delivered by this resequencer.</li>
048 * <li><code>capacity</code>. The capacity of this resequencer.</li>
049 * </ul>
050 *
051 * Whenever a timeout for a certain element occurs or an element has been added
052 * to this resequencer a delivery attempt is started. If a (sub)sequence of
053 * elements is <i>ready-for-delivery</i> then they are added to output queue.
054 * <p>
055 * The resequencer remembers the last-delivered element. If an element arrives
056 * which is the immediate successor of the last-delivered element it will be
057 * delivered immediately and the last-delivered element is adjusted accordingly.
058 * If the last-delivered element is <code>null</code> i.e. the resequencer was
059 * newly created the first arriving element will wait <code>timeout</code>
060 * milliseconds for being delivered to the output queue.
061 *
062 * @author Martin Krasser
063 *
064 * @version $Revision
065 */
066 public class ResequencerEngine<E> implements TimeoutHandler {
067
068 private static final transient Log LOG = LogFactory.getLog(ResequencerEngine.class);
069
070 private long timeout;
071 private int capacity;
072 private Queue<E> outQueue;
073 private Element<E> lastDelivered;
074
075 /**
076 * A sequence of elements for sorting purposes.
077 */
078 private Sequence<Element<E>> sequence;
079
080 /**
081 * A timer for scheduling timeout notifications.
082 */
083 private Timer timer;
084
085 /**
086 * Creates a new resequencer instance with a default timeout of 2000
087 * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
088 *
089 * @param comparator a sequence element comparator.
090 */
091 public ResequencerEngine(SequenceElementComparator<E> comparator) {
092 this(comparator, Integer.MAX_VALUE);
093 }
094
095 /**
096 * Creates a new resequencer instance with a default timeout of 2000
097 * milliseconds.
098 *
099 * @param comparator a sequence element comparator.
100 * @param capacity the capacity of this resequencer.
101 */
102 public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
103 this.timer = new Timer("Resequencer Timer");
104 this.sequence = createSequence(comparator);
105 this.capacity = capacity;
106 this.timeout = 2000L;
107 this.lastDelivered = null;
108 }
109
110 /**
111 * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
112 */
113 public void stop() {
114 this.timer.cancel();
115 }
116
117 /**
118 * Returns the output queue.
119 *
120 * @return the output queue.
121 */
122 public Queue<E> getOutQueue() {
123 return outQueue;
124 }
125
126 /**
127 * Sets the output queue.
128 *
129 * @param outQueue output queue.
130 */
131 public void setOutQueue(Queue<E> outQueue) {
132 this.outQueue = outQueue;
133 }
134
135 /**
136 * Returns this resequencer's timeout value.
137 *
138 * @return the timeout in milliseconds.
139 */
140 public long getTimeout() {
141 return timeout;
142 }
143
144 /**
145 * Sets this sequencer's timeout value.
146 *
147 * @param timeout the timeout in milliseconds.
148 */
149 public void setTimeout(long timeout) {
150 this.timeout = timeout;
151 }
152
153 /**
154 * Handles a timeout notification by starting a delivery attempt.
155 *
156 * @param timout timeout task that caused the notification.
157 */
158 public synchronized void timeout(Timeout timout) {
159 try {
160 while (deliver()) {
161 // work done in deliver()
162 }
163 } catch (RuntimeException e) {
164 LOG.error("error during delivery", e);
165 }
166 }
167
168 /**
169 * Adds an element to this resequencer throwing an exception if the maximum
170 * capacity is reached.
171 *
172 * @param o element to be resequenced.
173 * @throws IllegalStateException if the element cannot be added at this time
174 * due to capacity restrictions.
175 */
176 public synchronized void add(E o) {
177 if (sequence.size() >= capacity) {
178 throw new IllegalStateException("maximum capacity is reached");
179 }
180 insert(o);
181 }
182
183 /**
184 * Adds an element to this resequencer waiting, if necessary, until capacity
185 * becomes available.
186 *
187 * @param o element to be resequenced.
188 * @throws InterruptedException if interrupted while waiting.
189 */
190 public synchronized void put(E o) throws InterruptedException {
191 if (sequence.size() >= capacity) {
192 wait();
193 }
194 insert(o);
195 }
196
197 /**
198 * Returns the last delivered element.
199 *
200 * @return the last delivered element or <code>null</code> if no delivery
201 * has been made yet.
202 */
203 E getLastDelivered() {
204 if (lastDelivered == null) {
205 return null;
206 }
207 return lastDelivered.getObject();
208 }
209
210 /**
211 * Sets the last delivered element. This is for testing purposes only.
212 *
213 * @param o an element.
214 */
215 void setLastDelivered(E o) {
216 lastDelivered = new Element<E>(o);
217 }
218
219 /**
220 * Inserts the given element into this resequencing queue (sequence). If the
221 * element is not ready for immediate delivery and has no immediate
222 * presecessor then it is scheduled for timing out. After being timed out it
223 * is ready for delivery.
224 *
225 * @param o an element.
226 */
227 private void insert(E o) {
228 // wrap object into internal element
229 Element<E> element = new Element<E>(o);
230 // add element to sequence in proper order
231 sequence.add(element);
232
233 Element<E> successor = sequence.successor(element);
234
235 // check if there is an immediate successor and cancel
236 // timer task (no need to wait any more for timeout)
237 if (successor != null) {
238 successor.cancel();
239 }
240
241 // start delivery if current element is successor of last delivered element
242 if (successorOfLastDelivered(element)) {
243 // nothing to schedule
244 } else if (sequence.predecessor(element) != null) {
245 // nothing to schedule
246 } else {
247 Timeout t = defineTimeout();
248 element.schedule(t);
249 }
250
251 // start delivery
252 while (deliver()) {
253 // work done in deliver()
254 }
255 }
256
257 /**
258 * Attempts to deliver a single element from the head of the resequencer
259 * queue (sequence). Only elements which have not been scheduled for timing
260 * out or which already timed out can be delivered.
261 *
262 * @return <code>true</code> if the element has been delivered
263 * <code>false</code> otherwise.
264 */
265 private boolean deliver() {
266 if (sequence.size() == 0) {
267 return false;
268 }
269 // inspect element with lowest sequence value
270 Element<E> element = sequence.first();
271
272 // if element is scheduled do not deliver and return
273 if (element.scheduled()) {
274 return false;
275 }
276
277 // remove deliverable element from sequence
278 sequence.remove(element);
279
280 // set the delivered element to last delivered element
281 lastDelivered = element;
282
283 // notify a waiting thread that capacity is available
284 notify();
285
286 // add element to output queue
287 outQueue.add(element.getObject());
288
289 // element has been delivered
290 return true;
291 }
292
293 /**
294 * Returns <code>true</code> if the given element is the immediate
295 * successor of the last delivered element.
296 *
297 * @param element an element.
298 * @return <code>true</code> if the given element is the immediate
299 * successor of the last delivered element.
300 */
301 private boolean successorOfLastDelivered(Element<E> element) {
302 if (lastDelivered == null) {
303 return false;
304 }
305 if (sequence.comparator().successor(element, lastDelivered)) {
306 return true;
307 }
308 return false;
309 }
310
311 /**
312 * Creates a timeout task based on the timeout setting of this resequencer.
313 *
314 * @return a new timeout task.
315 */
316 private Timeout defineTimeout() {
317 Timeout result = new Timeout(timer, timeout);
318 result.addTimeoutHandler(this);
319 return result;
320 }
321
322 private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
323 return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
324 }
325
326 }