/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.throttle.requests;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Expression;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.ThrottlerRejectedExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;

@DisabledIfSystemProperty(named="ci.env.name", matches="github.com", disabledReason="Flaky on Github CI")
@EnabledOnOs(value={OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD}, architectures={"amd64", "aarch64", "ppc64le"}, disabledReason="This test does not run reliably multiple platforms (see CAMEL-21438)")
public class ThrottlerTest
extends ContextTestSupport {
    private static final int INTERVAL = 500;
    private static final int TOLERANCE = 50;
    private static final int MESSAGE_COUNT = 9;

    @Test
    public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        resultEndpoint.expectedMessageCount(3);
        resultEndpoint.setResultWaitTime(2000L);
        for (int i = 0; i < 9; ++i) {
            this.template.sendBody("seda:a", (Object)("<message>" + i + "</message>"));
        }
        resultEndpoint.assertIsSatisfied();
    }

    @Test
    public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        resultEndpoint.expectedMessageCount(2);
        MockEndpoint errorEndpoint = this.resolveMandatoryEndpoint("mock:error", MockEndpoint.class);
        errorEndpoint.expectedMessageCount(4);
        for (int i = 0; i < 6; ++i) {
            this.template.sendBody("direct:start", (Object)("<message>" + i + "</message>"));
        }
        this.assertMockEndpointsSatisfied();
    }

    @Test
    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        long elapsed = this.sendMessagesAndAwaitDelivery(9, "direct:a", 9, resultEndpoint);
        this.assertThrottlerTiming(elapsed, 5, 500, 9);
    }

    @Test
    public void testConfigurationWithConstantExpression() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        long elapsed = this.sendMessagesAndAwaitDelivery(9, "direct:expressionConstant", 9, resultEndpoint);
        this.assertThrottlerTiming(elapsed, 5, 500, 9);
    }

    @Test
    public void testConfigurationWithHeaderExpression() throws Exception {
        MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
        resultEndpoint.expectedMessageCount(9);
        ExecutorService executor = Executors.newFixedThreadPool(9);
        try {
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 500, 9);
        }
        finally {
            executor.shutdownNow();
        }
    }

    @Test
    public void testConfigurationWithChangingHeaderExpression() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            MockEndpoint resultEndpoint = this.resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 500, 9);
            Thread.sleep(550L);
            resultEndpoint.reset();
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 10, 500, 9);
            Thread.sleep(550L);
            resultEndpoint.reset();
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 500, 9);
            Thread.sleep(550L);
            resultEndpoint.reset();
            this.sendMessagesWithHeaderExpression(executor, resultEndpoint, 10, 500, 9);
        }
        finally {
            executor.shutdownNow();
        }
    }

    private void assertThrottlerTiming(long elapsedTimeMs, int throttle, int intervalMs, int messageCount) {
        long minimum = this.calculateMinimum(intervalMs, throttle, messageCount) - 50L;
        long maximum = this.calculateMaximum(intervalMs, throttle, messageCount) + 50L;
        this.log.info("Sent {} exchanges in {}ms, with throttle rate of {} per {}ms. Calculated min {}ms and max {}ms", new Object[]{messageCount, elapsedTimeMs, throttle, intervalMs, minimum, maximum += 1000L});
        Assertions.assertTrue((elapsedTimeMs >= minimum ? 1 : 0) != 0, (String)("Should take at least " + minimum + "ms, was: " + elapsedTimeMs));
        Assertions.assertTrue((elapsedTimeMs <= maximum + 50L ? 1 : 0) != 0, (String)("Should take at most " + maximum + "ms, was: " + elapsedTimeMs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long sendMessagesAndAwaitDelivery(int messageCount, final String endpointUri, int threadPoolSize, MockEndpoint receivingEndpoint) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
        try {
            if (receivingEndpoint != null) {
                receivingEndpoint.expectedMessageCount(messageCount);
            }
            long start = System.nanoTime();
            for (int i = 0; i < messageCount; ++i) {
                executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        ThrottlerTest.this.template.sendBody(endpointUri, (Object)"<message>payload</message>");
                    }
                });
            }
            if (receivingEndpoint != null) {
                receivingEndpoint.assertIsSatisfied();
            }
            long l = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            return l;
        }
        finally {
            executor.shutdownNow();
        }
    }

    private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, final int throttle, int intervalMs, int messageCount) throws InterruptedException {
        resultEndpoint.expectedMessageCount(messageCount);
        long start = System.nanoTime();
        for (int i = 0; i < messageCount; ++i) {
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    ThrottlerTest.this.template.sendBodyAndHeader("direct:expressionHeader", (Object)"<message>payload</message>", "throttleValue", (Object)throttle);
                }
            });
        }
        resultEndpoint.assertIsSatisfied();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        this.assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
    }

    private long calculateMinimum(long periodMs, long throttleRate, long messageCount) {
        if (messageCount % throttleRate > 0L) {
            return (long)Math.floor((double)messageCount / (double)throttleRate) * periodMs;
        }
        return (long)(Math.floor((double)messageCount / (double)throttleRate) * (double)periodMs) - periodMs;
    }

    private long calculateMaximum(long periodMs, long throttleRate, long messageCount) {
        return (long)Math.ceil((double)messageCount / (double)throttleRate) * periodMs;
    }

    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder(){

            public void configure() {
                this.onException(ThrottlerRejectedExecutionException.class).handled(true).to("mock:error");
                this.from("seda:a").throttle(3L).timePeriodMillis(1000L).to(new String[]{"log:result", "mock:result"});
                this.from("direct:a").throttle(5L).timePeriodMillis(500L).to(new String[]{"log:result", "mock:result"});
                this.from("direct:expressionConstant").throttle((Expression)this.constant(5)).timePeriodMillis(500L).to(new String[]{"log:result", "mock:result"});
                this.from("direct:expressionHeader").throttle((Expression)this.header("throttleValue")).timePeriodMillis(500L).to(new String[]{"log:result", "mock:result"});
                this.from("direct:start").throttle(2L).timePeriodMillis(1000L).rejectExecution(true).to(new String[]{"log:result", "mock:result"});
                this.from("direct:highThrottleRate").throttle(10000L).timePeriodMillis(500L).to("mock:result");
            }
        };
    }
}

