package io.camunda.zeebe.broker.transport.backpressure;

import com.netflix.concurrency.limits.limit.SettableLimit;
import io.camunda.zeebe.protocol.record.intent.CommandDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:io/camunda/zeebe/broker/transport/backpressure/CommandRateLimiterTest.class */
/**
 * Tests for {@link CommandRateLimiter}: acquiring up to a fixed limit, releasing requests via
 * {@code onResponse}/{@code onIgnore}, the intents that are still accepted once the limit is
 * reached, and the in-flight count under concurrent load.
 */
class CommandRateLimiterTest {
    private static final int INITIAL_LIMIT = 5;

    private final SettableLimit limit = new SettableLimit(INITIAL_LIMIT);
    private final CommandRateLimiter rateLimiter =
            CommandRateLimiter.builder().limit(this.limit).build(0);

    /** Intent used for "normal" commands throughout these tests. */
    private final Intent context = ProcessInstanceCreationIntent.CREATE;

    /** A single request must be accepted while nothing else is in flight. */
    @Test
    void shouldAcquire() {
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 1L, this.context)).isTrue();
    }

    /** Once {@code limit} requests were accepted, the next one must be rejected. */
    @Test
    void shouldNotAcquireAfterLimit() {
        // given - the limiter is filled up to its limit
        IntStream.range(0, this.limit.getLimit())
                .forEach(
                        i ->
                                Assertions.assertThat(
                                                this.rateLimiter.tryAcquire(0, 1L, this.context))
                                        .isTrue());

        // then
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 1L, this.context)).isFalse();
    }

    /** A response for one in-flight request must make room for exactly one new request. */
    @Test
    void shouldCompleteRequestOnResponse() {
        // given - request ids 0..limit-1 are in flight
        IntStream.range(0, this.limit.getLimit())
                .forEach(i -> this.rateLimiter.tryAcquire(0, i, this.context));
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 100L, this.context)).isFalse();

        // when
        this.rateLimiter.onResponse(0, 0L);

        // then
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 100L, this.context)).isTrue();
    }

    /** After all in-flight requests got a response, {@code limit} new requests fit again. */
    @Test
    void shouldCompleteAllRequests() {
        // given - request ids 0..limit-1 are in flight
        IntStream.range(0, this.limit.getLimit())
                .forEach(i -> this.rateLimiter.tryAcquire(0, i, this.context));
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 100L, this.context)).isFalse();

        // when
        IntStream.range(0, this.limit.getLimit())
                .forEach(i -> this.rateLimiter.onResponse(0, i));

        // then
        IntStream.range(0, this.limit.getLimit())
                .forEach(
                        i ->
                                Assertions.assertThat(
                                                this.rateLimiter.tryAcquire(0, i, this.context))
                                        .isTrue());
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 100L, this.context)).isFalse();
    }

    /** Ignoring an acquired request must remove it from the in-flight count. */
    @Test
    void shouldReleaseRequestOnIgnore() {
        // given
        this.rateLimiter.tryAcquire(0, 1L, this.context);
        Assertions.assertThat(this.rateLimiter.getInflightCount()).isEqualTo(1);

        // when
        this.rateLimiter.onIgnore(0, 1L);

        // then
        Assertions.assertThat(this.rateLimiter.getInflightCount()).isEqualTo(0);
    }

    /**
     * Every intent supplied by {@link #provideWhitelistedIntents()} must still be accepted after
     * the limit was reached with normal commands.
     *
     * @param intent the whitelisted intent under test
     */
    @ParameterizedTest
    @MethodSource("provideWhitelistedIntents")
    void shouldWhiteListedCommandAfterLimit(Intent intent) {
        // given - the limit is exhausted by normal commands
        IntStream.range(0, this.limit.getLimit())
                .forEach(
                        i ->
                                Assertions.assertThat(
                                                this.rateLimiter.tryAcquire(0, 1L, this.context))
                                        .isTrue());
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 1L, this.context)).isFalse();

        // then
        Assertions.assertThat(this.rateLimiter.tryAcquire(0, 1L, intent)).isTrue();
    }

    /**
     * Supplies the intents that {@link #shouldWhiteListedCommandAfterLimit(Intent)} expects to
     * bypass the limit.
     *
     * @return one {@link Arguments} per intent
     */
    private static Stream<Arguments> provideWhitelistedIntents() {
        return Stream.of(
                Arguments.of(JobIntent.COMPLETE),
                Arguments.of(JobIntent.FAIL),
                Arguments.of(ProcessInstanceIntent.CANCEL),
                Arguments.of(DeploymentIntent.CREATE),
                Arguments.of(DeploymentIntent.DISTRIBUTE),
                Arguments.of(DeploymentDistributionIntent.COMPLETE),
                Arguments.of(CommandDistributionIntent.ACKNOWLEDGE));
    }

    /**
     * Hammers a limiter with normal commands from many threads and checks that it ends with no
     * request in flight.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the tasks
     */
    @Test
    void shouldNotAllowInFlightHigherThanLimitWithNormalCommands() throws InterruptedException {
        // given
        final int limit = 100;
        final CommandRateLimiter limiter =
                CommandRateLimiter.builder().limit(new SettableLimit(limit)).build(0);
        final ExecutorService executor = Executors.newFixedThreadPool(limit * 3);

        final List<Callable<Void>> tasks =
                IntStream.range(0, limit * 30)
                        .mapToObj(
                                i ->
                                        (Callable<Void>)
                                                () -> {
                                                    final int sleepMillis =
                                                            ThreadLocalRandom.current()
                                                                    .nextInt(100);
                                                    limiter.tryAcquire(0, i, this.context);
                                                    Thread.sleep(sleepMillis);
                                                    Assertions.assertThat(
                                                                    limiter.getInflightCount())
                                                            .isLessThanOrEqualTo(limit);
                                                    limiter.onResponse(0, i);
                                                    return null;
                                                })
                        .collect(Collectors.toList());

        // when - invokeAll returns only after every task has completed
        // NOTE(review): exceptions thrown inside the tasks (including the assertion above) end up
        // in the returned futures, which are not inspected here - consider calling Future.get()
        // on each of them so a violated bound actually fails the test.
        try {
            executor.invokeAll(tasks);
        } finally {
            executor.shutdown();
        }

        // then
        Assertions.assertThat(limiter.getInflightCount()).isEqualTo(0);
    }

    /**
     * Same load pattern as the normal-command test, but every second request uses a whitelisted
     * intent; checks that the limiter ends with no request in flight.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the tasks
     */
    @Test
    void shouldAllowInFlightHigherThanLimitWithWhitelistedCommands() throws InterruptedException {
        // given
        final int limit = 100;
        final CommandRateLimiter limiter =
                CommandRateLimiter.builder().limit(new SettableLimit(limit)).build(0);
        final ExecutorService executor = Executors.newFixedThreadPool(limit * 3);
        final Intent whitelistedIntent = JobIntent.COMPLETE;

        final List<Callable<Void>> tasks =
                IntStream.range(0, limit * 30)
                        .mapToObj(
                                i ->
                                        (Callable<Void>)
                                                () -> {
                                                    final int sleepMillis =
                                                            ThreadLocalRandom.current()
                                                                    .nextInt(100);
                                                    // even ids are normal commands, odd ids are
                                                    // whitelisted ones
                                                    final Intent intent =
                                                            i % 2 == 0
                                                                    ? this.context
                                                                    : whitelistedIntent;
                                                    limiter.tryAcquire(0, i, intent);
                                                    Thread.sleep(sleepMillis);
                                                    Assertions.assertThat(
                                                                    limiter.getInflightCount())
                                                            .isLessThanOrEqualTo(limit * 2);
                                                    limiter.onResponse(0, i);
                                                    return null;
                                                })
                        .collect(Collectors.toList());

        // when - invokeAll returns only after every task has completed
        // NOTE(review): exceptions thrown inside the tasks (including the assertion above) end up
        // in the returned futures, which are not inspected here - consider calling Future.get()
        // on each of them so a violated bound actually fails the test.
        try {
            executor.invokeAll(tasks);
        } finally {
            executor.shutdown();
        }

        // then
        Assertions.assertThat(limiter.getInflightCount()).isEqualTo(0);
    }
}
