package io.camunda.zeebe.logstreams.impl.flowcontrol;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.SettableLimit;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.Mockito;

@Execution(ExecutionMode.CONCURRENT)
/* loaded from: input_file:io/camunda/zeebe/logstreams/impl/flowcontrol/AppenderFlowControlTest.class */
final class AppenderFlowControlTest {
    AppenderFlowControlTest() {
    }

    @Test
    void callsErrorHandlerOnWriteError() {
        AppendErrorHandler appendErrorHandler = (AppendErrorHandler) Mockito.mock(AppendErrorHandler.class);
        AppenderFlowControl appenderFlowControl = new AppenderFlowControl(appendErrorHandler, new AppenderMetrics(new SimpleMeterRegistry()));
        RuntimeException runtimeException = new RuntimeException();
        ((InFlightAppend) appenderFlowControl.tryAcquire().orElseThrow()).onWriteError(runtimeException);
        ((AppendErrorHandler) Mockito.verify(appendErrorHandler)).onWriteError(runtimeException);
    }

    @Test
    void callsErrorHandlerOnCommitError() {
        AppendErrorHandler appendErrorHandler = (AppendErrorHandler) Mockito.mock(AppendErrorHandler.class);
        AppenderFlowControl appenderFlowControl = new AppenderFlowControl(appendErrorHandler, new AppenderMetrics(new SimpleMeterRegistry()));
        RuntimeException runtimeException = new RuntimeException();
        ((InFlightAppend) appenderFlowControl.tryAcquire().orElseThrow()).onCommitError(0L, runtimeException);
        ((AppendErrorHandler) Mockito.verify(appendErrorHandler)).onCommitError(runtimeException);
    }

    @Test
    void eventuallyRejects() {
        AppenderFlowControl appenderFlowControl = new AppenderFlowControl((AppendErrorHandler) Mockito.mock(AppendErrorHandler.class), new AppenderMetrics(new SimpleMeterRegistry()));
        Awaitility.await("Rejects new appends").pollInSameThread().pollInterval(Duration.ZERO).until(() -> {
            return Boolean.valueOf(appenderFlowControl.tryAcquire().isEmpty());
        });
    }

    @Test
    void recoversWhenCompletingAppends() {
        AppenderFlowControl appenderFlowControl = new AppenderFlowControl((AppendErrorHandler) Mockito.mock(AppendErrorHandler.class), new AppenderMetrics(new SimpleMeterRegistry()));
        boolean z = false;
        LinkedList linkedList = new LinkedList();
        do {
            Optional tryAcquire = appenderFlowControl.tryAcquire();
            if (tryAcquire.isEmpty()) {
                z = true;
            } else {
                linkedList.push((InFlightAppend) tryAcquire.get());
            }
        } while (!z);
        linkedList.forEach(inFlightAppend -> {
            inFlightAppend.onCommit(1L);
        });
        Awaitility.await("Eventually accepts appends again").until(() -> {
            return Boolean.valueOf(appenderFlowControl.tryAcquire().isPresent());
        });
    }

    @Test
    void shouldNotAllowInFlightHigherThanLimit() throws InterruptedException {
        SettableLimit settableLimit = new SettableLimit(100);
        AppenderMetrics appenderMetrics = new AppenderMetrics(new SimpleMeterRegistry());
        appenderMetrics.setInflightLimit(100L);
        AppendLimiter build = AppendLimiter.builder().limit(settableLimit).metrics(appenderMetrics).build();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(300);
        Optional[] optionalArr = new Optional[3000];
        try {
            newFixedThreadPool.invokeAll((Collection) IntStream.range(0, 3000).mapToObj(i -> {
                return () -> {
                    int nextInt = ThreadLocalRandom.current().nextInt(100);
                    optionalArr[i] = build.acquire((Void) null);
                    Thread.sleep(nextInt);
                    Assertions.assertThat(build.getInflight()).isLessThanOrEqualTo(100);
                    ((Limiter.Listener) optionalArr[i].get()).onSuccess();
                    return null;
                };
            }).collect(Collectors.toList()));
            newFixedThreadPool.shutdown();
            Assertions.assertThat(build.getInflight()).isEqualTo(0);
        } catch (Throwable th) {
            newFixedThreadPool.shutdown();
            throw th;
        }
    }
}
