/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.common.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEvent;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.common.runtime.EventAccumulator;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@Timeout(value=60L)
public class MultiThreadedEventProcessorTest {
    @Test
    public void testCreateAndClose() throws Exception {
        MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2, Time.SYSTEM, (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class));
        eventProcessor.close();
    }

    @Test
    public void testEventsAreProcessed() throws Exception {
        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2, Time.SYSTEM, (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class));){
            AtomicInteger numEventsExecuted = new AtomicInteger(0);
            FutureEvent[] futureEventArray = new FutureEvent[6];
            futureEventArray[0] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[1] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet);
            futureEventArray[2] = new FutureEvent<Integer>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet);
            futureEventArray[3] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[4] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet);
            futureEventArray[5] = new FutureEvent<Integer>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet);
            List<FutureEvent> events = Arrays.asList(futureEventArray);
            events.forEach(arg_0 -> ((CoordinatorEventProcessor)eventProcessor).enqueueLast(arg_0));
            CompletableFuture.allOf((CompletableFuture[])events.stream().map(FutureEvent::future).toArray(CompletableFuture[]::new)).get(10L, TimeUnit.SECONDS);
            events.forEach(event -> {
                Assertions.assertTrue((boolean)event.future.isDone());
                Assertions.assertFalse((boolean)event.future.isCompletedExceptionally());
            });
            Assertions.assertEquals((int)events.size(), (int)numEventsExecuted.get());
        }
    }

    @Test
    public void testProcessingGuarantees() throws Exception {
        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2, Time.SYSTEM, (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class));){
            AtomicInteger numEventsExecuted = new AtomicInteger(0);
            FutureEvent[] futureEventArray = new FutureEvent[6];
            futureEventArray[0] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);
            futureEventArray[1] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet, true);
            futureEventArray[2] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);
            futureEventArray[3] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet, true);
            futureEventArray[4] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);
            futureEventArray[5] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet, true);
            List<FutureEvent> events = Arrays.asList(futureEventArray);
            events.forEach(arg_0 -> ((CoordinatorEventProcessor)eventProcessor).enqueueLast(arg_0));
            Assertions.assertTrue((boolean)events.get(0).awaitExecution(5L, TimeUnit.SECONDS));
            Assertions.assertTrue((boolean)events.get(1).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(0).release();
            int result = (Integer)events.get((int)0).future.get(5L, TimeUnit.SECONDS);
            Assertions.assertTrue((result == 1 || result == 2 ? 1 : 0) != 0, (String)("Expected 1 or 2 but was " + result));
            Assertions.assertTrue((boolean)events.get(2).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(2).release();
            Assertions.assertEquals((int)3, (Integer)((Integer)events.get((int)2).future.get(5L, TimeUnit.SECONDS)));
            Assertions.assertTrue((boolean)events.get(4).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(1).release();
            result = (Integer)events.get((int)1).future.get(5L, TimeUnit.SECONDS);
            Assertions.assertTrue((result == 1 || result == 2 ? 1 : 0) != 0, (String)("Expected 1 or 2 but was " + result));
            Assertions.assertTrue((boolean)events.get(3).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(4).release();
            Assertions.assertEquals((int)4, (Integer)((Integer)events.get((int)4).future.get(5L, TimeUnit.SECONDS)));
            events.get(3).release();
            Assertions.assertEquals((int)5, (Integer)((Integer)events.get((int)3).future.get(5L, TimeUnit.SECONDS)));
            Assertions.assertTrue((boolean)events.get(5).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(5).release();
            Assertions.assertEquals((int)6, (Integer)((Integer)events.get((int)5).future.get(5L, TimeUnit.SECONDS)));
            events.forEach(event -> {
                Assertions.assertTrue((boolean)event.future.isDone());
                Assertions.assertFalse((boolean)event.future.isCompletedExceptionally());
            });
            Assertions.assertEquals((int)events.size(), (int)numEventsExecuted.get());
        }
    }

    @Test
    public void testEventsAreRejectedWhenClosed() throws Exception {
        MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2, Time.SYSTEM, (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class));
        eventProcessor.close();
        Assertions.assertThrows(RejectedExecutionException.class, () -> MultiThreadedEventProcessorTest.lambda$testEventsAreRejectedWhenClosed$4((CoordinatorEventProcessor)eventProcessor));
    }

    @Test
    public void testEventsAreDrainedWhenClosed() throws Exception {
        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 1, Time.SYSTEM, (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class));){
            AtomicInteger numEventsExecuted = new AtomicInteger(0);
            FutureEvent<Integer> blockingEvent = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);
            FutureEvent[] futureEventArray = new FutureEvent[6];
            futureEventArray[0] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[1] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[2] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[3] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[4] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[5] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            List<FutureEvent> events = Arrays.asList(futureEventArray);
            eventProcessor.enqueueLast(blockingEvent);
            TestUtils.waitForCondition(() -> numEventsExecuted.get() > 0, (String)"Blocking event not executed.");
            events.forEach(arg_0 -> ((MultiThreadedEventProcessor)eventProcessor).enqueueLast(arg_0));
            events.forEach(event -> Assertions.assertFalse((boolean)event.future.isDone()));
            eventProcessor.beginShutdown();
            Assertions.assertThrows(RejectedExecutionException.class, () -> eventProcessor.enqueueLast((CoordinatorEvent)blockingEvent));
            blockingEvent.release();
            blockingEvent.future.get(15000L, TimeUnit.SECONDS);
            Assertions.assertTrue((boolean)blockingEvent.future.isDone());
            Assertions.assertFalse((boolean)blockingEvent.future.isCompletedExceptionally());
            events.forEach(event -> {
                Throwable t = Assertions.assertThrows(ExecutionException.class, () -> event.future.get(15000L, TimeUnit.SECONDS));
                Assertions.assertEquals(RejectedExecutionException.class, t.getCause().getClass());
            });
            Assertions.assertEquals((int)1, (int)numEventsExecuted.get());
        }
    }

    @Test
    public void testMetrics() throws Exception {
        CoordinatorRuntimeMetrics mockRuntimeMetrics = (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class);
        MockTime mockTime = new MockTime();
        AtomicInteger numEventsExecuted = new AtomicInteger(0);
        FutureEvent<Integer> blockingEvent = new FutureEvent<Integer>(new TopicPartition("foo", 0), () -> MultiThreadedEventProcessorTest.lambda$testMetrics$10((Time)mockTime, numEventsExecuted), true, mockTime.milliseconds());
        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 1, (Time)mockTime, mockRuntimeMetrics, (EventAccumulator)new DelayEventAccumulator((Time)mockTime, 500L));){
            eventProcessor.enqueueLast(blockingEvent);
            TestUtils.waitForCondition(() -> numEventsExecuted.get() > 0, (String)"Blocking event not executed.");
            FutureEvent<Integer> otherEvent = new FutureEvent<Integer>(new TopicPartition("foo", 0), () -> MultiThreadedEventProcessorTest.lambda$testMetrics$12((Time)mockTime, numEventsExecuted), false, mockTime.milliseconds());
            eventProcessor.enqueueLast(otherEvent);
            mockTime.sleep(3000L);
            Assertions.assertFalse((boolean)otherEvent.future.isDone());
            blockingEvent.release();
            blockingEvent.future.get(15000L, TimeUnit.SECONDS);
            Assertions.assertTrue((boolean)blockingEvent.future.isDone());
            Assertions.assertFalse((boolean)blockingEvent.future.isCompletedExceptionally());
            otherEvent.future.get(15000L, TimeUnit.SECONDS);
            Assertions.assertTrue((boolean)otherEvent.future.isDone());
            Assertions.assertFalse((boolean)otherEvent.future.isCompletedExceptionally());
            Assertions.assertEquals((int)2, (int)numEventsExecuted.get());
            ((CoordinatorRuntimeMetrics)Mockito.verify((Object)mockRuntimeMetrics, (VerificationMode)Mockito.times((int)1))).recordEventQueueTime(500L);
            ((CoordinatorRuntimeMetrics)Mockito.verify((Object)mockRuntimeMetrics, (VerificationMode)Mockito.times((int)1))).recordEventProcessingTime(7000L);
            ((CoordinatorRuntimeMetrics)Mockito.verify((Object)mockRuntimeMetrics, (VerificationMode)Mockito.times((int)2))).recordThreadIdleTime(500.0);
            ((CoordinatorRuntimeMetrics)Mockito.verify((Object)mockRuntimeMetrics, (VerificationMode)Mockito.times((int)1))).recordEventQueueTime(3500L);
        }
    }

    @Test
    public void testRecordThreadIdleRatio() throws Exception {
        CoordinatorRuntimeMetrics mockRuntimeMetrics = (CoordinatorRuntimeMetrics)Mockito.mock(CoordinatorRuntimeMetrics.class);
        MockTime time = new MockTime();
        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 1, (Time)time, mockRuntimeMetrics, (EventAccumulator)new DelayEventAccumulator((Time)time, 100L));){
            ArrayList recordedIdleTimesMs = new ArrayList();
            AtomicInteger numEventsExecuted = new AtomicInteger(0);
            ArgumentCaptor idleTimeCaptured = ArgumentCaptor.forClass(Double.class);
            ((CoordinatorRuntimeMetrics)Mockito.doAnswer(invocation -> {
                double threadIdleTime = (Double)idleTimeCaptured.getValue();
                Assertions.assertEquals((double)100.0, (double)threadIdleTime);
                recordedIdleTimesMs.add(threadIdleTime);
                return null;
            }).when((Object)mockRuntimeMetrics)).recordThreadIdleTime(((Double)idleTimeCaptured.capture()).doubleValue());
            FutureEvent[] futureEventArray = new FutureEvent[8];
            futureEventArray[0] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[1] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet);
            futureEventArray[2] = new FutureEvent<Integer>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet);
            futureEventArray[3] = new FutureEvent<Integer>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet);
            futureEventArray[4] = new FutureEvent<Integer>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet);
            futureEventArray[5] = new FutureEvent<Integer>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet);
            futureEventArray[6] = new FutureEvent<Integer>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet);
            futureEventArray[7] = new FutureEvent<Integer>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet);
            List<FutureEvent> events = Arrays.asList(futureEventArray);
            long startMs = time.milliseconds();
            events.forEach(arg_0 -> ((CoordinatorEventProcessor)eventProcessor).enqueueLast(arg_0));
            CompletableFuture.allOf((CompletableFuture[])events.stream().map(FutureEvent::future).toArray(CompletableFuture[]::new)).get(10L, TimeUnit.SECONDS);
            events.forEach(event -> {
                Assertions.assertTrue((boolean)event.future.isDone());
                Assertions.assertFalse((boolean)event.future.isCompletedExceptionally());
            });
            Assertions.assertEquals((int)events.size(), (int)numEventsExecuted.get());
            ((CoordinatorRuntimeMetrics)Mockito.verify((Object)mockRuntimeMetrics, (VerificationMode)Mockito.times((int)8))).recordThreadIdleTime(ArgumentMatchers.anyDouble());
            Assertions.assertEquals((int)8, (int)recordedIdleTimesMs.size());
            long diff = time.milliseconds() - startMs;
            double sum = recordedIdleTimesMs.stream().mapToDouble(Double::doubleValue).sum();
            double idleRatio = sum / (double)diff;
            Assertions.assertEquals((double)1.0, (double)idleRatio, (String)("idle ratio should be 1.0 but was: " + idleRatio));
        }
    }

    private static /* synthetic */ Integer lambda$testMetrics$12(Time mockTime, AtomicInteger numEventsExecuted) {
        mockTime.sleep(5000L);
        return numEventsExecuted.incrementAndGet();
    }

    private static /* synthetic */ Integer lambda$testMetrics$10(Time mockTime, AtomicInteger numEventsExecuted) {
        mockTime.sleep(4000L);
        return numEventsExecuted.incrementAndGet();
    }

    private static /* synthetic */ void lambda$testEventsAreRejectedWhenClosed$4(CoordinatorEventProcessor eventProcessor) throws Throwable {
        eventProcessor.enqueueLast(new FutureEvent<Integer>(new TopicPartition("foo", 0), () -> 0));
    }

    private static class FutureEvent<T>
    implements CoordinatorEvent {
        private final TopicPartition key;
        private final CompletableFuture<T> future;
        private final Supplier<T> supplier;
        private final boolean block;
        private final CountDownLatch latch;
        private final CountDownLatch executed;
        private final long createdTimeMs;

        FutureEvent(TopicPartition key, Supplier<T> supplier) {
            this(key, supplier, false, 0L);
        }

        FutureEvent(TopicPartition key, Supplier<T> supplier, boolean block) {
            this(key, supplier, block, 0L);
        }

        FutureEvent(TopicPartition key, Supplier<T> supplier, boolean block, long createdTimeMs) {
            this.key = key;
            this.future = new CompletableFuture();
            this.supplier = supplier;
            this.block = block;
            this.latch = new CountDownLatch(1);
            this.executed = new CountDownLatch(1);
            this.createdTimeMs = createdTimeMs;
        }

        public void run() {
            T result = this.supplier.get();
            this.executed.countDown();
            if (this.block) {
                try {
                    this.latch.await();
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            this.future.complete(result);
        }

        public void complete(Throwable ex) {
            this.future.completeExceptionally(ex);
        }

        public long createdTimeMs() {
            return this.createdTimeMs;
        }

        public TopicPartition key() {
            return this.key;
        }

        public CompletableFuture<T> future() {
            return this.future;
        }

        public void release() {
            this.latch.countDown();
        }

        public boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException {
            return this.executed.await(timeout, unit);
        }

        public String toString() {
            return "FutureEvent(key=" + String.valueOf(this.key) + ")";
        }
    }

    private static class DelayEventAccumulator
    extends EventAccumulator<TopicPartition, CoordinatorEvent> {
        private final Time time;
        private final long takeDelayMs;

        public DelayEventAccumulator(Time time, long takeDelayMs) {
            this.time = time;
            this.takeDelayMs = takeDelayMs;
        }

        public CoordinatorEvent poll(long timeout, TimeUnit unit) {
            CoordinatorEvent event = (CoordinatorEvent)super.poll(timeout, unit);
            this.time.sleep(this.takeDelayMs);
            return event;
        }
    }
}

