/*
 * Decompiled with CFR 0.152.
 */
package org.apache.servicemix.camel.nmr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.ThreadsDefinition;
import org.apache.servicemix.camel.nmr.AbstractComponentTest;
import org.apache.servicemix.nmr.api.Exchange;
import org.apache.servicemix.nmr.api.Status;
import org.junit.Assert;
import org.junit.Test;

public class CamelAsyncRouteTest
extends AbstractComponentTest {
    private static final String HANDLED_BY_THREAD = "HandledByThread";
    private static final int COUNT = 1000;
    private static final long DELAY = 60000L;
    private CountDownLatch done;

    public void setUp() throws Exception {
        super.setUp();
        this.done = new CountDownLatch(1000);
    }

    @Test
    public void testCamelThreads() throws InterruptedException {
        this.expectDefaultMessageCount("mock:sent");
        this.expectDefaultMessageCount("mock:threads").whenAnyExchangeReceived((Processor)new AssertHandledByCamelThreadProcessor());
        for (int i = 0; i < 1000; ++i) {
            template.asyncSendBody("direct:threads", (Object)("Simple message body " + i));
        }
        this.assertMockEndpointsSatisfied();
        CamelAsyncRouteTest.assertTrue((String)"All NMR exchanges should have been marked DONE", (boolean)this.done.await(60000L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testCamelSeda() throws InterruptedException {
        this.expectDefaultMessageCount("mock:sent");
        this.expectDefaultMessageCount("mock:seda");
        for (int i = 0; i < 1000; ++i) {
            template.asyncSendBody("seda:seda", (Object)("Simple message body " + i));
        }
        this.assertMockEndpointsSatisfied();
        CamelAsyncRouteTest.assertTrue((String)"All NMR exchanges should have been marked DONE", (boolean)this.done.await(60000L, TimeUnit.MILLISECONDS));
    }

    private MockEndpoint expectDefaultMessageCount(String endpoint) {
        MockEndpoint mock = this.getMockEndpoint(endpoint);
        mock.setResultWaitTime(60000L);
        mock.expectedMessageCount(1000);
        return mock;
    }

    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder(){

            public void configure() throws Exception {
                ((RouteDefinition)this.from("direct:threads").to("mock:sent")).to("nmr:threads");
                ((ThreadsDefinition)this.from("nmr:threads").threads(5).process(new Processor(){

                    public void process(org.apache.camel.Exchange exchange) throws Exception {
                        exchange.setProperty(CamelAsyncRouteTest.HANDLED_BY_THREAD, (Object)Thread.currentThread());
                    }
                })).to("mock:threads");
                ((RouteDefinition)this.from("seda:seda?concurrentConsumers=10").to("mock:sent")).to("nmr:seda");
                this.from("nmr:seda").to("seda:seda-internal?waitForTaskToComplete=Never");
                this.from("seda:seda-internal").to("mock:seda");
            }
        };
    }

    public void exchangeDelivered(Exchange exchange) {
        if (exchange.getStatus().equals((Object)Status.Done)) {
            this.done.countDown();
        }
    }

    private static final class AssertHandledByCamelThreadProcessor
    implements Processor {
        private AssertHandledByCamelThreadProcessor() {
        }

        public void process(org.apache.camel.Exchange exchange) throws Exception {
            Thread thread = (Thread)exchange.getProperty(CamelAsyncRouteTest.HANDLED_BY_THREAD, Thread.class);
            Assert.assertTrue((String)("processor should have been called from the Camel 'threads' thread pool instead of " + thread.getName()), (thread.getName().contains("Camel") && thread.getName().contains("Thread") ? 1 : 0) != 0);
        }
    }
}

