package org.apache.camel.processor;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Channel;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.EventDrivenConsumerRoute;
import org.apache.camel.util.ServiceHelper;
import org.junit.Test;

/* loaded from: input_file:org/apache/camel/processor/StreamResequencerTest.class */
public class StreamResequencerTest extends ContextTestSupport {

    /* loaded from: input_file:org/apache/camel/processor/StreamResequencerTest$Sender.class */
    /**
     * Runnable that sends a strided series of sequence-numbered messages to
     * {@code direct:start}, pausing for a random interval of under 20 ms before
     * each send.
     */
    private static class Sender implements Runnable {
        private final ProducerTemplate template;
        private final int start;
        private final int end;
        private final int increment;
        private final Random random = new Random();

        /**
         * @param template  producer used to send every message
         * @param start     first sequence number to send (inclusive)
         * @param end       bound at which sending stops (exclusive)
         * @param increment step between consecutive sequence numbers
         */
        Sender(ProducerTemplate template, int start, int end, int increment) {
            this.template = template;
            this.start = start;
            this.end = end;
            this.increment = increment;
        }

        /**
         * Sends {@code "msg" + n} with header {@code seqnum = n} for each
         * {@code n} in {@code start, start + increment, ...} below {@code end}.
         */
        @Override // java.lang.Runnable
        public void run() {
            for (long seqnum = this.start; seqnum < this.end; seqnum += this.increment) {
                pauseBriefly();
                this.template.sendBodyAndHeader("direct:start", "msg" + seqnum, "seqnum", Long.valueOf(seqnum));
            }
        }

        /**
         * Sleeps for a random 0-19 ms. An interrupt only restores the thread's
         * interrupt flag; the caller carries on sending afterwards.
         */
        private void pauseBriefly() {
            try {
                Thread.sleep(this.random.nextInt(20));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sends one message to the given endpoint with the supplied body and a
     * single caller-chosen header. A {@code testCase} header holding the value
     * of {@code getName()} is added to every message as well.
     *
     * @param endpointUri URI of the endpoint to send to
     * @param body        value set as the body of the IN message
     * @param headerName  name of the header to set on the IN message
     * @param headerValue value stored under {@code headerName}
     */
    protected void sendBodyAndHeader(String endpointUri, final Object body, final String headerName, final Object headerValue) {
        // Populates the IN message of the exchange created for this send.
        final Processor messagePopulator = new Processor() {
            public void process(Exchange exchange) {
                Message message = exchange.getIn();
                message.setBody(body);
                message.setHeader(headerName, headerValue);
                message.setHeader("testCase", StreamResequencerTest.this.getName());
            }
        };
        this.template.send(endpointUri, messagePopulator);
    }

    /**
     * Sends four messages with sequence numbers in the order 4, 1, 3, 2 and
     * expects {@code mock:result} to receive the bodies as msg1..msg4.
     */
    @Test
    public void testSendMessagesInWrongOrderButReceiveThemInCorrectOrder() throws Exception {
        final Object[] expectedBodies = {"msg1", "msg2", "msg3", "msg4"};
        getMockEndpoint("mock:result").expectedBodiesReceived(expectedBodies);

        // Deliberately scrambled arrival order.
        final long[] arrivalOrder = {4L, 1L, 3L, 2L};
        for (long seqnum : arrivalOrder) {
            sendBodyAndHeader("direct:start", "msg" + seqnum, "seqnum", seqnum);
        }

        assertMockEndpointsSatisfied();
    }

    /**
     * Runs two {@link Sender}s concurrently, one for the even and one for the
     * odd sequence numbers below 100, and expects {@code mock:result} to
     * receive msg0..msg99 in ascending order within the configured wait time.
     *
     * @throws Exception if the mock expectations are not met or cleanup fails
     */
    @Test
    public void testMultithreaded() throws Exception {
        final int messageCount = 100;
        final Object[] expectedBodies = new Object[messageCount];
        for (int i = 0; i < messageCount; i++) {
            expectedBodies[i] = "msg" + i;
        }
        getMockEndpoint("mock:result").expectedBodiesReceived(expectedBodies);
        getMockEndpoint("mock:result").setResultWaitTime(20000L);

        final ProducerTemplate evenProducer = this.context.createProducerTemplate();
        final ProducerTemplate oddProducer = this.context.createProducerTemplate();
        ExecutorService senderPool = null;
        try {
            senderPool = this.context.getExecutorServiceManager().newFixedThreadPool(this, getName(), 2);
            senderPool.execute(new Sender(evenProducer, 0, messageCount, 2));
            senderPool.execute(new Sender(oddProducer, 1, messageCount, 2));
            assertMockEndpointsSatisfied();
        } finally {
            // Clean up even when the assertion above fails, so a failing run
            // does not leave sender threads or producer templates behind.
            if (senderPool != null) {
                this.context.getExecutorServiceManager().shutdownNow(senderPool);
            }
            ServiceHelper.stopServices(new Object[]{evenProducer, oddProducer});
        }
    }

    /**
     * Reports whether JMX should be used for the current test: true only when
     * {@code getName()} equals {@code "testStreamResequencerTypeWithJmx"}.
     * The decision is logged at INFO level.
     *
     * @return {@code true} for the JMX-specific test, {@code false} otherwise
     */
    @Override // org.apache.camel.ContextTestSupport
    protected boolean useJmx() {
        final String testName = getName();
        final boolean jmxEnabled = "testStreamResequencerTypeWithJmx".equals(testName);
        final String action = jmxEnabled ? "enable" : "disable";
        this.log.info("Going to {} JMX for the test {}", action, testName);
        return jmxEnabled;
    }

    /**
     * Builds the route under test: messages from {@code direct:start} pass
     * through a stream resequencer keyed on the {@code seqnum} header, set up
     * with {@code timeout(100)} and {@code deliveryAttemptInterval(10)}, and
     * are then sent to {@code mock:result}.
     *
     * <p>NOTE: the decompiler reported that this method's access modifier was
     * changed from {@code protected}; it is left {@code public} here.
     *
     * @return the route builder for this test
     */
    @Override // org.apache.camel.ContextTestSupport
    public RouteBuilder createRouteBuilder() {
        final RouteBuilder resequencerRoute = new RouteBuilder() {
            public void configure() {
                from("direct:start")
                    .resequence(header("seqnum"))
                    .stream()
                    .timeout(100L)
                    .deliveryAttemptInterval(10L)
                    .to("mock:result");
            }
        };
        return resequencerRoute;
    }

    /**
     * Runs the route-structure check under the one test name for which
     * {@code useJmx()} returns {@code true}.
     */
    @Test
    public void testStreamResequencerTypeWithJmx() throws Exception {
        this.doTestStreamResequencerType();
    }

    /**
     * Runs the route-structure check under a test name for which
     * {@code useJmx()} returns {@code false}.
     */
    @Test
    public void testStreamResequencerTypeWithoutJmx() throws Exception {
        this.doTestStreamResequencerType();
    }

    /**
     * Checks the structure of the route produced by {@code createRouteBuilder()}:
     * exactly one route, of type {@link EventDrivenConsumerRoute}, whose
     * unwrapped channel has a {@code DefaultErrorHandler} as error handler and
     * a {@code StreamResequencer} as next processor.
     *
     * @throws Exception if the routes cannot be built or an assertion fails
     */
    protected void doTestStreamResequencerType() throws Exception {
        final List<Route> routes = getRouteList(createRouteBuilder());
        assertEquals("Number of routes created: " + routes, 1L, routes.size());

        final Route onlyRoute = routes.get(0);
        final EventDrivenConsumerRoute consumerRoute =
                (EventDrivenConsumerRoute) assertIsInstanceOf(EventDrivenConsumerRoute.class, onlyRoute);
        final Channel channel = unwrapChannel(consumerRoute.getProcessor());

        assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler());
        assertIsInstanceOf(StreamResequencer.class, channel.getNextProcessor());
    }
}
