/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.issues;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ExpressionNode;
import org.apache.camel.model.SplitDefinition;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SplitterParallelAsyncProcessorIssueTest
extends ContextTestSupport {
    private final Set<String> threads = new HashSet<String>();

    @Test
    public void testSplitParallelProcessingMaxThreads() throws Exception {
        this.getMockEndpoint("mock:split").expectedMessageCount(10);
        String xmlBody = "<employees><employee><id>1</id><name>John</name></employee><employee><id>2</id><name>Jane</name></employee><employee><id>3</id><name>Jim</name></employee><employee><id>4</id><name>Jack</name></employee><employee><id>5</id><name>Jill</name></employee><employee><id>6</id><name>opi</name></employee><employee><id>7</id><name>ds</name></employee><employee><id>8</id><name>hhh</name></employee><employee><id>9</id><name>fki</name></employee><employee><id>10</id><name>abc</name></employee></employees> ";
        this.template.sendBody("direct:start", (Object)xmlBody);
        this.assertMockEndpointsSatisfied();
        this.log.info("{} Threads in use: {}", (Object)this.threads.size(), this.threads);
        Assertions.assertTrue((this.threads.size() <= 4 ? 1 : 0) != 0, (String)("Should not use more than 4 threads, was: " + this.threads.size()));
    }

    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder(){

            public void configure() throws Exception {
                ThreadPoolProfile myThreadPoolProfile = new ThreadPoolProfile("threadPoolProfile");
                myThreadPoolProfile.setMaxPoolSize(Integer.valueOf(2));
                myThreadPoolProfile.setPoolSize(Integer.valueOf(2));
                myThreadPoolProfile.setMaxQueueSize(Integer.valueOf(2));
                myThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
                this.getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(myThreadPoolProfile);
                AsyncProcessor asyncProcessor = new AsyncProcessor(){

                    public void process(Exchange exchange) throws Exception {
                    }

                    public boolean process(Exchange exchange, AsyncCallback callback) {
                        CompletableFuture.runAsync(() -> {
                            SplitterParallelAsyncProcessorIssueTest.this.threads.add(Thread.currentThread().getName());
                            try {
                                Thread.sleep(250L);
                                exchange.getIn().setBody((Object)"Processed asynchronously");
                            }
                            catch (InterruptedException e) {
                                exchange.setException((Throwable)e);
                            }
                            finally {
                                callback.done(false);
                            }
                        });
                        return false;
                    }

                    public CompletableFuture<Exchange> processAsync(Exchange exchange) {
                        return null;
                    }
                };
                ((ExpressionNode)((ExpressionNode)((ExpressionNode)((SplitDefinition)this.from("direct:start").split().xpath("/employees/employee")).parallelProcessing().stopOnException().timeout(10000L).executorService("threadPoolProfile").synchronous().process(e -> SplitterParallelAsyncProcessorIssueTest.this.threads.add(Thread.currentThread().getName()))).process((Processor)asyncProcessor)).process(e -> SplitterParallelAsyncProcessorIssueTest.this.threads.add(Thread.currentThread().getName()))).delay(250L).end().to("mock:split");
            }
        };
    }
}

