001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor.resequencer;
018
019 import java.util.Timer;
020
021 /**
022 * Resequences elements based on a given {@link SequenceElementComparator}.
023 * This resequencer is designed for resequencing element streams. Stream-based
024 * resequencing has the advantage that the number of elements to be resequenced
025 * need not be known in advance. Resequenced elements are delivered via a
026 * {@link SequenceSender}.
027 * <p>
028 * The resequencer's behaviour for a given comparator is controlled by the
029 * <code>timeout</code> property. This is the timeout (in milliseconds) for a
030 * given element managed by this resequencer. An out-of-sequence element can
031 * only be marked as <i>ready-for-delivery</i> if it either times out or if it
032 * has an immediate predecessor (in that case it is in-sequence). If an
033 * immediate predecessor of a waiting element arrives the timeout task for the
034 * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
035 * <p>
036 * If the maximum out-of-sequence time difference between elements within a
037 * stream is known, the <code>timeout</code> value should be set to this
038 * value. In this case it is guaranteed that all elements of a stream will be
039 * delivered in sequence via the {@link SequenceSender}. The lower the
040 * <code>timeout</code> value is compared to the out-of-sequence time
041 * difference between elements within a stream the higher the probability is for
042 * out-of-sequence elements delivered by this resequencer. Delivery of elements
043 * must be explicitly triggered by applications using the {@link #deliver()} or
044 * {@link #deliverNext()} methods. Only elements that are <i>ready-for-delivery</i>
045 * are delivered by these methods. The longer an application waits to trigger a
046 * delivery the more elements may become <i>ready-for-delivery</i>.
047 * <p>
048 * The resequencer remembers the last-delivered element. If an element arrives
049 * which is the immediate successor of the last-delivered element it is
050 * <i>ready-for-delivery</i> immediately. After delivery the last-delivered
051 * element is adjusted accordingly. If the last-delivered element is
052 * <code>null</code> i.e. the resequencer was newly created the first arriving
053 * element needs <code>timeout</code> milliseconds in any case for becoming
054 * <i>ready-for-delivery</i>.
055 * <p>
056 *
057 * @version $Revision: 792319 $
058 */
059 public class ResequencerEngine<E> {
060
061 /**
062 * The element that most recently hash been delivered or <code>null</code>
063 * if no element has been delivered yet.
064 */
065 private Element<E> lastDelivered;
066
067 /**
068 * Minimum amount of time to wait for out-of-sequence elements.
069 */
070 private long timeout;
071
072 /**
073 * A sequence of elements for sorting purposes.
074 */
075 private Sequence<Element<E>> sequence;
076
077 /**
078 * A timer for scheduling timeout notifications.
079 */
080 private Timer timer;
081
082 /**
083 * A strategy for sending sequence elements.
084 */
085 private SequenceSender<E> sequenceSender;
086
087 /**
088 * Creates a new resequencer instance with a default timeout of 2000
089 * milliseconds.
090 *
091 * @param comparator a sequence element comparator.
092 */
093 public ResequencerEngine(SequenceElementComparator<E> comparator) {
094 this.sequence = createSequence(comparator);
095 this.timeout = 2000L;
096 this.lastDelivered = null;
097 }
098
099 public void start() {
100 timer = new Timer("Camel Stream Resequencer Timer", true);
101 }
102
103 /**
104 * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
105 */
106 public void stop() {
107 timer.cancel();
108 }
109
110 /**
111 * Returns the number of elements currently maintained by this resequencer.
112 *
113 * @return the number of elements currently maintained by this resequencer.
114 */
115 public synchronized int size() {
116 return sequence.size();
117 }
118
119 /**
120 * Returns this resequencer's timeout value.
121 *
122 * @return the timeout in milliseconds.
123 */
124 public long getTimeout() {
125 return timeout;
126 }
127
128 /**
129 * Sets this sequencer's timeout value.
130 *
131 * @param timeout the timeout in milliseconds.
132 */
133 public void setTimeout(long timeout) {
134 this.timeout = timeout;
135 }
136
137 /**
138 * Returns the sequence sender.
139 *
140 * @return the sequence sender.
141 */
142 public SequenceSender<E> getSequenceSender() {
143 return sequenceSender;
144 }
145
146 /**
147 * Sets the sequence sender.
148 *
149 * @param sequenceSender a sequence element sender.
150 */
151 public void setSequenceSender(SequenceSender<E> sequenceSender) {
152 this.sequenceSender = sequenceSender;
153 }
154
155 /**
156 * Returns the last delivered element.
157 *
158 * @return the last delivered element or <code>null</code> if no delivery
159 * has been made yet.
160 */
161 E getLastDelivered() {
162 if (lastDelivered == null) {
163 return null;
164 }
165 return lastDelivered.getObject();
166 }
167
168 /**
169 * Sets the last delivered element. This is for testing purposes only.
170 *
171 * @param o an element.
172 */
173 void setLastDelivered(E o) {
174 lastDelivered = new Element<E>(o);
175 }
176
177 /**
178 * Inserts the given element into this resequencer. If the element is not
179 * ready for immediate delivery and has no immediate presecessor then it is
180 * scheduled for timing out. After being timed out it is ready for delivery.
181 *
182 * @param o an element.
183 */
184 public synchronized void insert(E o) {
185 // wrap object into internal element
186 Element<E> element = new Element<E>(o);
187 // add element to sequence in proper order
188 sequence.add(element);
189
190 Element<E> successor = sequence.successor(element);
191
192 // check if there is an immediate successor and cancel
193 // timer task (no need to wait any more for timeout)
194 if (successor != null) {
195 successor.cancel();
196 }
197
198 // start delivery if current element is successor of last delivered element
199 if (successorOfLastDelivered(element)) {
200 // nothing to schedule
201 } else if (sequence.predecessor(element) != null) {
202 // nothing to schedule
203 } else {
204 element.schedule(defineTimeout());
205 }
206 }
207
208 /**
209 * Delivers all elements which are currently ready to deliver.
210 *
211 * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
212 *
213 * @see ResequencerEngine#deliverNext()
214 */
215 public synchronized void deliver() throws Exception {
216 while (deliverNext()) {
217 // do nothing here
218 }
219 }
220
221 /**
222 * Attempts to deliver a single element from the head of the resequencer
223 * queue (sequence). Only elements which have not been scheduled for timing
224 * out or which already timed out can be delivered. Elements are deliveref via
225 * {@link SequenceSender#sendElement(Object)}.
226 *
227 * @return <code>true</code> if the element has been delivered
228 * <code>false</code> otherwise.
229 *
230 * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
231 *
232 */
233 public boolean deliverNext() throws Exception {
234 if (sequence.size() == 0) {
235 return false;
236 }
237 // inspect element with lowest sequence value
238 Element<E> element = sequence.first();
239
240 // if element is scheduled do not deliver and return
241 if (element.scheduled()) {
242 return false;
243 }
244
245 // remove deliverable element from sequence
246 sequence.remove(element);
247
248 // set the delivered element to last delivered element
249 lastDelivered = element;
250
251 // deliver the sequence element
252 sequenceSender.sendElement(element.getObject());
253
254 // element has been delivered
255 return true;
256 }
257
258 /**
259 * Returns <code>true</code> if the given element is the immediate
260 * successor of the last delivered element.
261 *
262 * @param element an element.
263 * @return <code>true</code> if the given element is the immediate
264 * successor of the last delivered element.
265 */
266 private boolean successorOfLastDelivered(Element<E> element) {
267 if (lastDelivered == null) {
268 return false;
269 }
270 if (sequence.comparator().successor(element, lastDelivered)) {
271 return true;
272 }
273 return false;
274 }
275
276 /**
277 * Creates a timeout task based on the timeout setting of this resequencer.
278 *
279 * @return a new timeout task.
280 */
281 private Timeout defineTimeout() {
282 return new Timeout(timer, timeout);
283 }
284
285 private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
286 return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
287 }
288
289 }