package org.apache.camel.processor.throttle.concurrent;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.ThrottlerRejectedExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

@DisabledOnOs({OS.WINDOWS})
@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Flaky on Github CI")
/* loaded from: input_file:org/apache/camel/processor/throttle/concurrent/ConcurrentRequestsThrottlerTest.class */
public class ConcurrentRequestsThrottlerTest extends ContextTestSupport {
    private static final int INTERVAL = 500;
    private static final int MESSAGE_COUNT = 9;
    private static final int CONCURRENT_REQUESTS = 2;
    protected static Semaphore semaphore;

    @Test
    public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
        resolveMandatoryEndpoint("mock:result", MockEndpoint.class).expectedMessageCount(CONCURRENT_REQUESTS);
        resolveMandatoryEndpoint("mock:error", MockEndpoint.class).expectedMessageCount(4);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i++) {
            try {
                newFixedThreadPool.execute(() -> {
                    this.template.sendBody("direct:start", "<message>payload</message>");
                });
            } catch (Throwable th) {
                shutdownAndAwait(newFixedThreadPool);
                throw th;
            }
        }
        assertMockEndpointsSatisfied();
        shutdownAndAwait(newFixedThreadPool);
    }

    @Test
    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
        semaphore = new Semaphore(CONCURRENT_REQUESTS);
        sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT, (MockEndpoint) resolveMandatoryEndpoint("mock:result", MockEndpoint.class));
    }

    @Test
    public void testConfigurationWithConstantExpression() throws Exception {
        semaphore = new Semaphore(CONCURRENT_REQUESTS);
        sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:expressionConstant", MESSAGE_COUNT, (MockEndpoint) resolveMandatoryEndpoint("mock:result", MockEndpoint.class));
    }

    @Test
    public void testConfigurationWithHeaderExpression() throws Exception {
        MockEndpoint mockEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        mockEndpoint.expectedMessageCount(MESSAGE_COUNT);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(MESSAGE_COUNT);
        try {
            sendMessagesWithHeaderExpression(newFixedThreadPool, mockEndpoint, CONCURRENT_REQUESTS, MESSAGE_COUNT);
        } finally {
            shutdownAndAwait(newFixedThreadPool);
        }
    }

    @Test
    public void testConfigurationWithChangingHeaderExpression() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        try {
            MockEndpoint mockEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
            sendMessagesWithHeaderExpression(newFixedThreadPool, mockEndpoint, CONCURRENT_REQUESTS, MESSAGE_COUNT);
            Thread.sleep(500L);
            mockEndpoint.reset();
            sendMessagesWithHeaderExpression(newFixedThreadPool, mockEndpoint, 4, MESSAGE_COUNT);
            Thread.sleep(500L);
            mockEndpoint.reset();
            sendMessagesWithHeaderExpression(newFixedThreadPool, mockEndpoint, CONCURRENT_REQUESTS, MESSAGE_COUNT);
            Thread.sleep(500L);
            mockEndpoint.reset();
            sendMessagesWithHeaderExpression(newFixedThreadPool, mockEndpoint, 4, MESSAGE_COUNT);
        } finally {
            shutdownAndAwait(newFixedThreadPool);
        }
    }

    @Test
    public void testFifo() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived(new Object[]{"A", "B", "C", "D", "E", "F", "G", "H"});
        sendBody("direct:fifo");
        assertMockEndpointsSatisfied();
    }

    @Test
    public void testPermitReleaseOnException() throws Exception {
        getMockEndpoint("mock:error").expectedBodiesReceived(new Object[]{"A", "B", "C", "D", "E", "F", "G", "H"});
        sendBody("direct:release");
        assertMockEndpointsSatisfied();
    }

    private void sendMessagesAndAwaitDelivery(int i, final String str, int i2, MockEndpoint mockEndpoint) throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i2);
        if (mockEndpoint != null) {
            try {
                mockEndpoint.expectedMessageCount(i);
            } finally {
                shutdownAndAwait(newFixedThreadPool);
            }
        }
        for (int i3 = 0; i3 < i; i3++) {
            newFixedThreadPool.execute(new Runnable() { // from class: org.apache.camel.processor.throttle.concurrent.ConcurrentRequestsThrottlerTest.1
                @Override // java.lang.Runnable
                public void run() {
                    ConcurrentRequestsThrottlerTest.this.template.sendBody(str, "<message>payload</message>");
                }
            });
        }
        if (mockEndpoint != null) {
            mockEndpoint.assertIsSatisfied();
        }
    }

    private void sendMessagesWithHeaderExpression(ExecutorService executorService, MockEndpoint mockEndpoint, final int i, int i2) throws InterruptedException {
        mockEndpoint.expectedMessageCount(i2);
        semaphore = new Semaphore(i);
        for (int i3 = 0; i3 < i2; i3++) {
            executorService.execute(new Runnable() { // from class: org.apache.camel.processor.throttle.concurrent.ConcurrentRequestsThrottlerTest.2
                @Override // java.lang.Runnable
                public void run() {
                    ConcurrentRequestsThrottlerTest.this.template.sendBodyAndHeader("direct:expressionHeader", "<message>payload</message>", "throttleValue", Integer.valueOf(i));
                }
            });
        }
        mockEndpoint.assertIsSatisfied();
    }

    private void sendBody(String str) {
        Arrays.stream(new String[]{"A", "B", "C", "D", "E", "F", "G", "H"}).forEach(str2 -> {
            this.template.sendBody(str, str2);
        });
    }

    private void shutdownAndAwait(ExecutorService executorService) {
        executorService.shutdown();
        try {
            Assertions.assertTrue(executorService.awaitTermination(10L, TimeUnit.SECONDS), "Test ExecutorService shutdown is not expected to take longer than 10 seconds.");
        } catch (InterruptedException e) {
            Assertions.fail("Test ExecutorService shutdown is not expected to be interrupted.");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.ContextTestSupport
    /* renamed from: createRouteBuilder, reason: merged with bridge method [inline-methods] */
    public RouteBuilder mo4createRouteBuilder() {
        return new RouteBuilder() { // from class: org.apache.camel.processor.throttle.concurrent.ConcurrentRequestsThrottlerTest.3
            public void configure() {
                onException(ThrottlerRejectedExecutionException.class).handled(true).to("mock:error");
                from("direct:a").throttle(2L).concurrentRequestsMode().process(exchange -> {
                    Assertions.assertTrue(ConcurrentRequestsThrottlerTest.semaphore.tryAcquire(), "'direct:a' too many requests");
                }).delay(100L).process(exchange2 -> {
                    ConcurrentRequestsThrottlerTest.semaphore.release();
                }).to(new String[]{"log:result", "mock:result"});
                from("direct:expressionConstant").throttle(constant(Integer.valueOf(ConcurrentRequestsThrottlerTest.CONCURRENT_REQUESTS))).concurrentRequestsMode().process(exchange3 -> {
                    Assertions.assertTrue(ConcurrentRequestsThrottlerTest.semaphore.tryAcquire(), "'direct:expressionConstant' too many requests");
                }).delay(100L).process(exchange4 -> {
                    ConcurrentRequestsThrottlerTest.semaphore.release();
                }).to(new String[]{"log:result", "mock:result"});
                from("direct:expressionHeader").throttle(header("throttleValue")).concurrentRequestsMode().process(exchange5 -> {
                    Assertions.assertTrue(ConcurrentRequestsThrottlerTest.semaphore.tryAcquire(), "'direct:expressionHeader' too many requests");
                }).delay(100L).process(exchange6 -> {
                    ConcurrentRequestsThrottlerTest.semaphore.release();
                }).to(new String[]{"log:result", "mock:result"});
                from("direct:start").throttle(2L).concurrentRequestsMode().rejectExecution(true).delay(1000L).to(new String[]{"log:result", "mock:result"});
                from("direct:fifo").throttle(1L).concurrentRequestsMode().delay(100L).to("mock:result");
                from("direct:release").errorHandler(deadLetterChannel("mock:error")).throttle(1L).delay(100L).process(exchange7 -> {
                    throw new RuntimeException();
                }).to("mock:result");
            }
        };
    }
}
