/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.throttle.concurrent;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Expression;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.ExpressionNode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;

@Isolated
public class ThrottlingGroupingTest
extends ContextTestSupport {
    private static final int MESSAGE_COUNT = 20;
    protected static final int CONCURRENT_REQUESTS = 2;
    protected static Map<String, Semaphore> semaphores;

    @Test
    public void testGroupingWithSingleConstant() throws Exception {
        this.getMockEndpoint("mock:result").expectedBodiesReceived(new Object[]{"Hello World", "Bye World"});
        this.getMockEndpoint("mock:dead").expectedBodiesReceived(new Object[]{"Kaboom"});
        this.template.sendBodyAndHeader("seda:a", (Object)"Kaboom", "max", null);
        this.template.sendBodyAndHeader("seda:a", (Object)"Hello World", "max", (Object)2);
        this.template.sendBodyAndHeader("seda:a", (Object)"Bye World", "max", (Object)2);
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testGroupingWithDynamicHeaderExpression() throws Exception {
        this.getMockEndpoint("mock:result").expectedBodiesReceived(new Object[]{"Hello World"});
        this.getMockEndpoint("mock:result2").expectedBodiesReceived(new Object[]{"Bye World"});
        this.getMockEndpoint("mock:dead").expectedBodiesReceived(new Object[]{"Kaboom", "Saloon"});
        this.getMockEndpoint("mock:resultdynamic").expectedBodiesReceived(new Object[]{"Hello Dynamic World", "Bye Dynamic World"});
        HashMap<String, String> headers = new HashMap<String, String>();
        this.template.sendBodyAndHeaders("seda:a", (Object)"Kaboom", headers);
        this.template.sendBodyAndHeaders("seda:a", (Object)"Saloon", headers);
        headers.put("max", "2");
        this.template.sendBodyAndHeaders("seda:a", (Object)"Hello World", headers);
        this.template.sendBodyAndHeaders("seda:b", (Object)"Bye World", headers);
        headers.put("max", "2");
        headers.put("key", "1");
        this.template.sendBodyAndHeaders("seda:c", (Object)"Hello Dynamic World", headers);
        headers.put("key", "2");
        this.template.sendBodyAndHeaders("seda:c", (Object)"Bye Dynamic World", headers);
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
        this.sendMessagesAndAwaitDelivery(20, "direct:ga", resultEndpoint);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendMessagesAndAwaitDelivery(int messageCount, String endpointUri, MockEndpoint receivingEndpoint) throws InterruptedException {
        semaphores = new ConcurrentHashMap<String, Semaphore>();
        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
        try {
            if (receivingEndpoint != null) {
                receivingEndpoint.expectedMessageCount(messageCount);
            }
            int i = 0;
            while (i < messageCount) {
                int finalI = i++;
                executor.execute(() -> {
                    HashMap<String, String> headers = new HashMap<String, String>();
                    if (finalI % 2 == 0) {
                        headers.put("key", "1");
                    } else {
                        headers.put("key", "2");
                    }
                    this.template.sendBodyAndHeaders(endpointUri, (Object)"<message>payload</message>", headers);
                });
            }
            if (receivingEndpoint != null) {
                receivingEndpoint.assertIsSatisfied();
            }
        }
        finally {
            this.shutdownAndAwait(executor);
        }
    }

    @Test
    public void testConfigurationWithHeaderExpression() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
        resultEndpoint.expectedMessageCount(20);
        ExecutorService executor = Executors.newFixedThreadPool(20);
        try {
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 20);
        }
        finally {
            this.shutdownAndAwait(executor);
        }
    }

    private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, int throttle, int messageCount) throws InterruptedException {
        resultEndpoint.expectedMessageCount(messageCount);
        semaphores = new ConcurrentHashMap<String, Semaphore>();
        int i = 0;
        while (i < messageCount) {
            int finalI = i++;
            executor.execute(() -> {
                HashMap<String, Object> headers = new HashMap<String, Object>();
                headers.put("throttleValue", throttle);
                if (finalI % 2 == 0) {
                    headers.put("key", "1");
                } else {
                    headers.put("key", "2");
                }
                this.template.sendBodyAndHeaders("direct:gexpressionHeader", (Object)"<message>payload</message>", headers);
            });
        }
        resultEndpoint.assertIsSatisfied();
    }

    private void shutdownAndAwait(ExecutorService executorService) {
        executorService.shutdown();
        try {
            Assertions.assertTrue((boolean)executorService.awaitTermination(10L, TimeUnit.SECONDS), (String)"Test ExecutorService shutdown is not expected to take longer than 10 seconds.");
        }
        catch (InterruptedException e) {
            Assertions.fail((String)"Test ExecutorService shutdown is not expected to be interrupted.");
        }
    }

    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder(){

            public void configure() {
                this.errorHandler((ErrorHandlerFactory)this.deadLetterChannel("mock:dead"));
                this.from("seda:a").throttle((Expression)this.header("max"), 1L).concurrentRequestsMode().to("mock:result");
                this.from("seda:b").throttle((Expression)this.header("max"), 2L).concurrentRequestsMode().to("mock:result2");
                this.from("seda:c").throttle((Expression)this.header("max")).concurrentRequestsMode().correlationExpression((Expression)this.header("key")).to("mock:resultdynamic");
                ((ExpressionNode)((ExpressionNode)this.from("direct:ga").throttle((Expression)this.constant(2), (Expression)this.header("key")).concurrentRequestsMode().process(exchange -> {
                    String key = (String)exchange.getMessage().getHeader("key");
                    Assertions.assertTrue((boolean)semaphores.computeIfAbsent(key, k -> new Semaphore(2)).tryAcquire(), (String)("'direct:ga' too many requests for key " + key));
                })).delay(100L).process(exchange -> semaphores.get(exchange.getMessage().getHeader("key")).release())).to(new String[]{"log:gresult", "mock:gresult"});
                ((ExpressionNode)((ExpressionNode)this.from("direct:gexpressionHeader").throttle((Expression)this.header("throttleValue"), (Expression)this.header("key")).concurrentRequestsMode().process(exchange -> {
                    String key = (String)exchange.getMessage().getHeader("key");
                    Assertions.assertTrue((boolean)semaphores.computeIfAbsent(key, k -> new Semaphore((Integer)exchange.getMessage().getHeader("throttleValue"))).tryAcquire(), (String)("'direct:gexpressionHeader' too many requests for key " + key));
                })).delay(100L).process(exchange -> semaphores.get(exchange.getMessage().getHeader("key")).release())).to(new String[]{"log:gresult", "mock:gresult"});
            }
        };
    }
}

