/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.throttle.concurrent;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Expression;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.ExpressionNode;
import org.apache.camel.processor.ThrottlerRejectedExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;

@DisabledIfSystemProperty(named="ci.env.name", matches="github.com", disabledReason="Flaky on Github CI")
@EnabledOnOs(value={OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD}, architectures={"amd64", "aarch64", "ppc64le"}, disabledReason="This test does not run reliably multiple platforms (see CAMEL-21438)")
public class ConcurrentRequestsThrottlerTest
extends ContextTestSupport {
    private static final int INTERVAL = 500;
    private static final int MESSAGE_COUNT = 9;
    private static final int CONCURRENT_REQUESTS = 2;
    protected static Semaphore semaphore;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        resultEndpoint.expectedMessageCount(2);
        MockEndpoint errorEndpoint = this.resolveMandatoryEndpoint("mock:error", MockEndpoint.class);
        errorEndpoint.expectedMessageCount(4);
        ExecutorService executor = Executors.newFixedThreadPool(6);
        try {
            for (int i = 0; i < 6; ++i) {
                executor.execute(() -> this.template.sendBody("direct:start", (Object)"<message>payload</message>"));
            }
            this.assertMockEndpointsSatisfied();
        }
        finally {
            this.shutdownAndAwait(executor);
        }
    }

    @Test
    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
        semaphore = new Semaphore(2);
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        this.sendMessagesAndAwaitDelivery(9, "direct:a", 9, resultEndpoint);
    }

    @Test
    public void testConfigurationWithConstantExpression() throws Exception {
        semaphore = new Semaphore(2);
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        this.sendMessagesAndAwaitDelivery(9, "direct:expressionConstant", 9, resultEndpoint);
    }

    @Test
    public void testConfigurationWithHeaderExpression() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        resultEndpoint.expectedMessageCount(9);
        ExecutorService executor = Executors.newFixedThreadPool(9);
        try {
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 9);
        }
        finally {
            this.shutdownAndAwait(executor);
        }
    }

    @Test
    public void testConfigurationWithChangingHeaderExpression() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 9);
            Thread.sleep(500L);
            resultEndpoint.reset();
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 9);
            Thread.sleep(500L);
            resultEndpoint.reset();
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 9);
            Thread.sleep(500L);
            resultEndpoint.reset();
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 9);
        }
        finally {
            this.shutdownAndAwait(executor);
        }
    }

    @Test
    public void testFifo() throws Exception {
        this.getMockEndpoint("mock:result").expectedBodiesReceived(new Object[]{"A", "B", "C", "D", "E", "F", "G", "H"});
        this.sendBody("direct:fifo");
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testPermitReleaseOnException() throws Exception {
        this.getMockEndpoint("mock:error").expectedBodiesReceived(new Object[]{"A", "B", "C", "D", "E", "F", "G", "H"});
        this.sendBody("direct:release");
        this.assertMockEndpointsSatisfied();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendMessagesAndAwaitDelivery(int messageCount, final String endpointUri, int threadPoolSize, MockEndpoint receivingEndpoint) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
        try {
            if (receivingEndpoint != null) {
                receivingEndpoint.expectedMessageCount(messageCount);
            }
            for (int i = 0; i < messageCount; ++i) {
                executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        ConcurrentRequestsThrottlerTest.this.template.sendBody(endpointUri, (Object)"<message>payload</message>");
                    }
                });
            }
            if (receivingEndpoint != null) {
                receivingEndpoint.assertIsSatisfied();
            }
        }
        finally {
            this.shutdownAndAwait(executor);
        }
    }

    private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, final int throttle, int messageCount) throws InterruptedException {
        resultEndpoint.expectedMessageCount(messageCount);
        semaphore = new Semaphore(throttle);
        for (int i = 0; i < messageCount; ++i) {
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    ConcurrentRequestsThrottlerTest.this.template.sendBodyAndHeader("direct:expressionHeader", (Object)"<message>payload</message>", "throttleValue", (Object)throttle);
                }
            });
        }
        resultEndpoint.assertIsSatisfied();
    }

    private void sendBody(String endpoint) {
        Arrays.stream(new String[]{"A", "B", "C", "D", "E", "F", "G", "H"}).forEach(b -> this.template.sendBody(endpoint, b));
    }

    private void shutdownAndAwait(ExecutorService executorService) {
        executorService.shutdown();
        try {
            Assertions.assertTrue((boolean)executorService.awaitTermination(10L, TimeUnit.SECONDS), (String)"Test ExecutorService shutdown is not expected to take longer than 10 seconds.");
        }
        catch (InterruptedException e) {
            Assertions.fail((String)"Test ExecutorService shutdown is not expected to be interrupted.");
        }
    }

    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder(){

            public void configure() {
                this.onException(ThrottlerRejectedExecutionException.class).handled(true).to("mock:error");
                ((ExpressionNode)((ExpressionNode)this.from("direct:a").throttle(2L).concurrentRequestsMode().process(exchange -> Assertions.assertTrue((boolean)semaphore.tryAcquire(), (String)"'direct:a' too many requests"))).delay(100L).process(exchange -> semaphore.release())).to(new String[]{"log:result", "mock:result"});
                ((ExpressionNode)((ExpressionNode)this.from("direct:expressionConstant").throttle((Expression)this.constant(2)).concurrentRequestsMode().process(exchange -> Assertions.assertTrue((boolean)semaphore.tryAcquire(), (String)"'direct:expressionConstant' too many requests"))).delay(100L).process(exchange -> semaphore.release())).to(new String[]{"log:result", "mock:result"});
                ((ExpressionNode)((ExpressionNode)this.from("direct:expressionHeader").throttle((Expression)this.header("throttleValue")).concurrentRequestsMode().process(exchange -> Assertions.assertTrue((boolean)semaphore.tryAcquire(), (String)"'direct:expressionHeader' too many requests"))).delay(100L).process(exchange -> semaphore.release())).to(new String[]{"log:result", "mock:result"});
                this.from("direct:start").throttle(2L).concurrentRequestsMode().rejectExecution(true).delay(1000L).to(new String[]{"log:result", "mock:result"});
                this.from("direct:fifo").throttle(1L).concurrentRequestsMode().delay(100L).to("mock:result");
                ((ExpressionNode)this.from("direct:release").errorHandler((ErrorHandlerFactory)this.deadLetterChannel("mock:error")).throttle(1L).delay(100L).process(exchange -> {
                    throw new RuntimeException();
                })).to("mock:result");
            }
        };
    }
}

