001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.mock;
018
019 import java.beans.PropertyChangeListener;
020 import java.beans.PropertyChangeSupport;
021 import java.io.File;
022 import java.util.ArrayList;
023 import java.util.Arrays;
024 import java.util.Collection;
025 import java.util.HashMap;
026 import java.util.HashSet;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.CountDownLatch;
032 import java.util.concurrent.TimeUnit;
033
034 import org.apache.camel.CamelContext;
035 import org.apache.camel.Component;
036 import org.apache.camel.Consumer;
037 import org.apache.camel.Endpoint;
038 import org.apache.camel.Exchange;
039 import org.apache.camel.Expression;
040 import org.apache.camel.Message;
041 import org.apache.camel.Processor;
042 import org.apache.camel.Producer;
043 import org.apache.camel.builder.ExpressionClause;
044 import org.apache.camel.impl.DefaultEndpoint;
045 import org.apache.camel.impl.DefaultProducer;
046 import org.apache.camel.spi.BrowsableEndpoint;
047 import org.apache.camel.util.CamelContextHelper;
048 import org.apache.camel.util.ExpressionComparator;
049 import org.apache.camel.util.FileUtil;
050 import org.apache.camel.util.ObjectHelper;
051 import org.apache.commons.logging.Log;
052 import org.apache.commons.logging.LogFactory;
053
054 /**
055 * A Mock endpoint which provides a literate, fluent API for testing routes
056 * using a <a href="http://jmock.org/">JMock style</a> API.
057 *
058 * @version $Revision: 794648 $
059 */
060 public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
061 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
062 private int expectedCount;
063 private int counter;
064 private Processor defaultProcessor;
065 private Map<Integer, Processor> processors;
066 private List<Exchange> receivedExchanges;
067 private List<Throwable> failures;
068 private List<Runnable> tests;
069 private CountDownLatch latch;
070 private long sleepForEmptyTest;
071 private long resultWaitTime;
072 private long resultMinimumWaitTime;
073 private int expectedMinimumCount;
074 private List expectedBodyValues;
075 private List actualBodyValues;
076 private final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
077 private String headerName;
078 private Object headerValue;
079 private Object actualHeader;
080 private String propertyName;
081 private Object propertyValue;
082 private Object actualProperty;
083 private Processor reporter;
084
085 public MockEndpoint(String endpointUri, Component component) {
086 super(endpointUri, component);
087 init();
088 }
089
090 public MockEndpoint(String endpointUri) {
091 super(endpointUri);
092 init();
093 }
094
095 public MockEndpoint() {
096 this(null);
097 }
098
099 /**
100 * A helper method to resolve the mock endpoint of the given URI on the given context
101 *
102 * @param context the camel context to try resolve the mock endpoint from
103 * @param uri the uri of the endpoint to resolve
104 * @return the endpoint
105 */
106 public static MockEndpoint resolve(CamelContext context, String uri) {
107 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
108 }
109
110 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
111 long start = System.currentTimeMillis();
112 long left = unit.toMillis(timeout);
113 long end = start + left;
114 for (MockEndpoint endpoint : endpoints) {
115 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
116 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
117 }
118 left = end - System.currentTimeMillis();
119 if (left <= 0) {
120 left = 0;
121 }
122 }
123 }
124
125 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
126 assertWait(timeout, unit, endpoints);
127 for (MockEndpoint endpoint : endpoints) {
128 endpoint.assertIsSatisfied();
129 }
130 }
131
132 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
133 for (MockEndpoint endpoint : endpoints) {
134 endpoint.assertIsSatisfied();
135 }
136 }
137
138
139 /**
140 * Asserts that all the expectations on any {@link MockEndpoint} instances registered
141 * in the given context are valid
142 *
143 * @param context the camel context used to find all the available endpoints to be asserted
144 */
145 public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
146 ObjectHelper.notNull(context, "camelContext");
147 Collection<Endpoint> endpoints = context.getSingletonEndpoints();
148 for (Endpoint endpoint : endpoints) {
149 if (endpoint instanceof MockEndpoint) {
150 MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
151 mockEndpoint.assertIsSatisfied();
152 }
153 }
154 }
155
156 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
157 for (MockEndpoint endpoint : endpoints) {
158 endpoint.setExpectedMessageCount(count);
159 }
160 }
161
162 public List<Exchange> getExchanges() {
163 return getReceivedExchanges();
164 }
165
166 public void addPropertyChangeListener(PropertyChangeListener listener) {
167 propertyChangeSupport.addPropertyChangeListener(listener);
168 }
169
170 public void removePropertyChangeListener(PropertyChangeListener listener) {
171 propertyChangeSupport.removePropertyChangeListener(listener);
172 }
173
174 public Consumer createConsumer(Processor processor) throws Exception {
175 throw new UnsupportedOperationException("You cannot consume from this endpoint");
176 }
177
178 public Producer createProducer() throws Exception {
179 return new DefaultProducer(this) {
180 public void process(Exchange exchange) {
181 onExchange(exchange);
182 }
183 };
184 }
185
186 public void reset() {
187 init();
188 }
189
190
191 // Testing API
192 // -------------------------------------------------------------------------
193
194 /**
195 * Set the processor that will be invoked when the index
196 * message is received.
197 */
198 public void whenExchangeReceived(int index, Processor processor) {
199 this.processors.put(index, processor);
200 }
201
202 /**
203 * Set the processor that will be invoked when the some message
204 * is received.
205 *
206 * This processor could be overwritten by
207 * {@link #whenExchangeReceived(int, Processor)} method.
208 */
209 public void whenAnyExchangeReceived(Processor processor) {
210 this.defaultProcessor = processor;
211 }
212
213 /**
214 * Validates that all the available expectations on this endpoint are
215 * satisfied; or throw an exception
216 */
217 public void assertIsSatisfied() throws InterruptedException {
218 assertIsSatisfied(sleepForEmptyTest);
219 }
220
221 /**
222 * Validates that all the available expectations on this endpoint are
223 * satisfied; or throw an exception
224 *
225 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
226 * should wait for the test to be true
227 */
228 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
229 LOG.info("Asserting: " + this + " is satisfied");
230 if (expectedCount == 0) {
231 if (timeoutForEmptyEndpoints > 0) {
232 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
233 Thread.sleep(timeoutForEmptyEndpoints);
234 }
235 assertEquals("Received message count", expectedCount, getReceivedCounter());
236 } else if (expectedCount > 0) {
237 if (expectedCount != getReceivedCounter()) {
238 waitForCompleteLatch();
239 }
240 assertEquals("Received message count", expectedCount, getReceivedCounter());
241 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
242 waitForCompleteLatch();
243 }
244
245 if (expectedMinimumCount >= 0) {
246 int receivedCounter = getReceivedCounter();
247 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
248 }
249
250 for (Runnable test : tests) {
251 test.run();
252 }
253
254 for (Throwable failure : failures) {
255 if (failure != null) {
256 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
257 fail("Failed due to caught exception: " + failure);
258 }
259 }
260 }
261
262 /**
263 * Validates that the assertions fail on this endpoint
264 */
265 public void assertIsNotSatisfied() throws InterruptedException {
266 try {
267 assertIsSatisfied();
268 fail("Expected assertion failure!");
269 } catch (AssertionError e) {
270 LOG.info("Caught expected failure: " + e);
271 }
272 }
273
274 /**
275 * Validates that the assertions fail on this endpoint
276
277 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
278 * should wait for the test to be true
279 */
280 public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
281 try {
282 assertIsSatisfied(timeoutForEmptyEndpoints);
283 fail("Expected assertion failure!");
284 } catch (AssertionError e) {
285 LOG.info("Caught expected failure: " + e);
286 }
287 }
288
289 /**
290 * Specifies the expected number of message exchanges that should be
291 * received by this endpoint
292 *
293 * @param expectedCount the number of message exchanges that should be
294 * expected by this endpoint
295 */
296 public void expectedMessageCount(int expectedCount) {
297 setExpectedMessageCount(expectedCount);
298 }
299
300 /**
301 * Specifies the minimum number of expected message exchanges that should be
302 * received by this endpoint
303 *
304 * @param expectedCount the number of message exchanges that should be
305 * expected by this endpoint
306 */
307 public void expectedMinimumMessageCount(int expectedCount) {
308 setMinimumExpectedMessageCount(expectedCount);
309 }
310
311 /**
312 * Adds an expectation that the given header name & value are received by this endpoint
313 */
314 public void expectedHeaderReceived(final String name, final Object value) {
315 this.headerName = name;
316 this.headerValue = value;
317
318 expects(new Runnable() {
319 public void run() {
320 assertTrue("No header with name " + headerName + " found.", actualHeader != null);
321
322 Object actualValue = getCamelContext().getTypeConverter().convertTo(actualHeader.getClass(), headerValue);
323 assertEquals("Header of message", actualValue, actualHeader);
324 }
325 });
326 }
327
328 /**
329 * Adds an expectation that the given property name & value are received by this endpoint
330 */
331 public void expectedPropertyReceived(final String name, final Object value) {
332 this.propertyName = name;
333 this.propertyValue = value;
334
335 expects(new Runnable() {
336 public void run() {
337 assertTrue("No property with name " + propertyName + " found.", actualProperty != null);
338
339 Object actualValue = getCamelContext().getTypeConverter().convertTo(actualProperty.getClass(), propertyValue);
340 assertEquals("Property of message", actualValue, actualProperty);
341 }
342 });
343 }
344
345 /**
346 * Adds an expectation that the given body values are received by this
347 * endpoint in the specified order
348 */
349 public void expectedBodiesReceived(final List bodies) {
350 expectedMessageCount(bodies.size());
351 this.expectedBodyValues = bodies;
352 this.actualBodyValues = new ArrayList();
353
354 expects(new Runnable() {
355 public void run() {
356 for (int i = 0; i < expectedBodyValues.size(); i++) {
357 Exchange exchange = getReceivedExchanges().get(i);
358 assertTrue("No exchange received for counter: " + i, exchange != null);
359
360 Object expectedBody = expectedBodyValues.get(i);
361 Object actualBody = null;
362 if (i < actualBodyValues.size()) {
363 actualBody = actualBodyValues.get(i);
364 }
365
366 assertEquals("Body of message: " + i, expectedBody, actualBody);
367 }
368 }
369 });
370 }
371
372 /**
373 * Adds an expectation that the given body values are received by this endpoint
374 */
375 public void expectedBodiesReceived(Object... bodies) {
376 List bodyList = new ArrayList();
377 bodyList.addAll(Arrays.asList(bodies));
378 expectedBodiesReceived(bodyList);
379 }
380
381 /**
382 * Adds an expectation that the given body value are received by this endpoint
383 */
384 public ExpressionClause expectedBodyReceived() {
385 final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
386
387 expectedMessageCount(1);
388
389 expects(new Runnable() {
390 public void run() {
391 Exchange exchange = getReceivedExchanges().get(0);
392 assertTrue("No exchange received for counter: " + 0, exchange != null);
393
394 Object actualBody = exchange.getIn().getBody();
395 Object expectedBody = clause.evaluate(exchange, Object.class);
396
397 assertEquals("Body of message: " + 0, expectedBody, actualBody);
398 }
399 });
400
401 return clause;
402 }
403
404 /**
405 * Adds an expectation that the given body values are received by this
406 * endpoint in any order
407 */
408 public void expectedBodiesReceivedInAnyOrder(final List bodies) {
409 expectedMessageCount(bodies.size());
410 this.expectedBodyValues = bodies;
411 this.actualBodyValues = new ArrayList();
412
413 expects(new Runnable() {
414 public void run() {
415 Set actualBodyValuesSet = new HashSet(actualBodyValues);
416 for (int i = 0; i < expectedBodyValues.size(); i++) {
417 Exchange exchange = getReceivedExchanges().get(i);
418 assertTrue("No exchange received for counter: " + i, exchange != null);
419
420 Object expectedBody = expectedBodyValues.get(i);
421 assertTrue("Message with body " + expectedBody
422 + " was expected but not found in " + actualBodyValuesSet,
423 actualBodyValuesSet.remove(expectedBody));
424 }
425 }
426 });
427 }
428
429 /**
430 * Adds an expectation that the given body values are received by this
431 * endpoint in any order
432 */
433 @SuppressWarnings("unchecked")
434 public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
435 List bodyList = new ArrayList();
436 bodyList.addAll(Arrays.asList(bodies));
437 expectedBodiesReceivedInAnyOrder(bodyList);
438 }
439
440 /**
441 * Adds an expection that a file exists with the given name
442 *
443 * @param name name of file, will cater for / and \ on different OS platforms
444 */
445 public void expectedFileExists(final String name) {
446 expectedFileExists(name, null);
447 }
448
449 /**
450 * Adds an expection that a file exists with the given name
451 * <p/>
452 * Will wait at most 5 seconds while checking for the existence of the file.
453 *
454 * @param name name of file, will cater for / and \ on different OS platforms
455 * @param content content of file to compare, can be <tt>null</tt> to not compare content
456 */
457 public void expectedFileExists(final String name, final String content) {
458 final File file = new File(FileUtil.normalizePath(name)).getAbsoluteFile();
459
460 expects(new Runnable() {
461 public void run() {
462 // wait at most 5 seconds for the file to exists
463 final long timeout = System.currentTimeMillis() + 5000;
464
465 boolean stop = false;
466 while (!stop && !file.exists()) {
467 try {
468 Thread.sleep(50);
469 } catch (InterruptedException e) {
470 // ignore
471 }
472 stop = System.currentTimeMillis() > timeout;
473 }
474
475 assertTrue("The file should exists: " + name, file.exists());
476
477 if (content != null) {
478 String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
479 assertEquals("Content of file: " + name, content, body);
480 }
481 }
482 });
483 }
484
485 /**
486 * Adds an expectation that messages received should have ascending values
487 * of the given expression such as a user generated counter value
488 */
489 public void expectsAscending(final Expression expression) {
490 expects(new Runnable() {
491 public void run() {
492 assertMessagesAscending(expression);
493 }
494 });
495 }
496
497 /**
498 * Adds an expectation that messages received should have ascending values
499 * of the given expression such as a user generated counter value
500 */
501 public ExpressionClause expectsAscending() {
502 final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
503 expects(new Runnable() {
504 public void run() {
505 assertMessagesAscending(clause.getExpressionValue());
506 }
507 });
508 return clause;
509 }
510
511 /**
512 * Adds an expectation that messages received should have descending values
513 * of the given expression such as a user generated counter value
514 */
515 public void expectsDescending(final Expression expression) {
516 expects(new Runnable() {
517 public void run() {
518 assertMessagesDescending(expression);
519 }
520 });
521 }
522
523 /**
524 * Adds an expectation that messages received should have descending values
525 * of the given expression such as a user generated counter value
526 */
527 public ExpressionClause expectsDescending() {
528 final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
529 expects(new Runnable() {
530 public void run() {
531 assertMessagesDescending(clause.getExpressionValue());
532 }
533 });
534 return clause;
535 }
536
537 /**
538 * Adds an expectation that no duplicate messages should be received using
539 * the expression to determine the message ID
540 *
541 * @param expression the expression used to create a unique message ID for
542 * message comparison (which could just be the message
543 * payload if the payload can be tested for uniqueness using
544 * {@link Object#equals(Object)} and
545 * {@link Object#hashCode()}
546 */
547 public void expectsNoDuplicates(final Expression expression) {
548 expects(new Runnable() {
549 public void run() {
550 assertNoDuplicates(expression);
551 }
552 });
553 }
554
555 /**
556 * Adds an expectation that no duplicate messages should be received using
557 * the expression to determine the message ID
558 */
559 public ExpressionClause expectsNoDuplicates() {
560 final ExpressionClause clause = new ExpressionClause<MockEndpoint>(this);
561 expects(new Runnable() {
562 public void run() {
563 assertNoDuplicates(clause.getExpressionValue());
564 }
565 });
566 return clause;
567 }
568
569 /**
570 * Asserts that the messages have ascending values of the given expression
571 */
572 public void assertMessagesAscending(Expression expression) {
573 assertMessagesSorted(expression, true);
574 }
575
576 /**
577 * Asserts that the messages have descending values of the given expression
578 */
579 public void assertMessagesDescending(Expression expression) {
580 assertMessagesSorted(expression, false);
581 }
582
583 protected void assertMessagesSorted(Expression expression, boolean ascending) {
584 String type = ascending ? "ascending" : "descending";
585 ExpressionComparator comparator = new ExpressionComparator(expression);
586 List<Exchange> list = getReceivedExchanges();
587 for (int i = 1; i < list.size(); i++) {
588 int j = i - 1;
589 Exchange e1 = list.get(j);
590 Exchange e2 = list.get(i);
591 int result = comparator.compare(e1, e2);
592 if (result == 0) {
593 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
594 + expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
595 } else {
596 if (!ascending) {
597 result = result * -1;
598 }
599 if (result > 0) {
600 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
601 + " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
602 + expression + ". Exchanges: " + e1 + " and " + e2);
603 }
604 }
605 }
606 }
607
608 public void assertNoDuplicates(Expression expression) {
609 Map<Object, Exchange> map = new HashMap<Object, Exchange>();
610 List<Exchange> list = getReceivedExchanges();
611 for (int i = 0; i < list.size(); i++) {
612 Exchange e2 = list.get(i);
613 Object key = expression.evaluate(e2, Object.class);
614 Exchange e1 = map.get(key);
615 if (e1 != null) {
616 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
617 } else {
618 map.put(key, e2);
619 }
620 }
621 }
622
623 /**
624 * Adds the expectation which will be invoked when enough messages are received
625 */
626 public void expects(Runnable runnable) {
627 tests.add(runnable);
628 }
629
630 /**
631 * Adds an assertion to the given message index
632 *
633 * @param messageIndex the number of the message
634 * @return the assertion clause
635 */
636 public AssertionClause message(final int messageIndex) {
637 AssertionClause clause = new AssertionClause() {
638 public void run() {
639 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
640 }
641 };
642 expects(clause);
643 return clause;
644 }
645
646 /**
647 * Adds an assertion to all the received messages
648 *
649 * @return the assertion clause
650 */
651 public AssertionClause allMessages() {
652 AssertionClause clause = new AssertionClause() {
653 public void run() {
654 List<Exchange> list = getReceivedExchanges();
655 int index = 0;
656 for (Exchange exchange : list) {
657 applyAssertionOn(MockEndpoint.this, index++, exchange);
658 }
659 }
660 };
661 expects(clause);
662 return clause;
663 }
664
665 /**
666 * Asserts that the given index of message is received (starting at zero)
667 */
668 public Exchange assertExchangeReceived(int index) {
669 int count = getReceivedCounter();
670 assertTrue("Not enough messages received. Was: " + count, count > index);
671 return getReceivedExchanges().get(index);
672 }
673
674 // Properties
675 // -------------------------------------------------------------------------
676 public List<Throwable> getFailures() {
677 return failures;
678 }
679
680 public int getReceivedCounter() {
681 return receivedExchanges.size();
682 }
683
684 public List<Exchange> getReceivedExchanges() {
685 return receivedExchanges;
686 }
687
688 public int getExpectedCount() {
689 return expectedCount;
690 }
691
692 public long getSleepForEmptyTest() {
693 return sleepForEmptyTest;
694 }
695
696 /**
697 * Allows a sleep to be specified to wait to check that this endpoint really
698 * is empty when {@link #expectedMessageCount(int)} is called with zero
699 *
700 * @param sleepForEmptyTest the milliseconds to sleep for to determine that
701 * this endpoint really is empty
702 */
703 public void setSleepForEmptyTest(long sleepForEmptyTest) {
704 this.sleepForEmptyTest = sleepForEmptyTest;
705 }
706
707 public long getResultWaitTime() {
708 return resultWaitTime;
709 }
710
711 /**
712 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
713 * wait on a latch until it is satisfied
714 */
715 public void setResultWaitTime(long resultWaitTime) {
716 this.resultWaitTime = resultWaitTime;
717 }
718
719 /**
720 * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
721 * wait on a latch until it is satisfied
722 */
723 public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
724 this.resultMinimumWaitTime = resultMinimumWaitTime;
725 }
726
727 /**
728 * Specifies the expected number of message exchanges that should be
729 * received by this endpoint
730 *
731 * @param expectedCount the number of message exchanges that should be
732 * expected by this endpoint
733 */
734 public void setExpectedMessageCount(int expectedCount) {
735 this.expectedCount = expectedCount;
736 if (expectedCount <= 0) {
737 latch = null;
738 } else {
739 latch = new CountDownLatch(expectedCount);
740 }
741 }
742
743 /**
744 * Specifies the minimum number of expected message exchanges that should be
745 * received by this endpoint
746 *
747 * @param expectedCount the number of message exchanges that should be
748 * expected by this endpoint
749 */
750 public void setMinimumExpectedMessageCount(int expectedCount) {
751 this.expectedMinimumCount = expectedCount;
752 if (expectedCount <= 0) {
753 latch = null;
754 } else {
755 latch = new CountDownLatch(expectedMinimumCount);
756 }
757 }
758
759 public Processor getReporter() {
760 return reporter;
761 }
762
763 /**
764 * Allows a processor to added to the endpoint to report on progress of the test
765 */
766 public void setReporter(Processor reporter) {
767 this.reporter = reporter;
768 }
769
770 // Implementation methods
771 // -------------------------------------------------------------------------
772 private void init() {
773 expectedCount = -1;
774 counter = 0;
775 processors = new HashMap<Integer, Processor>();
776 receivedExchanges = new CopyOnWriteArrayList<Exchange>();
777 failures = new CopyOnWriteArrayList<Throwable>();
778 tests = new CopyOnWriteArrayList<Runnable>();
779 latch = null;
780 sleepForEmptyTest = 0;
781 resultWaitTime = 20000L;
782 resultMinimumWaitTime = 0L;
783 expectedMinimumCount = -1;
784 expectedBodyValues = null;
785 actualBodyValues = new ArrayList();
786 }
787
788 protected synchronized void onExchange(Exchange exchange) {
789 try {
790 if (reporter != null) {
791 reporter.process(exchange);
792 }
793 performAssertions(exchange);
794 } catch (Throwable e) {
795 // must catch throwable as AssertionException extends java.lang.Error
796 failures.add(e);
797 } finally {
798 // make sure latch is counted down to avoid test hanging forever
799 if (latch != null) {
800 latch.countDown();
801 }
802 }
803 }
804
805 @SuppressWarnings("unchecked")
806 protected void performAssertions(Exchange exchange) throws Exception {
807 Message in = exchange.getIn();
808 Object actualBody = in.getBody();
809
810 if (headerName != null) {
811 actualHeader = in.getHeader(headerName);
812 }
813
814 if (propertyName != null) {
815 actualProperty = exchange.getProperty(propertyName);
816 }
817
818 if (expectedBodyValues != null) {
819 int index = actualBodyValues.size();
820 if (expectedBodyValues.size() > index) {
821 Object expectedBody = expectedBodyValues.get(index);
822 if (expectedBody != null) {
823 actualBody = in.getBody(expectedBody.getClass());
824 }
825 actualBodyValues.add(actualBody);
826 }
827 }
828
829 ++counter;
830 if (LOG.isDebugEnabled()) {
831 LOG.debug(getEndpointUri() + " >>>> " + counter + " : " + exchange + " with body: " + actualBody);
832 }
833
834 receivedExchanges.add(exchange);
835
836 Processor processor = processors.get(getReceivedCounter()) != null
837 ? processors.get(getReceivedCounter()) : defaultProcessor;
838
839 if (processor != null) {
840 processor.process(exchange);
841 }
842 }
843
844 protected void waitForCompleteLatch() throws InterruptedException {
845 if (latch == null) {
846 fail("Should have a latch!");
847 }
848
849 // now lets wait for the results
850 LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
851 long start = System.currentTimeMillis();
852 latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
853 long delta = System.currentTimeMillis() - start;
854 LOG.debug("Took " + delta + " millis to complete latch");
855
856 if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
857 fail("Expected minimum " + resultMinimumWaitTime
858 + " millis waiting on the result, but was faster with " + delta + " millis.");
859 }
860 }
861
862 protected void assertEquals(String message, Object expectedValue, Object actualValue) {
863 if (!ObjectHelper.equal(expectedValue, actualValue)) {
864 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
865 }
866 }
867
868 protected void assertTrue(String message, boolean predicate) {
869 if (!predicate) {
870 fail(message);
871 }
872 }
873
874 protected void fail(Object message) {
875 if (LOG.isDebugEnabled()) {
876 List<Exchange> list = getReceivedExchanges();
877 int index = 0;
878 for (Exchange exchange : list) {
879 LOG.debug("Received[" + (++index) + "]: " + exchange);
880 }
881 }
882 throw new AssertionError(getEndpointUri() + " " + message);
883 }
884
885 public int getExpectedMinimumCount() {
886 return expectedMinimumCount;
887 }
888
889 public void await() throws InterruptedException {
890 if (latch != null) {
891 latch.await();
892 }
893 }
894
895 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
896 if (latch != null) {
897 return latch.await(timeout, unit);
898 }
899 return true;
900 }
901
902 public boolean isSingleton() {
903 return true;
904 }
905 }