package org.apache.flink.connector.base.sink.writer;

import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/connector/base/sink/writer/AIMDRateLimitingStrategyTest.class */
public class AIMDRateLimitingStrategyTest {
    private static final int INITIAL_RATE = 32;
    private static final int INCREASE_RATE = 10;
    private static final int THRESHOLD = 100;
    private static final double DECREASE_RATE = 0.5d;

    @Test
    public void testInitialRateIsSetByConstructor() {
        Assertions.assertThat(new AIMDRateLimitingStrategy(10, DECREASE_RATE, THRESHOLD, INITIAL_RATE).getRateLimit()).isEqualTo(INITIAL_RATE);
    }

    @Test
    public void testAcknowledgedRequestsIncreaseLinearly() {
        AIMDRateLimitingStrategy aIMDRateLimitingStrategy = new AIMDRateLimitingStrategy(10, DECREASE_RATE, THRESHOLD, INITIAL_RATE);
        updateStrategyWithAcks(aIMDRateLimitingStrategy, 5);
        Assertions.assertThat(aIMDRateLimitingStrategy.getRateLimit()).isEqualTo(INITIAL_RATE + (5 * 10));
    }

    @Test
    public void testFailedRequestsDecreaseExponentially() {
        AIMDRateLimitingStrategy aIMDRateLimitingStrategy = new AIMDRateLimitingStrategy(10, DECREASE_RATE, THRESHOLD, INITIAL_RATE);
        int rateLimit = aIMDRateLimitingStrategy.getRateLimit();
        for (int i = 1; i < 5; i++) {
            aIMDRateLimitingStrategy.scaleDown();
            Assertions.assertThat(aIMDRateLimitingStrategy.getRateLimit()).isEqualTo(decreaseRateWithFactor(rateLimit, DECREASE_RATE));
            rateLimit = decreaseRateWithFactor(rateLimit, DECREASE_RATE);
        }
    }

    @Test
    public void testIncreaseRateNeverExceedsThreshold() {
        AIMDRateLimitingStrategy aIMDRateLimitingStrategy = new AIMDRateLimitingStrategy(10, DECREASE_RATE, THRESHOLD, INITIAL_RATE);
        updateStrategyWithAcks(aIMDRateLimitingStrategy, 11);
        Assertions.assertThat(aIMDRateLimitingStrategy.getRateLimit()).isEqualTo(THRESHOLD);
    }

    @Test
    public void testFailureOnInitialBiggerThanThreshold() {
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            new AIMDRateLimitingStrategy(10, DECREASE_RATE, 10, 11);
        }).withMessageContaining("Initial rate must not exceed threshold");
    }

    @Test
    public void testFailureOnInvalidDecreaseFactor() {
        double d = 1.5d;
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            new AIMDRateLimitingStrategy(10, d, THRESHOLD, 11);
        }).withMessageContaining("Decrease factor must be between 0.0 and 1.0");
    }

    @Test
    public void testFailureOnInvalidIncreaseRate() {
        int i = -15;
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            new AIMDRateLimitingStrategy(i, DECREASE_RATE, THRESHOLD, 11);
        }).withMessageContaining("Increase rate must be positive integer.");
    }

    private void updateStrategyWithAcks(AIMDRateLimitingStrategy aIMDRateLimitingStrategy, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            aIMDRateLimitingStrategy.scaleUp();
        }
    }

    private int decreaseRateWithFactor(int i, double d) {
        return Math.max((int) Math.round(i * d), 1);
    }
}
