package org.apache.camel.processor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;

@Isolated
/* loaded from: input_file:org/apache/camel/processor/ThrottlingGroupingTest.class */
public class ThrottlingGroupingTest extends ContextTestSupport {
    private static final int MESSAGE_COUNT = 20;
    protected static final int CONCURRENT_REQUESTS = 2;
    protected static Map<String, Semaphore> semaphores;

    @Test
    public void testGroupingWithSingleConstant() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived(new Object[]{"Hello World", "Bye World"});
        getMockEndpoint("mock:dead").expectedBodiesReceived(new Object[]{"Kaboom"});
        this.template.sendBodyAndHeader("seda:a", "Kaboom", "max", (Object) null);
        this.template.sendBodyAndHeader("seda:a", "Hello World", "max", Integer.valueOf(CONCURRENT_REQUESTS));
        this.template.sendBodyAndHeader("seda:a", "Bye World", "max", Integer.valueOf(CONCURRENT_REQUESTS));
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testGroupingWithDynamicHeaderExpression() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived(new Object[]{"Hello World"});
        getMockEndpoint("mock:result2").expectedBodiesReceived(new Object[]{"Bye World"});
        getMockEndpoint("mock:dead").expectedBodiesReceived(new Object[]{"Kaboom", "Saloon"});
        getMockEndpoint("mock:resultdynamic").expectedBodiesReceived(new Object[]{"Hello Dynamic World", "Bye Dynamic World"});
        HashMap hashMap = new HashMap();
        this.template.sendBodyAndHeaders("seda:a", "Kaboom", hashMap);
        this.template.sendBodyAndHeaders("seda:a", "Saloon", hashMap);
        hashMap.put("max", "2");
        this.template.sendBodyAndHeaders("seda:a", "Hello World", hashMap);
        this.template.sendBodyAndHeaders("seda:b", "Bye World", hashMap);
        hashMap.put("max", "2");
        hashMap.put("key", "1");
        this.template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", hashMap);
        hashMap.put("key", "2");
        this.template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", hashMap);
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() throws Exception {
        sendMessagesAndAwaitDelivery(20, "direct:ga", (MockEndpoint) resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class));
    }

    private void sendMessagesAndAwaitDelivery(int i, String str, MockEndpoint mockEndpoint) throws InterruptedException {
        semaphores = new ConcurrentHashMap();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i);
        if (mockEndpoint != null) {
            try {
                mockEndpoint.expectedMessageCount(i);
            } finally {
                shutdownAndAwait(newFixedThreadPool);
            }
        }
        for (int i2 = 0; i2 < i; i2++) {
            int i3 = i2;
            newFixedThreadPool.execute(() -> {
                HashMap hashMap = new HashMap();
                if (i3 % CONCURRENT_REQUESTS == 0) {
                    hashMap.put("key", "1");
                } else {
                    hashMap.put("key", "2");
                }
                this.template.sendBodyAndHeaders(str, "<message>payload</message>", hashMap);
            });
        }
        if (mockEndpoint != null) {
            mockEndpoint.assertIsSatisfied();
        }
    }

    @Test
    public void testConfigurationWithHeaderExpression() throws Exception {
        MockEndpoint mockEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
        mockEndpoint.expectedMessageCount(20);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(20);
        try {
            sendMessagesWithHeaderExpression(newFixedThreadPool, mockEndpoint, CONCURRENT_REQUESTS, 20);
        } finally {
            shutdownAndAwait(newFixedThreadPool);
        }
    }

    private void sendMessagesWithHeaderExpression(ExecutorService executorService, MockEndpoint mockEndpoint, int i, int i2) throws InterruptedException {
        mockEndpoint.expectedMessageCount(i2);
        semaphores = new ConcurrentHashMap();
        for (int i3 = 0; i3 < i2; i3++) {
            int i4 = i3;
            executorService.execute(() -> {
                HashMap hashMap = new HashMap();
                hashMap.put("throttleValue", Integer.valueOf(i));
                if (i4 % CONCURRENT_REQUESTS == 0) {
                    hashMap.put("key", "1");
                } else {
                    hashMap.put("key", "2");
                }
                this.template.sendBodyAndHeaders("direct:gexpressionHeader", "<message>payload</message>", hashMap);
            });
        }
        mockEndpoint.assertIsSatisfied();
    }

    private void shutdownAndAwait(ExecutorService executorService) {
        executorService.shutdown();
        try {
            Assertions.assertTrue(executorService.awaitTermination(10L, TimeUnit.SECONDS), "Test ExecutorService shutdown is not expected to take longer than 10 seconds.");
        } catch (InterruptedException e) {
            Assertions.fail("Test ExecutorService shutdown is not expected to be interrupted.");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.ContextTestSupport
    public RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() { // from class: org.apache.camel.processor.ThrottlingGroupingTest.1
            public void configure() throws Exception {
                errorHandler(deadLetterChannel("mock:dead"));
                from("seda:a").throttle(header("max"), 1L).to("mock:result");
                from("seda:b").throttle(header("max"), 2L).to("mock:result2");
                from("seda:c").throttle(header("max")).correlationExpression(header("key")).to("mock:resultdynamic");
                from("direct:ga").throttle(constant(Integer.valueOf(ThrottlingGroupingTest.CONCURRENT_REQUESTS)), header("key")).process(exchange -> {
                    String str = (String) exchange.getMessage().getHeader("key");
                    Assertions.assertTrue(ThrottlingGroupingTest.semaphores.computeIfAbsent(str, str2 -> {
                        return new Semaphore(ThrottlingGroupingTest.CONCURRENT_REQUESTS);
                    }).tryAcquire(), "'direct:ga' too many requests for key " + str);
                }).delay(100L).process(exchange2 -> {
                    ThrottlingGroupingTest.semaphores.get(exchange2.getMessage().getHeader("key")).release();
                }).to(new String[]{"log:gresult", "mock:gresult"});
                from("direct:gexpressionHeader").throttle(header("throttleValue"), header("key")).process(exchange3 -> {
                    String str = (String) exchange3.getMessage().getHeader("key");
                    Assertions.assertTrue(ThrottlingGroupingTest.semaphores.computeIfAbsent(str, str2 -> {
                        return new Semaphore(((Integer) exchange3.getMessage().getHeader("throttleValue")).intValue());
                    }).tryAcquire(), "'direct:gexpressionHeader' too many requests for key " + str);
                }).delay(100L).process(exchange4 -> {
                    ThrottlingGroupingTest.semaphores.get(exchange4.getMessage().getHeader("key")).release();
                }).to(new String[]{"log:gresult", "mock:gresult"});
            }
        };
    }
}
