001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.mock;
018
019 import java.beans.PropertyChangeListener;
020 import java.beans.PropertyChangeSupport;
021 import java.util.ArrayList;
022 import java.util.Arrays;
023 import java.util.Collection;
024 import java.util.HashMap;
025 import java.util.HashSet;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029 import java.util.concurrent.CopyOnWriteArrayList;
030 import java.util.concurrent.CountDownLatch;
031 import java.util.concurrent.TimeUnit;
032
033 import org.apache.camel.CamelContext;
034 import org.apache.camel.Component;
035 import org.apache.camel.Consumer;
036 import org.apache.camel.Endpoint;
037 import org.apache.camel.Exchange;
038 import org.apache.camel.Expression;
039 import org.apache.camel.Message;
040 import org.apache.camel.Processor;
041 import org.apache.camel.Producer;
042 import org.apache.camel.impl.DefaultEndpoint;
043 import org.apache.camel.impl.DefaultProducer;
044 import org.apache.camel.spi.BrowsableEndpoint;
045 import org.apache.camel.util.CamelContextHelper;
046 import org.apache.camel.util.ExpressionComparator;
047 import org.apache.camel.util.ObjectHelper;
048 import org.apache.commons.logging.Log;
049 import org.apache.commons.logging.LogFactory;
050
051 /**
052 * A Mock endpoint which provides a literate, fluent API for testing routes
053 * using a <a href="http://jmock.org/">JMock style</a> API.
054 *
055 * @version $Revision: 765501 $
056 */
057 public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> {
058 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
059 private int expectedCount;
060 private int counter;
061 private Processor defaultProcessor;
062 private Map<Integer, Processor> processors;
063 private List<Exchange> receivedExchanges;
064 private List<Throwable> failures;
065 private List<Runnable> tests;
066 private CountDownLatch latch;
067 private long sleepForEmptyTest;
068 private long resultWaitTime;
069 private long resultMinimumWaitTime;
070 private int expectedMinimumCount;
071 private List expectedBodyValues;
072 private List actualBodyValues;
073 private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
074 private String headerName;
075 private String headerValue;
076 private Object actualHeader;
077 private Processor reporter;
078
079 public MockEndpoint(String endpointUri, Component component) {
080 super(endpointUri, component);
081 init();
082 }
083
084 public MockEndpoint(String endpointUri) {
085 super(endpointUri);
086 init();
087 }
088
089
090 /**
091 * A helper method to resolve the mock endpoint of the given URI on the given context
092 *
093 * @param context the camel context to try resolve the mock endpoint from
094 * @param uri the uri of the endpoint to resolve
095 * @return the endpoint
096 */
097 public static MockEndpoint resolve(CamelContext context, String uri) {
098 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
099 }
100
101 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
102 long start = System.currentTimeMillis();
103 long left = unit.toMillis(timeout);
104 long end = start + left;
105 for (MockEndpoint endpoint : endpoints) {
106 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
107 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
108 }
109 left = end - System.currentTimeMillis();
110 if (left <= 0) {
111 left = 0;
112 }
113 }
114 }
115
116 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
117 assertWait(timeout, unit, endpoints);
118 for (MockEndpoint endpoint : endpoints) {
119 endpoint.assertIsSatisfied();
120 }
121 }
122
123 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
124 for (MockEndpoint endpoint : endpoints) {
125 endpoint.assertIsSatisfied();
126 }
127 }
128
129
130 /**
131 * Asserts that all the expectations on any {@link MockEndpoint} instances registered
132 * in the given context are valid
133 *
134 * @param context the camel context used to find all the available endpoints to be asserted
135 */
136 public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
137 ObjectHelper.notNull(context, "camelContext");
138 Collection<Endpoint> endpoints = context.getSingletonEndpoints();
139 for (Endpoint endpoint : endpoints) {
140 if (endpoint instanceof MockEndpoint) {
141 MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
142 mockEndpoint.assertIsSatisfied();
143 }
144 }
145 }
146
147
148 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
149 for (MockEndpoint endpoint : endpoints) {
150 MockEndpoint.expectsMessageCount(count);
151 }
152 }
153
154 public List<Exchange> getExchanges() {
155 return getReceivedExchanges();
156 }
157
158 public void addPropertyChangeListener(PropertyChangeListener listener) {
159 propertyChangeSupport.addPropertyChangeListener(listener);
160 }
161
162 public void removePropertyChangeListener(PropertyChangeListener listener) {
163 propertyChangeSupport.removePropertyChangeListener(listener);
164 }
165
166 public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
167 throw new UnsupportedOperationException("You cannot consume from this endpoint");
168 }
169
170 public Producer<Exchange> createProducer() throws Exception {
171 return new DefaultProducer<Exchange>(this) {
172 public void process(Exchange exchange) {
173 onExchange(exchange);
174 }
175 };
176 }
177
178 public void reset() {
179 init();
180 }
181
182
183 // Testing API
184 // -------------------------------------------------------------------------
185
186 /**
187 * Set the processor that will be invoked when the index
188 * message is received.
189 *
190 * @param index
191 * @param processor
192 */
193 public void whenExchangeReceived(int index, Processor processor) {
194 this.processors.put(index, processor);
195 }
196
197 /**
198 * Set the processor that will be invoked when the some message
199 * is received.
200 *
201 * This processor could be overwritten by
202 * {@link #whenExchangeReceived(int, Processor)} method.
203 *
204 * @param processor
205 */
206 public void whenAnyExchangeReceived(Processor processor) {
207 this.defaultProcessor = processor;
208 }
209
210 /**
211 * Validates that all the available expectations on this endpoint are
212 * satisfied; or throw an exception
213 */
214 public void assertIsSatisfied() throws InterruptedException {
215 assertIsSatisfied(sleepForEmptyTest);
216 }
217
218 /**
219 * Validates that all the available expectations on this endpoint are
220 * satisfied; or throw an exception
221 *
222 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
223 * should wait for the test to be true
224 */
225 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
226 LOG.info("Asserting: " + this + " is satisfied");
227 if (expectedCount == 0) {
228 if (timeoutForEmptyEndpoints > 0) {
229 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
230 Thread.sleep(timeoutForEmptyEndpoints);
231 }
232 assertEquals("Received message count", expectedCount, getReceivedCounter());
233 } else if (expectedCount > 0) {
234 if (expectedCount != getReceivedCounter()) {
235 waitForCompleteLatch();
236 }
237 assertEquals("Received message count", expectedCount, getReceivedCounter());
238 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
239 waitForCompleteLatch();
240 }
241
242 if (expectedMinimumCount >= 0) {
243 int receivedCounter = getReceivedCounter();
244 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
245 }
246
247 for (Runnable test : tests) {
248 test.run();
249 }
250
251 for (Throwable failure : failures) {
252 if (failure != null) {
253 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
254 fail("Failed due to caught exception: " + failure);
255 }
256 }
257 }
258
259 /**
260 * Validates that the assertions fail on this endpoint
261 */
262 public void assertIsNotSatisfied() throws InterruptedException {
263 try {
264 assertIsSatisfied();
265 fail("Expected assertion failure!");
266 } catch (AssertionError e) {
267 LOG.info("Caught expected failure: " + e);
268 }
269 }
270
271 /**
272 * Specifies the expected number of message exchanges that should be
273 * received by this endpoint
274 *
275 * @param expectedCount the number of message exchanges that should be
276 * expected by this endpoint
277 */
278 public void expectedMessageCount(int expectedCount) {
279 setExpectedMessageCount(expectedCount);
280 }
281
282 /**
283 * Specifies the minimum number of expected message exchanges that should be
284 * received by this endpoint
285 *
286 * @param expectedCount the number of message exchanges that should be
287 * expected by this endpoint
288 */
289 public void expectedMinimumMessageCount(int expectedCount) {
290 setMinimumExpectedMessageCount(expectedCount);
291 }
292
293 /**
294 * Adds an expectation that the given header name & value are received by this
295 * endpoint
296 */
297 public void expectedHeaderReceived(String name, String value) {
298 this.headerName = name;
299 this.headerValue = value;
300
301 expects(new Runnable() {
302 public void run() {
303 assertTrue("No header with name " + headerName + " found.", actualHeader != null);
304
305 assertEquals("Header of message", headerValue, actualHeader);
306 }
307 });
308 }
309
310 /**
311 * Adds an expectation that the given body values are received by this
312 * endpoint in the specified order
313 */
314 public void expectedBodiesReceived(final List bodies) {
315 expectedMessageCount(bodies.size());
316 this.expectedBodyValues = bodies;
317 this.actualBodyValues = new ArrayList();
318
319 expects(new Runnable() {
320 public void run() {
321 for (int i = 0; i < expectedBodyValues.size(); i++) {
322 Exchange exchange = getReceivedExchanges().get(i);
323 assertTrue("No exchange received for counter: " + i, exchange != null);
324
325 Object expectedBody = expectedBodyValues.get(i);
326 Object actualBody = null;
327 if (i < actualBodyValues.size()) {
328 actualBody = actualBodyValues.get(i);
329 }
330
331 assertEquals("Body of message: " + i, expectedBody, actualBody);
332 }
333 }
334 });
335 }
336
337 /**
338 * Adds an expectation that the given body values are received by this
339 * endpoint
340 */
341 public void expectedBodiesReceived(Object... bodies) {
342 List bodyList = new ArrayList();
343 bodyList.addAll(Arrays.asList(bodies));
344 expectedBodiesReceived(bodyList);
345 }
346
347 /**
348 * Adds an expectation that the given body values are received by this
349 * endpoint in any order
350 */
351 public void expectedBodiesReceivedInAnyOrder(final List bodies) {
352 expectedMessageCount(bodies.size());
353 this.expectedBodyValues = bodies;
354 this.actualBodyValues = new ArrayList();
355
356 expects(new Runnable() {
357 public void run() {
358 Set actualBodyValuesSet = new HashSet(actualBodyValues);
359 for (int i = 0; i < expectedBodyValues.size(); i++) {
360 Exchange exchange = getReceivedExchanges().get(i);
361 assertTrue("No exchange received for counter: " + i, exchange != null);
362
363 Object expectedBody = expectedBodyValues.get(i);
364 assertTrue("Message with body " + expectedBody
365 + " was expected but not found in " + actualBodyValuesSet,
366 actualBodyValuesSet.remove(expectedBody));
367 }
368 }
369 });
370 }
371
372 /**
373 * Adds an expectation that the given body values are received by this
374 * endpoint in any order
375 */
376 public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
377 List bodyList = new ArrayList();
378 bodyList.addAll(Arrays.asList(bodies));
379 expectedBodiesReceivedInAnyOrder(bodyList);
380 }
381
382 /**
383 * Adds an expectation that messages received should have ascending values
384 * of the given expression such as a user generated counter value
385 *
386 * @param expression
387 */
388 public void expectsAscending(final Expression<Exchange> expression) {
389 expects(new Runnable() {
390 public void run() {
391 assertMessagesAscending(expression);
392 }
393 });
394 }
395
396 /**
397 * Adds an expectation that messages received should have descending values
398 * of the given expression such as a user generated counter value
399 *
400 * @param expression
401 */
402 public void expectsDescending(final Expression<Exchange> expression) {
403 expects(new Runnable() {
404 public void run() {
405 assertMessagesDescending(expression);
406 }
407 });
408 }
409
410 /**
411 * Adds an expectation that no duplicate messages should be received using
412 * the expression to determine the message ID
413 *
414 * @param expression the expression used to create a unique message ID for
415 * message comparison (which could just be the message
416 * payload if the payload can be tested for uniqueness using
417 * {@link Object#equals(Object)} and
418 * {@link Object#hashCode()}
419 */
420 public void expectsNoDuplicates(final Expression<Exchange> expression) {
421 expects(new Runnable() {
422 public void run() {
423 assertNoDuplicates(expression);
424 }
425 });
426 }
427
428 /**
429 * Asserts that the messages have ascending values of the given expression
430 */
431 public void assertMessagesAscending(Expression<Exchange> expression) {
432 assertMessagesSorted(expression, true);
433 }
434
435 /**
436 * Asserts that the messages have descending values of the given expression
437 */
438 public void assertMessagesDescending(Expression<Exchange> expression) {
439 assertMessagesSorted(expression, false);
440 }
441
442 protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
443 String type = ascending ? "ascending" : "descending";
444 ExpressionComparator comparator = new ExpressionComparator(expression);
445 List<Exchange> list = getReceivedExchanges();
446 for (int i = 1; i < list.size(); i++) {
447 int j = i - 1;
448 Exchange e1 = list.get(j);
449 Exchange e2 = list.get(i);
450 int result = comparator.compare(e1, e2);
451 if (result == 0) {
452 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
453 + e2);
454 } else {
455 if (!ascending) {
456 result = result * -1;
457 }
458 if (result > 0) {
459 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
460 + expression + ". Exchanges: " + e1 + " and " + e2);
461 }
462 }
463 }
464 }
465
466 public void assertNoDuplicates(Expression<Exchange> expression) {
467 Map<Object, Exchange> map = new HashMap<Object, Exchange>();
468 List<Exchange> list = getReceivedExchanges();
469 for (int i = 0; i < list.size(); i++) {
470 Exchange e2 = list.get(i);
471 Object key = expression.evaluate(e2);
472 Exchange e1 = map.get(key);
473 if (e1 != null) {
474 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
475 } else {
476 map.put(key, e2);
477 }
478 }
479 }
480
481 /**
482 * Adds the expectation which will be invoked when enough messages are
483 * received
484 */
485 public void expects(Runnable runnable) {
486 tests.add(runnable);
487 }
488
489 /**
490 * Adds an assertion to the given message index
491 *
492 * @param messageIndex the number of the message
493 * @return the assertion clause
494 */
495 public AssertionClause message(final int messageIndex) {
496 AssertionClause clause = new AssertionClause() {
497 public void run() {
498 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
499 }
500 };
501 expects(clause);
502 return clause;
503 }
504
505 /**
506 * Adds an assertion to all the received messages
507 *
508 * @return the assertion clause
509 */
510 public AssertionClause allMessages() {
511 AssertionClause clause = new AssertionClause() {
512 public void run() {
513 List<Exchange> list = getReceivedExchanges();
514 int index = 0;
515 for (Exchange exchange : list) {
516 applyAssertionOn(MockEndpoint.this, index++, exchange);
517 }
518 }
519 };
520 expects(clause);
521 return clause;
522 }
523
524 /**
525 * Asserts that the given index of message is received (starting at zero)
526 */
527 public Exchange assertExchangeReceived(int index) {
528 int count = getReceivedCounter();
529 assertTrue("Not enough messages received. Was: " + count, count > index);
530 return getReceivedExchanges().get(index);
531 }
532
533 // Properties
534 // -------------------------------------------------------------------------
535 public List<Throwable> getFailures() {
536 return failures;
537 }
538
539 public int getReceivedCounter() {
540 return getReceivedExchanges().size();
541 }
542
543 public List<Exchange> getReceivedExchanges() {
544 return receivedExchanges;
545 }
546
547 public int getExpectedCount() {
548 return expectedCount;
549 }
550
551 public long getSleepForEmptyTest() {
552 return sleepForEmptyTest;
553 }
554
555 /**
556 * Allows a sleep to be specified to wait to check that this endpoint really
557 * is empty when {@link #expectedMessageCount(int)} is called with zero
558 *
559 * @param sleepForEmptyTest the milliseconds to sleep for to determine that
560 * this endpoint really is empty
561 */
562 public void setSleepForEmptyTest(long sleepForEmptyTest) {
563 this.sleepForEmptyTest = sleepForEmptyTest;
564 }
565
566 public long getResultWaitTime() {
567 return resultWaitTime;
568 }
569
570 /**
571 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
572 * wait on a latch until it is satisfied
573 */
574 public void setResultWaitTime(long resultWaitTime) {
575 this.resultWaitTime = resultWaitTime;
576 }
577
578 /**
579 * Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
580 * wait on a latch until it is satisfied
581 */
582 public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
583 this.resultMinimumWaitTime = resultMinimumWaitTime;
584 }
585
586 /**
587 * Specifies the expected number of message exchanges that should be
588 * received by this endpoint
589 *
590 * @param expectedCount the number of message exchanges that should be
591 * expected by this endpoint
592 */
593 public void setExpectedMessageCount(int expectedCount) {
594 this.expectedCount = expectedCount;
595 if (expectedCount <= 0) {
596 latch = null;
597 } else {
598 latch = new CountDownLatch(expectedCount);
599 }
600 }
601
602 /**
603 * Specifies the minimum number of expected message exchanges that should be
604 * received by this endpoint
605 *
606 * @param expectedCount the number of message exchanges that should be
607 * expected by this endpoint
608 */
609 public void setMinimumExpectedMessageCount(int expectedCount) {
610 this.expectedMinimumCount = expectedCount;
611 if (expectedCount <= 0) {
612 latch = null;
613 } else {
614 latch = new CountDownLatch(expectedMinimumCount);
615 }
616 }
617
618 public Processor getReporter() {
619 return reporter;
620 }
621
622 /**
623 * Allows a processor to added to the endpoint to report on progress of the test
624 */
625 public void setReporter(Processor reporter) {
626 this.reporter = reporter;
627 }
628
629 // Implementation methods
630 // -------------------------------------------------------------------------
631 private void init() {
632 expectedCount = -1;
633 counter = 0;
634 processors = new HashMap<Integer, Processor>();
635 receivedExchanges = new CopyOnWriteArrayList<Exchange>();
636 failures = new CopyOnWriteArrayList<Throwable>();
637 tests = new CopyOnWriteArrayList<Runnable>();
638 latch = null;
639 sleepForEmptyTest = 0;
640 resultWaitTime = 20000L;
641 resultMinimumWaitTime = 0L;
642 expectedMinimumCount = -1;
643 expectedBodyValues = null;
644 actualBodyValues = new ArrayList();
645 }
646
647 protected synchronized void onExchange(Exchange exchange) {
648 try {
649 if (reporter != null) {
650 reporter.process(exchange);
651 }
652 performAssertions(exchange);
653 } catch (Throwable e) {
654 failures.add(e);
655 } finally {
656 // make sure latch is counted down to avoid test hanging forever
657 if (latch != null) {
658 latch.countDown();
659 }
660 }
661 }
662
663 protected void performAssertions(Exchange exchange) throws Exception {
664 Message in = exchange.getIn();
665 Object actualBody = in.getBody();
666
667 if (headerName != null) {
668 actualHeader = in.getHeader(headerName);
669 }
670
671 if (expectedBodyValues != null) {
672 int index = actualBodyValues.size();
673 if (expectedBodyValues.size() > index) {
674 Object expectedBody = expectedBodyValues.get(index);
675 if (expectedBody != null) {
676 actualBody = in.getBody(expectedBody.getClass());
677 }
678 actualBodyValues.add(actualBody);
679 }
680 }
681
682 LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
683
684 receivedExchanges.add(exchange);
685
686 Processor processor = processors.get(getReceivedCounter()) != null
687 ? processors.get(getReceivedCounter()) : defaultProcessor;
688
689 if (processor != null) {
690 processor.process(exchange);
691 }
692 }
693
694 protected void waitForCompleteLatch() throws InterruptedException {
695 if (latch == null) {
696 fail("Should have a latch!");
697 }
698
699 // now lets wait for the results
700 LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
701 long start = System.currentTimeMillis();
702 latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
703 long delta = System.currentTimeMillis() - start;
704 LOG.debug("Took " + delta + " millis to complete latch");
705
706 if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
707 fail("Expected minimum " + resultWaitTime
708 + " millis waiting on the result, but was faster with " + delta + " millis.");
709 }
710 }
711
712 protected void assertEquals(String message, Object expectedValue, Object actualValue) {
713 if (!ObjectHelper.equal(expectedValue, actualValue)) {
714 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
715 }
716 }
717
718 protected void assertTrue(String message, boolean predicate) {
719 if (!predicate) {
720 fail(message);
721 }
722 }
723
724 protected void fail(Object message) {
725 if (LOG.isDebugEnabled()) {
726 List<Exchange> list = getReceivedExchanges();
727 int index = 0;
728 for (Exchange exchange : list) {
729 LOG.debug("Received[" + (++index) + "]: " + exchange);
730 }
731 }
732 throw new AssertionError(getEndpointUri() + " " + message);
733 }
734
735 public int getExpectedMinimumCount() {
736 return expectedMinimumCount;
737 }
738
739 public void await() throws InterruptedException {
740 if (latch != null) {
741 latch.await();
742 }
743 }
744
745 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
746 if (latch != null) {
747 return latch.await(timeout, unit);
748 }
749 return true;
750 }
751
752 public boolean isSingleton() {
753 return true;
754 }
755 }