001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.mock;
018
019 import java.beans.PropertyChangeListener;
020 import java.beans.PropertyChangeSupport;
021 import java.util.ArrayList;
022 import java.util.Arrays;
023 import java.util.Collection;
024 import java.util.HashMap;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.concurrent.CopyOnWriteArrayList;
028 import java.util.concurrent.CountDownLatch;
029 import java.util.concurrent.TimeUnit;
030
031 import org.apache.camel.CamelContext;
032 import org.apache.camel.Component;
033 import org.apache.camel.Consumer;
034 import org.apache.camel.Endpoint;
035 import org.apache.camel.Exchange;
036 import org.apache.camel.Expression;
037 import org.apache.camel.Message;
038 import org.apache.camel.Processor;
039 import org.apache.camel.Producer;
040 import org.apache.camel.impl.DefaultEndpoint;
041 import org.apache.camel.impl.DefaultProducer;
042 import org.apache.camel.spi.BrowsableEndpoint;
043 import org.apache.camel.util.CamelContextHelper;
044 import org.apache.camel.util.ExpressionComparator;
045 import org.apache.camel.util.ObjectHelper;
046 import org.apache.commons.logging.Log;
047 import org.apache.commons.logging.LogFactory;
048
049 /**
050 * A Mock endpoint which provides a literate, fluent API for testing routes
051 * using a <a href="http://jmock.org/">JMock style</a> API.
052 *
053 * @version $Revision: 674383 $
054 */
055 public class MockEndpoint extends DefaultEndpoint<Exchange> implements BrowsableEndpoint<Exchange> {
056 private static final transient Log LOG = LogFactory.getLog(MockEndpoint.class);
057 private int expectedCount;
058 private int counter;
059 private Processor defaultProcessor;
060 private Map<Integer, Processor> processors;
061 private List<Exchange> receivedExchanges;
062 private List<Throwable> failures;
063 private List<Runnable> tests;
064 private CountDownLatch latch;
065 private long sleepForEmptyTest;
066 private long resultWaitTime;
067 private int expectedMinimumCount;
068 private List expectedBodyValues;
069 private List actualBodyValues;
070 private PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
071 private String headerName;
072 private String headerValue;
073 private Object actualHeader;
074 private Processor reporter;
075
076 public MockEndpoint(String endpointUri, Component component) {
077 super(endpointUri, component);
078 init();
079 }
080
081 public MockEndpoint(String endpointUri) {
082 super(endpointUri);
083 init();
084 }
085
086
087 /**
088 * A helper method to resolve the mock endpoint of the given URI on the given context
089 *
090 * @param context the camel context to try resolve the mock endpoint from
091 * @param uri the uri of the endpoint to resolve
092 * @return the endpoint
093 */
094 public static MockEndpoint resolve(CamelContext context, String uri) {
095 return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
096 }
097
098 public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
099 long start = System.currentTimeMillis();
100 long left = unit.toMillis(timeout);
101 long end = start + left;
102 for (MockEndpoint endpoint : endpoints) {
103 if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
104 throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
105 }
106 left = end - System.currentTimeMillis();
107 if (left <= 0) {
108 left = 0;
109 }
110 }
111 }
112
113 public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
114 assertWait(timeout, unit, endpoints);
115 for (MockEndpoint endpoint : endpoints) {
116 endpoint.assertIsSatisfied();
117 }
118 }
119
120 public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
121 for (MockEndpoint endpoint : endpoints) {
122 endpoint.assertIsSatisfied();
123 }
124 }
125
126
127 /**
128 * Asserts that all the expectations on any {@link MockEndpoint} instances registered
129 * in the given context are valid
130 *
131 * @param context the camel context used to find all the available endpoints to be asserted
132 */
133 public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
134 ObjectHelper.notNull(context, "camelContext");
135 Collection<Endpoint> endpoints = context.getSingletonEndpoints();
136 for (Endpoint endpoint : endpoints) {
137 if (endpoint instanceof MockEndpoint) {
138 MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
139 mockEndpoint.assertIsSatisfied();
140 }
141 }
142 }
143
144
145 public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
146 for (MockEndpoint endpoint : endpoints) {
147 MockEndpoint.expectsMessageCount(count);
148 }
149 }
150
151 public List<Exchange> getExchanges() {
152 return getReceivedExchanges();
153 }
154
155 public void addPropertyChangeListener(PropertyChangeListener listener) {
156 propertyChangeSupport.addPropertyChangeListener(listener);
157 }
158
159 public void removePropertyChangeListener(PropertyChangeListener listener) {
160 propertyChangeSupport.removePropertyChangeListener(listener);
161 }
162
163 public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
164 throw new UnsupportedOperationException("You cannot consume from this endpoint");
165 }
166
167 public Producer<Exchange> createProducer() throws Exception {
168 return new DefaultProducer<Exchange>(this) {
169 public void process(Exchange exchange) {
170 onExchange(exchange);
171 }
172 };
173 }
174
175 public void reset() {
176 init();
177 }
178
179
180 // Testing API
181 // -------------------------------------------------------------------------
182
183 /**
184 * Set the processor that will be invoked when the index
185 * message is received.
186 *
187 * @param index
188 * @param processor
189 */
190 public void whenExchangeReceived(int index, Processor processor) {
191 this.processors.put(index, processor);
192 }
193
194 /**
195 * Set the processor that will be invoked when the some message
196 * is received.
197 *
198 * This processor could be overwritten by
199 * {@link #whenExchangeReceived(int, Processor)} method.
200 *
201 * @param processor
202 */
203 public void whenAnyExchangeReceived(Processor processor) {
204 this.defaultProcessor = processor;
205 }
206
207 /**
208 * Validates that all the available expectations on this endpoint are
209 * satisfied; or throw an exception
210 */
211 public void assertIsSatisfied() throws InterruptedException {
212 assertIsSatisfied(sleepForEmptyTest);
213 }
214
215 /**
216 * Validates that all the available expectations on this endpoint are
217 * satisfied; or throw an exception
218 *
219 * @param timeoutForEmptyEndpoints the timeout in milliseconds that we
220 * should wait for the test to be true
221 */
222 public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
223 LOG.info("Asserting: " + this + " is satisfied");
224 if (expectedCount >= 0) {
225 if (expectedCount != getReceivedCounter()) {
226 if (expectedCount == 0) {
227 // lets wait a little bit just in case
228 if (timeoutForEmptyEndpoints > 0) {
229 LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
230 Thread.sleep(timeoutForEmptyEndpoints);
231 }
232 } else {
233 waitForCompleteLatch();
234 }
235 }
236 assertEquals("Received message count", expectedCount, getReceivedCounter());
237 } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
238 waitForCompleteLatch();
239 }
240
241 if (expectedMinimumCount >= 0) {
242 int receivedCounter = getReceivedCounter();
243 assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedCount, expectedCount <= receivedCounter);
244 }
245
246 for (Runnable test : tests) {
247 test.run();
248 }
249
250 for (Throwable failure : failures) {
251 if (failure != null) {
252 LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
253 fail("Failed due to caught exception: " + failure);
254 }
255 }
256 }
257
258 /**
259 * Validates that the assertions fail on this endpoint
260 */
261 public void assertIsNotSatisfied() throws InterruptedException {
262 try {
263 assertIsSatisfied();
264 fail("Expected assertion failure!");
265 } catch (AssertionError e) {
266 LOG.info("Caught expected failure: " + e);
267 }
268 }
269
270 /**
271 * Specifies the expected number of message exchanges that should be
272 * received by this endpoint
273 *
274 * @param expectedCount the number of message exchanges that should be
275 * expected by this endpoint
276 */
277 public void expectedMessageCount(int expectedCount) {
278 setExpectedMessageCount(expectedCount);
279 }
280
281 /**
282 * Specifies the minimum number of expected message exchanges that should be
283 * received by this endpoint
284 *
285 * @param expectedCount the number of message exchanges that should be
286 * expected by this endpoint
287 */
288 public void expectedMinimumMessageCount(int expectedCount) {
289 setMinimumExpectedMessageCount(expectedCount);
290 }
291
292 /**
293 * Adds an expectation that the given header name & value are received by this
294 * endpoint
295 */
296 public void expectedHeaderReceived(String name, String value) {
297 this.headerName = name;
298 this.headerValue = value;
299
300 expects(new Runnable() {
301 public void run() {
302 assertTrue("No header with name " + headerName + " found.", actualHeader != null);
303
304 assertEquals("Header of message", headerValue, actualHeader);
305 }
306 });
307 }
308
309 /**
310 * Adds an expectation that the given body values are received by this
311 * endpoint
312 */
313 public void expectedBodiesReceived(final List bodies) {
314 expectedMessageCount(bodies.size());
315 this.expectedBodyValues = bodies;
316 this.actualBodyValues = new ArrayList();
317
318 expects(new Runnable() {
319 public void run() {
320 for (int i = 0; i < expectedBodyValues.size(); i++) {
321 Exchange exchange = getReceivedExchanges().get(i);
322 assertTrue("No exchange received for counter: " + i, exchange != null);
323
324 Object expectedBody = expectedBodyValues.get(i);
325 Object actualBody = actualBodyValues.get(i);
326
327 assertEquals("Body of message: " + i, expectedBody, actualBody);
328 }
329 }
330 });
331 }
332
333 /**
334 * Adds an expectation that the given body values are received by this
335 * endpoint
336 */
337 public void expectedBodiesReceived(Object... bodies) {
338 List bodyList = new ArrayList();
339 bodyList.addAll(Arrays.asList(bodies));
340 expectedBodiesReceived(bodyList);
341 }
342
343 /**
344 * Adds an expectation that messages received should have ascending values
345 * of the given expression such as a user generated counter value
346 *
347 * @param expression
348 */
349 public void expectsAscending(final Expression<Exchange> expression) {
350 expects(new Runnable() {
351 public void run() {
352 assertMessagesAscending(expression);
353 }
354 });
355 }
356
357 /**
358 * Adds an expectation that messages received should have descending values
359 * of the given expression such as a user generated counter value
360 *
361 * @param expression
362 */
363 public void expectsDescending(final Expression<Exchange> expression) {
364 expects(new Runnable() {
365 public void run() {
366 assertMessagesDescending(expression);
367 }
368 });
369 }
370
371 /**
372 * Adds an expectation that no duplicate messages should be received using
373 * the expression to determine the message ID
374 *
375 * @param expression the expression used to create a unique message ID for
376 * message comparison (which could just be the message
377 * payload if the payload can be tested for uniqueness using
378 * {@link Object#equals(Object)} and
379 * {@link Object#hashCode()}
380 */
381 public void expectsNoDuplicates(final Expression<Exchange> expression) {
382 expects(new Runnable() {
383 public void run() {
384 assertNoDuplicates(expression);
385 }
386 });
387 }
388
389 /**
390 * Asserts that the messages have ascending values of the given expression
391 */
392 public void assertMessagesAscending(Expression<Exchange> expression) {
393 assertMessagesSorted(expression, true);
394 }
395
396 /**
397 * Asserts that the messages have descending values of the given expression
398 */
399 public void assertMessagesDescending(Expression<Exchange> expression) {
400 assertMessagesSorted(expression, false);
401 }
402
403 protected void assertMessagesSorted(Expression<Exchange> expression, boolean ascending) {
404 String type = ascending ? "ascending" : "descending";
405 ExpressionComparator comparator = new ExpressionComparator(expression);
406 List<Exchange> list = getReceivedExchanges();
407 for (int i = 1; i < list.size(); i++) {
408 int j = i - 1;
409 Exchange e1 = list.get(j);
410 Exchange e2 = list.get(i);
411 int result = comparator.compare(e1, e2);
412 if (result == 0) {
413 fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: " + expression.evaluate(e1) + " for expression: " + expression + ". Exchanges: " + e1 + " and "
414 + e2);
415 } else {
416 if (!ascending) {
417 result = result * -1;
418 }
419 if (result > 0) {
420 fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1) + " and message " + i + " has value: " + expression.evaluate(e2) + " for expression: "
421 + expression + ". Exchanges: " + e1 + " and " + e2);
422 }
423 }
424 }
425 }
426
427 public void assertNoDuplicates(Expression<Exchange> expression) {
428 Map<Object, Exchange> map = new HashMap<Object, Exchange>();
429 List<Exchange> list = getReceivedExchanges();
430 for (int i = 0; i < list.size(); i++) {
431 Exchange e2 = list.get(i);
432 Object key = expression.evaluate(e2);
433 Exchange e1 = map.get(key);
434 if (e1 != null) {
435 fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
436 } else {
437 map.put(key, e2);
438 }
439 }
440 }
441
442 /**
443 * Adds the expection which will be invoked when enough messages are
444 * received
445 */
446 public void expects(Runnable runnable) {
447 tests.add(runnable);
448 }
449
450 /**
451 * Adds an assertion to the given message index
452 *
453 * @param messageIndex the number of the message
454 * @return the assertion clause
455 */
456 public AssertionClause message(final int messageIndex) {
457 AssertionClause clause = new AssertionClause() {
458 public void run() {
459 applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
460 }
461 };
462 expects(clause);
463 return clause;
464 }
465
466 /**
467 * Adds an assertion to all the received messages
468 *
469 * @return the assertion clause
470 */
471 public AssertionClause allMessages() {
472 AssertionClause clause = new AssertionClause() {
473 public void run() {
474 List<Exchange> list = getReceivedExchanges();
475 int index = 0;
476 for (Exchange exchange : list) {
477 applyAssertionOn(MockEndpoint.this, index++, exchange);
478 }
479 }
480 };
481 expects(clause);
482 return clause;
483 }
484
485 /**
486 * Asserts that the given index of message is received (starting at zero)
487 */
488 public Exchange assertExchangeReceived(int index) {
489 int count = getReceivedCounter();
490 assertTrue("Not enough messages received. Was: " + count, count > index);
491 return getReceivedExchanges().get(index);
492 }
493
494 // Properties
495 // -------------------------------------------------------------------------
496 public List<Throwable> getFailures() {
497 return failures;
498 }
499
500 public int getReceivedCounter() {
501 return getReceivedExchanges().size();
502 }
503
504 public List<Exchange> getReceivedExchanges() {
505 return receivedExchanges;
506 }
507
508 public int getExpectedCount() {
509 return expectedCount;
510 }
511
512 public long getSleepForEmptyTest() {
513 return sleepForEmptyTest;
514 }
515
516 /**
517 * Allows a sleep to be specified to wait to check that this endpoint really
518 * is empty when {@link #expectedMessageCount(int)} is called with zero
519 *
520 * @param sleepForEmptyTest the milliseconds to sleep for to determine that
521 * this endpoint really is empty
522 */
523 public void setSleepForEmptyTest(long sleepForEmptyTest) {
524 this.sleepForEmptyTest = sleepForEmptyTest;
525 }
526
527 public long getResultWaitTime() {
528 return resultWaitTime;
529 }
530
531 /**
532 * Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
533 * wait on a latch until it is satisfied
534 */
535 public void setResultWaitTime(long resultWaitTime) {
536 this.resultWaitTime = resultWaitTime;
537 }
538
539 /**
540 * Specifies the expected number of message exchanges that should be
541 * received by this endpoint
542 *
543 * @param expectedCount the number of message exchanges that should be
544 * expected by this endpoint
545 */
546 public void setExpectedMessageCount(int expectedCount) {
547 this.expectedCount = expectedCount;
548 if (expectedCount <= 0) {
549 latch = null;
550 } else {
551 latch = new CountDownLatch(expectedCount);
552 }
553 }
554
555 /**
556 * Specifies the minimum number of expected message exchanges that should be
557 * received by this endpoint
558 *
559 * @param expectedCount the number of message exchanges that should be
560 * expected by this endpoint
561 */
562 public void setMinimumExpectedMessageCount(int expectedCount) {
563 this.expectedMinimumCount = expectedCount;
564 if (expectedCount <= 0) {
565 latch = null;
566 } else {
567 latch = new CountDownLatch(expectedMinimumCount);
568 }
569 }
570
571 public Processor getReporter() {
572 return reporter;
573 }
574
575 /**
576 * Allows a processor to added to the endpoint to report on progress of the test
577 */
578 public void setReporter(Processor reporter) {
579 this.reporter = reporter;
580 }
581
582 // Implementation methods
583 // -------------------------------------------------------------------------
584 private void init() {
585 expectedCount = -1;
586 counter = 0;
587 processors = new HashMap<Integer, Processor>();
588 receivedExchanges = new CopyOnWriteArrayList<Exchange>();
589 failures = new CopyOnWriteArrayList<Throwable>();
590 tests = new CopyOnWriteArrayList<Runnable>();
591 latch = null;
592 sleepForEmptyTest = 1000L;
593 resultWaitTime = 20000L;
594 expectedMinimumCount = -1;
595 expectedBodyValues = null;
596 actualBodyValues = new ArrayList();
597 }
598
599 protected synchronized void onExchange(Exchange exchange) {
600 try {
601 if (reporter != null) {
602 reporter.process(exchange);
603 }
604
605 performAssertions(exchange);
606 } catch (Throwable e) {
607 failures.add(e);
608 }
609 if (latch != null) {
610 latch.countDown();
611 }
612 }
613
614 protected void performAssertions(Exchange exchange) throws Exception {
615 Message in = exchange.getIn();
616 Object actualBody = in.getBody();
617
618 if (headerName != null) {
619 actualHeader = in.getHeader(headerName);
620 }
621
622 if (expectedBodyValues != null) {
623 int index = actualBodyValues.size();
624 if (expectedBodyValues.size() > index) {
625 Object expectedBody = expectedBodyValues.get(index);
626 if (expectedBody != null) {
627 actualBody = in.getBody(expectedBody.getClass());
628 }
629 actualBodyValues.add(actualBody);
630 }
631 }
632
633 LOG.debug(getEndpointUri() + " >>>> " + (++counter) + " : " + exchange + " with body: " + actualBody);
634
635 receivedExchanges.add(exchange);
636
637 Processor processor = processors.get(getReceivedCounter()) != null
638 ? processors.get(getReceivedCounter()) : defaultProcessor;
639
640 if (processor != null) {
641 processor.process(exchange);
642 }
643 }
644
645 protected void waitForCompleteLatch() throws InterruptedException {
646 if (latch == null) {
647 fail("Should have a latch!");
648 }
649
650 // now lets wait for the results
651 LOG.debug("Waiting on the latch for: " + resultWaitTime + " millis");
652 latch.await(resultWaitTime, TimeUnit.MILLISECONDS);
653 }
654
655 protected void assertEquals(String message, Object expectedValue, Object actualValue) {
656 if (!ObjectHelper.equal(expectedValue, actualValue)) {
657 fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
658 }
659 }
660
661 protected void assertTrue(String message, boolean predicate) {
662 if (!predicate) {
663 fail(message);
664 }
665 }
666
667 protected void fail(Object message) {
668 if (LOG.isDebugEnabled()) {
669 List<Exchange> list = getReceivedExchanges();
670 int index = 0;
671 for (Exchange exchange : list) {
672 LOG.debug("Received[" + (++index) + "]: " + exchange);
673 }
674 }
675 throw new AssertionError(getEndpointUri() + " " + message);
676 }
677
678 public int getExpectedMinimumCount() {
679 return expectedMinimumCount;
680 }
681
682 public void await() throws InterruptedException {
683 if (latch != null) {
684 latch.await();
685 }
686 }
687
688 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
689 if (latch != null) {
690 return latch.await(timeout, unit);
691 }
692 return true;
693 }
694
695 public boolean isSingleton() {
696 return true;
697 }
698 }