package org.apache.camel.processor.aggregator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.ValueBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.spi.ExceptionHandler;

/* loaded from: input_file:org/apache/camel/processor/aggregator/AggregateProcessorTest.class */
public class AggregateProcessorTest extends ContextTestSupport {
    private ExecutorService executorService;

    /**
     * Always returns {@code false}.
     * NOTE(review): presumably this keeps ContextTestSupport from installing routes,
     * since every test in this class drives an AggregateProcessor directly - confirm
     * against ContextTestSupport.
     */
    @Override // org.apache.camel.ContextTestSupport
    public boolean isUseRouteBuilder() {
        return false;
    }

    /**
     * Runs the parent set-up and then creates the single-threaded executor that is
     * handed to every {@link AggregateProcessor} built by the tests in this class.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.executorService = Executors.newSingleThreadExecutor();
    }

    /**
     * Runs the parent tear-down and then shuts down the executor created in
     * {@link #setUp()}, so its worker thread does not outlive the test that created it.
     *
     * @throws Exception if the parent tear-down fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            // Release the executor even when the parent tear-down throws.
            if (this.executorService != null) {
                this.executorService.shutdownNow();
                this.executorService = null;
            }
        }
    }

    /**
     * Sends the bodies A, B, END and D under correlation id 123 to an aggregator whose
     * completion predicate matches bodies containing "END" (eager check disabled) and
     * expects the mock endpoint to receive "A+B+END" marked as completed by "predicate".
     */
    public void testAggregateProcessorCompletionPredicate() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B+END");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "predicate");

        Predicate completeOnEnd = body().contains("END");
        AggregateProcessor aggregator = new AggregateProcessor(
                this.context,
                new SendProcessor(this.context.getEndpoint("mock:result")),
                header("id"),
                new BodyInAggregatingStrategy(),
                this.executorService);
        aggregator.setCompletionPredicate(completeOnEnd);
        aggregator.setEagerCheckCompletion(false);
        aggregator.start();

        // Build every exchange up front, then feed them to the aggregator in order.
        String[] payloads = {"A", "B", "END", "D"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Same message sequence as the non-eager predicate test (A, B, END, D with id 123),
     * but the completion predicate is "body equals END" and eager completion checking
     * is enabled. Expects "A+B+END" completed by "predicate".
     */
    public void testAggregateProcessorCompletionPredicateEager() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B+END");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "predicate");

        Predicate bodyIsEnd = body().isEqualTo("END");
        AggregateProcessor aggregator = new AggregateProcessor(
                this.context,
                new SendProcessor(this.context.getEndpoint("mock:result")),
                header("id"),
                new BodyInAggregatingStrategy(),
                this.executorService);
        aggregator.setCompletionPredicate(bodyIsEnd);
        aggregator.setEagerCheckCompletion(true);
        aggregator.start();

        // Build every exchange up front, then feed them to the aggregator in order.
        String[] payloads = {"A", "B", "END", "D"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /** Runs the completion-size scenario with eager completion checking disabled. */
    public void testAggregateProcessorCompletionAggregatedSize() throws Exception {
        final boolean eagerCheckCompletion = false;
        doTestAggregateProcessorCompletionAggregatedSize(eagerCheckCompletion);
    }

    /** Runs the completion-size scenario with eager completion checking enabled. */
    public void testAggregateProcessorCompletionAggregatedSizeEager() throws Exception {
        final boolean eagerCheckCompletion = true;
        doTestAggregateProcessorCompletionAggregatedSize(eagerCheckCompletion);
    }

    /**
     * Sends A, B, C and D under correlation id 123 to an aggregator configured with a
     * completion size of 3 and expects the mock endpoint to receive "A+B+C" marked as
     * completed by "size".
     *
     * @param z value passed to {@code setEagerCheckCompletion} on the aggregator
     */
    private void doTestAggregateProcessorCompletionAggregatedSize(boolean z) throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B+C");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "size");

        SendProcessor toResult = new SendProcessor(this.context.getEndpoint("mock:result"));
        ValueBuilder correlationId = header("id");
        BodyInAggregatingStrategy strategy = new BodyInAggregatingStrategy();
        AggregateProcessor aggregator =
                new AggregateProcessor(this.context, toResult, correlationId, strategy, this.executorService);
        aggregator.setCompletionSize(3);
        aggregator.setEagerCheckCompletion(z);
        aggregator.start();

        // Build every exchange up front, then feed them to the aggregator in order.
        String[] payloads = {"A", "B", "C", "D"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /** Runs the completion-timeout scenario with eager completion checking disabled. */
    public void testAggregateProcessorCompletionTimeout() throws Exception {
        final boolean eagerCheckCompletion = false;
        doTestAggregateProcessorCompletionTimeout(eagerCheckCompletion);
    }

    /** Runs the completion-timeout scenario with eager completion checking enabled. */
    public void testAggregateProcessorCompletionTimeoutEager() throws Exception {
        final boolean eagerCheckCompletion = true;
        doTestAggregateProcessorCompletionTimeout(eagerCheckCompletion);
    }

    /**
     * Sends A, B and C (with 250 ms and 500 ms pauses between them) to an aggregator
     * configured with a 3000 ms completion timeout, waits 5000 ms, then sends D.
     * Expects the mock endpoint to receive "A+B+C" marked as completed by "timeout".
     *
     * @param z value passed to {@code setEagerCheckCompletion} on the aggregator
     */
    private void doTestAggregateProcessorCompletionTimeout(boolean z) throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B+C");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "timeout");

        SendProcessor toResult = new SendProcessor(this.context.getEndpoint("mock:result"));
        ValueBuilder correlationId = header("id");
        BodyInAggregatingStrategy strategy = new BodyInAggregatingStrategy();
        AggregateProcessor aggregator =
                new AggregateProcessor(this.context, toResult, correlationId, strategy, this.executorService);
        aggregator.setCompletionTimeout(3000L);
        aggregator.setEagerCheckCompletion(z);
        aggregator.start();

        String[] payloads = {"A", "B", "C", "D"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }

        aggregator.process(exchanges[0]);
        Thread.sleep(250L);
        aggregator.process(exchanges[1]);
        Thread.sleep(500L);
        aggregator.process(exchanges[2]);
        // Pause longer than the configured completion timeout before sending D.
        Thread.sleep(5000L);
        aggregator.process(exchanges[3]);

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Starts the context, sends A, B and C to an aggregator configured with a 3000 ms
     * completion interval, waits 5000 ms and then sends D. Expects the mock endpoint to
     * receive "A+B+C" followed by "D", both marked as completed by "interval".
     */
    public void testAggregateCompletionInterval() throws Exception {
        this.context.start();

        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B+C", "D");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "interval");

        SendProcessor toResult = new SendProcessor(this.context.getEndpoint("mock:result"));
        ValueBuilder correlationId = header("id");
        BodyInAggregatingStrategy strategy = new BodyInAggregatingStrategy();
        AggregateProcessor aggregator =
                new AggregateProcessor(this.context, toResult, correlationId, strategy, this.executorService);
        aggregator.setCompletionInterval(3000L);
        aggregator.start();

        String[] payloads = {"A", "B", "C", "D"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }

        for (int i = 0; i < 3; i++) {
            aggregator.process(exchanges[i]);
        }
        // Pause longer than the configured interval before sending D.
        Thread.sleep(5000L);
        aggregator.process(exchanges[3]);

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Starts the context and uses a 2000 ms completion interval: A is sent, then B
     * after 1500 ms, then C and D after a further 500 ms. Expects the mock endpoint to
     * receive "A+B" followed by "C+D", both marked as completed by "interval".
     */
    public void testAggregateInitialCompletionInterval() throws Exception {
        this.context.start();

        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B", "C+D");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "interval");

        SendProcessor toResult = new SendProcessor(this.context.getEndpoint("mock:result"));
        ValueBuilder correlationId = header("id");
        BodyInAggregatingStrategy strategy = new BodyInAggregatingStrategy();
        AggregateProcessor aggregator =
                new AggregateProcessor(this.context, toResult, correlationId, strategy, this.executorService);
        aggregator.setCompletionInterval(2000L);
        aggregator.start();

        String[] payloads = {"A", "B", "C", "D"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }

        aggregator.process(exchanges[0]);
        Thread.sleep(1500L);
        aggregator.process(exchanges[1]);
        Thread.sleep(500L);
        aggregator.process(exchanges[2]);
        aggregator.process(exchanges[3]);

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Sends A, B, C and END where B carries no "id" header, with
     * {@code setIgnoreInvalidCorrelationKeys(true)} on the aggregator. Expects the mock
     * endpoint to receive "A+C+END".
     */
    public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+C+END");

        Predicate completeOnEnd = body().contains("END");
        AggregateProcessor aggregator = new AggregateProcessor(
                this.context,
                new SendProcessor(this.context.getEndpoint("mock:result")),
                header("id"),
                new BodyInAggregatingStrategy(),
                this.executorService);
        aggregator.setCompletionPredicate(completeOnEnd);
        aggregator.setIgnoreInvalidCorrelationKeys(true);
        aggregator.start();

        // A null entry means the exchange is sent without an "id" header.
        String[] payloads = {"A", "B", "C", "END"};
        Integer[] correlationIds = {123, null, 123, 123};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            if (correlationIds[i] != null) {
                exchange.getIn().setHeader("id", correlationIds[i]);
            }
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Sends A, B, C and END where B carries no "id" header and invalid keys are not
     * ignored. Expects processing B to throw a {@link CamelExchangeException} with the
     * message "Invalid correlation key. Exchange[Message: B]", and the mock endpoint to
     * still receive "A+C+END".
     */
    public void testAggregateBadCorrelationKey() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+C+END");

        Predicate completeOnEnd = body().contains("END");
        AggregateProcessor aggregator = new AggregateProcessor(
                this.context,
                new SendProcessor(this.context.getEndpoint("mock:result")),
                header("id"),
                new BodyInAggregatingStrategy(),
                this.executorService);
        aggregator.setCompletionPredicate(completeOnEnd);
        aggregator.start();

        // A null entry means the exchange is sent without an "id" header.
        String[] payloads = {"A", "B", "C", "END"};
        Integer[] correlationIds = {123, null, 123, 123};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            if (correlationIds[i] != null) {
                exchange.getIn().setHeader("id", correlationIds[i]);
            }
            exchanges[i] = exchange;
        }

        aggregator.process(exchanges[0]);
        try {
            aggregator.process(exchanges[1]);
            fail("Should have thrown an exception");
        } catch (CamelExchangeException expected) {
            assertEquals("Invalid correlation key. Exchange[Message: B]", expected.getMessage());
        }
        aggregator.process(exchanges[2]);
        aggregator.process(exchanges[3]);

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Configures {@code setCloseCorrelationKeyOnCompletion(1000)}, completes the group
     * for id 123 with A, B and END, and then sends C under the same id. Expects that
     * last call to throw a {@link CamelExchangeException} reporting the key as closed,
     * and the mock endpoint to receive "A+B+END".
     */
    public void testAggregateCloseCorrelationKeyOnCompletion() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B+END");

        Predicate completeOnEnd = body().contains("END");
        AggregateProcessor aggregator = new AggregateProcessor(
                this.context,
                new SendProcessor(this.context.getEndpoint("mock:result")),
                header("id"),
                new BodyInAggregatingStrategy(),
                this.executorService);
        aggregator.setCompletionPredicate(completeOnEnd);
        aggregator.setCloseCorrelationKeyOnCompletion(1000);
        aggregator.start();

        String[] payloads = {"A", "B", "END", "C"};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchanges[i] = exchange;
        }

        for (int i = 0; i < 3; i++) {
            aggregator.process(exchanges[i]);
        }
        try {
            aggregator.process(exchanges[3]);
            fail("Should have thrown an exception");
        } catch (CamelExchangeException expected) {
            assertEquals("The correlation key [123] has been closed. Exchange[Message: C]", expected.getMessage());
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Sends two batches (A, B and then C, D, E) whose exchanges carry the
     * CamelBatchIndex / CamelBatchSize / CamelBatchComplete properties, to an aggregator
     * with {@code setCompletionFromBatchConsumer(true)} and a completion size of 100.
     * Expects "A+B" and "C+D+E", both marked as completed by "consumer".
     */
    public void testAggregateUseBatchSizeFromConsumer() throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+B", "C+D+E");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "consumer");

        SendProcessor toResult = new SendProcessor(this.context.getEndpoint("mock:result"));
        ValueBuilder correlationId = header("id");
        BodyInAggregatingStrategy strategy = new BodyInAggregatingStrategy();
        AggregateProcessor aggregator =
                new AggregateProcessor(this.context, toResult, correlationId, strategy, this.executorService);
        aggregator.setCompletionSize(100);
        aggregator.setCompletionFromBatchConsumer(true);
        aggregator.start();

        // One column per exchange: body, position in its batch, batch size, last-in-batch flag.
        String[] payloads = {"A", "B", "C", "D", "E"};
        int[] batchIndexes = {0, 1, 0, 1, 2};
        int[] batchSizes = {2, 2, 3, 3, 3};
        boolean[] batchComplete = {false, true, false, false, true};

        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", 123);
            exchange.setProperty("CamelBatchIndex", batchIndexes[i]);
            exchange.setProperty("CamelBatchSize", batchSizes[i]);
            exchange.setProperty("CamelBatchComplete", batchComplete[i]);
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Runs the failed-exchange scenario passing {@code null} as the handler, so no
     * custom {@link ExceptionHandler} is installed on the aggregator.
     */
    public void testAggregateLogFailedExchange() throws Exception {
        final ExceptionHandler noCustomHandler = null;
        doTestAggregateLogFailedExchange(noCustomHandler);
    }

    /**
     * Runs the failed-exchange scenario with a custom {@link ExceptionHandler} and
     * verifies afterwards that its exchange-aware callback was invoked. The callback
     * itself checks the message, the aggregated body "B+Kaboom+END" and the cause
     * message "Damn".
     */
    public void testAggregateHandleFailedExchange() throws Exception {
        final AtomicBoolean handlerInvoked = new AtomicBoolean();

        ExceptionHandler recordingHandler = new ExceptionHandler() {
            public void handleException(Throwable cause) {
                // not exercised by this scenario
            }

            public void handleException(String message, Throwable cause) {
                // not exercised by this scenario
            }

            public void handleException(String message, Exchange exchange, Throwable cause) {
                Assert.assertEquals("Error processing aggregated exchange", message);
                Assert.assertEquals("B+Kaboom+END", exchange.getIn().getBody());
                Assert.assertEquals("Damn", cause.getMessage());
                handlerInvoked.set(true);
            }
        };

        doTestAggregateLogFailedExchange(recordingHandler);
        assertEquals(true, handlerInvoked.get());
    }

    /**
     * Aggregates two groups with eager completion on "body equals END": id 123 receives
     * A and END, id 456 receives B, Kaboom and END. The downstream processor throws
     * {@link IllegalArgumentException} ("Damn") for any aggregated body containing
     * "Kaboom" and otherwise forwards to mock:result, which is expected to receive only
     * "A+END".
     *
     * @param exceptionHandler handler to install on the aggregator; skipped when {@code null}
     */
    private void doTestAggregateLogFailedExchange(ExceptionHandler exceptionHandler) throws Exception {
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("A+END");

        Processor failOnKaboom = new Processor() {
            public void process(Exchange exchange) throws Exception {
                String payload = exchange.getIn().getBody(String.class);
                if (payload.contains("Kaboom")) {
                    throw new IllegalArgumentException("Damn");
                }
                SendProcessor toResult = new SendProcessor(AggregateProcessorTest.this.context.getEndpoint("mock:result"));
                toResult.start();
                toResult.process(exchange);
            }
        };

        AggregateProcessor aggregator = new AggregateProcessor(
                this.context, failOnKaboom, header("id"), new BodyInAggregatingStrategy(), this.executorService);
        aggregator.setEagerCheckCompletion(true);
        aggregator.setCompletionPredicate(body().isEqualTo("END"));
        if (exceptionHandler != null) {
            aggregator.setExceptionHandler(exceptionHandler);
        }
        aggregator.start();

        // The 456 group is completed (and fails) before the 123 group is completed.
        String[] payloads = {"A", "B", "Kaboom", "END", "END"};
        int[] correlationIds = {123, 456, 456, 456, 123};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", correlationIds[i]);
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }

    /**
     * Starts the context and feeds two groups (id 123: A, END; id 456: B, END) to an
     * aggregator with a completion size of 10. Asserts that nothing has reached the
     * mock endpoint yet, calls {@code forceCompletionOfAllGroups()}, and then expects
     * "B+END" and "A+END", both marked as completed by "forceCompletion".
     */
    public void testAggregateForceCompletion() throws Exception {
        this.context.start();

        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("B+END", "A+END");
        result.expectedPropertyReceived("CamelAggregatedCompletedBy", "forceCompletion");

        SendProcessor toResult = new SendProcessor(this.context.getEndpoint("mock:result"));
        ValueBuilder correlationId = header("id");
        BodyInAggregatingStrategy strategy = new BodyInAggregatingStrategy();
        AggregateProcessor aggregator =
                new AggregateProcessor(this.context, toResult, correlationId, strategy, this.executorService);
        aggregator.setCompletionSize(10);
        aggregator.start();

        String[] payloads = {"A", "B", "END", "END"};
        int[] correlationIds = {123, 456, 123, 456};
        Exchange[] exchanges = new Exchange[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            Exchange exchange = new DefaultExchange(this.context);
            exchange.getIn().setBody(payloads[i]);
            exchange.getIn().setHeader("id", correlationIds[i]);
            exchanges[i] = exchange;
        }
        for (Exchange exchange : exchanges) {
            aggregator.process(exchange);
        }

        assertEquals("should not have completed yet", 0, result.getExchanges().size());
        aggregator.forceCompletionOfAllGroups();

        assertMockEndpointsSatisfied();
        aggregator.stop();
    }
}
