/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.zeebe.logstreams.impl.flowcontrol;

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.SettableLimit;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppendErrorHandler;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppendLimiter;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderFlowControl;
import io.camunda.zeebe.logstreams.impl.flowcontrol.AppenderMetrics;
import io.camunda.zeebe.logstreams.impl.flowcontrol.InFlightAppend;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.Mockito;

@Execution(value=ExecutionMode.CONCURRENT)
final class AppenderFlowControlTest {
    AppenderFlowControlTest() {
    }

    @Test
    void callsErrorHandlerOnWriteError() {
        AppendErrorHandler errorHandler = (AppendErrorHandler)Mockito.mock(AppendErrorHandler.class);
        AppenderFlowControl flow = new AppenderFlowControl(errorHandler, new AppenderMetrics((MeterRegistry)new SimpleMeterRegistry()));
        RuntimeException error = new RuntimeException();
        InFlightAppend inFlight = (InFlightAppend)flow.tryAcquire().orElseThrow();
        inFlight.onWriteError((Throwable)error);
        ((AppendErrorHandler)Mockito.verify((Object)errorHandler)).onWriteError((Throwable)error);
    }

    @Test
    void callsErrorHandlerOnCommitError() {
        AppendErrorHandler errorHandler = (AppendErrorHandler)Mockito.mock(AppendErrorHandler.class);
        AppenderFlowControl flow = new AppenderFlowControl(errorHandler, new AppenderMetrics((MeterRegistry)new SimpleMeterRegistry()));
        RuntimeException error = new RuntimeException();
        InFlightAppend inFlight = (InFlightAppend)flow.tryAcquire().orElseThrow();
        inFlight.onCommitError(0L, (Throwable)error);
        ((AppendErrorHandler)Mockito.verify((Object)errorHandler)).onCommitError((Throwable)error);
    }

    @Test
    void eventuallyRejects() {
        AppendErrorHandler errorHandler = (AppendErrorHandler)Mockito.mock(AppendErrorHandler.class);
        AppenderFlowControl flow = new AppenderFlowControl(errorHandler, new AppenderMetrics((MeterRegistry)new SimpleMeterRegistry()));
        Awaitility.await((String)"Rejects new appends").pollInSameThread().pollInterval(Duration.ZERO).until(() -> flow.tryAcquire().isEmpty());
    }

    @Test
    void recoversWhenCompletingAppends() {
        AppendErrorHandler errorHandler = (AppendErrorHandler)Mockito.mock(AppendErrorHandler.class);
        AppenderFlowControl flow = new AppenderFlowControl(errorHandler, new AppenderMetrics((MeterRegistry)new SimpleMeterRegistry()));
        boolean rejecting = false;
        LinkedList<InFlightAppend> inFlight = new LinkedList<InFlightAppend>();
        do {
            Optional result;
            if ((result = flow.tryAcquire()).isEmpty()) {
                rejecting = true;
                continue;
            }
            inFlight.push((InFlightAppend)result.get());
        } while (!rejecting);
        inFlight.forEach(append -> append.onCommit(1L));
        Awaitility.await((String)"Eventually accepts appends again").until(() -> flow.tryAcquire().isPresent());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void shouldNotAllowInFlightHigherThanLimit() throws InterruptedException {
        int numThreads = 3000;
        int poolSize = 300;
        int limit = 100;
        SettableLimit myLimit = new SettableLimit(100);
        AppenderMetrics appenderMetrics = new AppenderMetrics((MeterRegistry)new SimpleMeterRegistry());
        appenderMetrics.setInflightLimit(100L);
        AppendLimiter myRateLimiter = ((AppendLimiter.AppenderLimiterBuilder)AppendLimiter.builder().limit((Limit)myLimit)).metrics(appenderMetrics).build();
        ExecutorService threadPool = Executors.newFixedThreadPool(300);
        Optional[] listeners = new Optional[3000];
        Collection tasks = IntStream.range(0, 3000).mapToObj(i -> () -> {
            int sleepTime = ThreadLocalRandom.current().nextInt(100);
            listeners[i] = myRateLimiter.acquire(null);
            Thread.sleep(sleepTime);
            Assertions.assertThat((int)myRateLimiter.getInflight()).isLessThanOrEqualTo(100);
            ((Limiter.Listener)listeners[i].get()).onSuccess();
            return null;
        }).collect(Collectors.toList());
        try {
            threadPool.invokeAll(tasks);
        }
        finally {
            threadPool.shutdown();
        }
        Assertions.assertThat((int)myRateLimiter.getInflight()).isEqualTo(0);
    }
}

