/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Channel;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.engine.DefaultRoute;
import org.apache.camel.processor.StreamResequencer;
import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
import org.apache.camel.support.service.ServiceHelper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

/**
 * Tests a route that resequences messages in stream mode, ordered by the {@code seqnum} header,
 * and delivers them to {@code mock:result}.
 *
 * <p>Covers in-order delivery of messages sent out of order from a single thread, delivery of
 * messages sent concurrently from two threads, and the processor types the route is built from.
 */
@DisabledOnOs(value={OS.LINUX}, architectures={"s390x"}, disabledReason="This test does not run reliably multiple platforms (see CAMEL-21438)")
public class StreamResequencerTest extends ContextTestSupport {

    /**
     * Sends one message to the given endpoint, setting the body, the given header and an
     * additional {@code testCase} header holding the name of the running test.
     *
     * @param endpointUri the endpoint to send to
     * @param body        the message body
     * @param headerName  the name of the header to set
     * @param headerValue the value of the header to set
     */
    protected void sendBodyAndHeader(String endpointUri, final Object body, final String headerName, final Object headerValue) {
        template.send(endpointUri, new Processor() {

            @Override
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setBody(body);
                in.setHeader(headerName, headerValue);
                in.setHeader("testCase", StreamResequencerTest.this.getName());
            }
        });
    }

    /**
     * Sends four messages with sequence numbers 4, 1, 3, 2 and expects them at the mock endpoint
     * in the order 1, 2, 3, 4.
     */
    @Test
    public void testSendMessagesInWrongOrderButReceiveThemInCorrectOrder() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived("msg1", "msg2", "msg3", "msg4");
        sendBodyAndHeader("direct:start", "msg4", "seqnum", 4L);
        sendBodyAndHeader("direct:start", "msg1", "seqnum", 1L);
        sendBodyAndHeader("direct:start", "msg3", "seqnum", 3L);
        sendBodyAndHeader("direct:start", "msg2", "seqnum", 2L);
        assertMockEndpointsSatisfied();
    }

    /**
     * Sends 100 messages from two concurrent senders (one sending the even sequence numbers, the
     * other the odd ones) and expects them at the mock endpoint in ascending order.
     */
    @Test
    public void testMultithreaded() throws Exception {
        int numMessages = 100;
        Object[] bodies = new Object[numMessages];
        for (int i = 0; i < numMessages; ++i) {
            bodies[i] = "msg" + i;
        }
        getMockEndpoint("mock:result").expectedBodiesReceived(bodies);
        getMockEndpoint("mock:result").setResultWaitTime(20000L);

        ProducerTemplate producerTemplate = context.createProducerTemplate();
        ProducerTemplate producerTemplate2 = context.createProducerTemplate();
        ExecutorService service = context.getExecutorServiceManager().newFixedThreadPool(this, getName(), 2);
        try {
            // Interleaved ranges: the first sender covers 0, 2, 4, ... and the second 1, 3, 5, ...
            service.execute(new Sender(producerTemplate, 0, numMessages, 2));
            service.execute(new Sender(producerTemplate2, 1, numMessages, 2));
            assertMockEndpointsSatisfied();
        } finally {
            // Clean up even when the assertion fails, so that still-running senders are
            // interrupted and neither the pool nor the templates outlive this test method.
            context.getExecutorServiceManager().shutdownNow(service);
            ServiceHelper.stopService(producerTemplate, producerTemplate2);
        }
    }

    /**
     * Enables JMX only for the test named {@code testStreamResequencerTypeWithJmx}.
     *
     * @return {@code true} when the running test is {@code testStreamResequencerTypeWithJmx}
     */
    @Override
    protected boolean useJmx() {
        boolean enable = "testStreamResequencerTypeWithJmx".equals(getName());
        log.info("Going to {} JMX for the test {}", enable ? "enable" : "disable", getName());
        return enable;
    }

    /**
     * Builds the route under test: {@code direct:start} is resequenced in stream mode on the
     * {@code seqnum} header (timeout 1000, delivery attempt interval 10) and sent on to
     * {@code mock:result}.
     *
     * @return the route builder defining the route under test
     */
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {

            @Override
            public void configure() {
                from("direct:start")
                        .resequence(header("seqnum")).stream().timeout(1000L).deliveryAttemptInterval(10L)
                        .to("mock:result");
            }
        };
    }

    /** Runs the route-structure check; {@link #useJmx()} enables JMX for this test name. */
    @Test
    public void testStreamResequencerTypeWithJmx() throws Exception {
        doTestStreamResequencerType();
    }

    /** Runs the route-structure check; {@link #useJmx()} leaves JMX disabled for this test name. */
    @Test
    public void testStreamResequencerTypeWithoutJmx() throws Exception {
        doTestStreamResequencerType();
    }

    /**
     * Verifies the structure of the route created by {@link #createRouteBuilder()}: exactly one
     * route, of type {@link DefaultRoute}, whose unwrapped channel has a
     * {@link DefaultErrorHandler} as error handler and a {@link StreamResequencer} as next
     * processor.
     */
    protected void doTestStreamResequencerType() throws Exception {
        List<Route> list = getRouteList(createRouteBuilder());
        Assertions.assertEquals(1, list.size(), "Number of routes created: " + list);

        Route route = list.get(0);
        DefaultRoute consumerRoute = assertIsInstanceOf(DefaultRoute.class, route);
        Channel channel = unwrapChannel(consumerRoute.getProcessor());
        Assertions.assertNotNull(channel, "There should be a channel");

        assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler());
        assertIsInstanceOf(StreamResequencer.class, channel.getNextProcessor());
    }

    /**
     * Sends the messages {@code msg<i>} for {@code i = start, start + increment, ...} while
     * {@code i < end} to {@code direct:start}, pausing a random 0-9 ms before each send.
     */
    private static class Sender implements Runnable {
        private final ProducerTemplate template;
        private final int start;
        private final int end;
        private final int increment;
        private final Random random;

        /**
         * @param template  the template used for sending
         * @param start     the first sequence number to send (inclusive)
         * @param end       the upper bound of the sequence numbers (exclusive)
         * @param increment the step between two consecutive sequence numbers
         */
        Sender(ProducerTemplate template, int start, int end, int increment) {
            this.template = template;
            this.start = start;
            this.end = end;
            this.increment = increment;
            this.random = new Random();
        }

        @Override
        public void run() {
            // The counter is a long so the seqnum header is boxed as a Long, the same type the
            // single-threaded test passes with its 1L..4L literals.
            for (long i = start; i < end; i += increment) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {
                    // Interrupted (e.g. the pool is being shut down): restore the flag and stop
                    // sending instead of pushing the remaining messages.
                    Thread.currentThread().interrupt();
                    return;
                }
                template.sendBodyAndHeader("direct:start", "msg" + i, "seqnum", i);
            }
        }
    }
}

