package org.apache.kafka.coordinator.group.runtime;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(60)
/* Decompiled from org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.class; tests for MultiThreadedEventProcessor. */
public class MultiThreadedEventProcessorTest {

    /**
     * Test-only {@link CoordinatorEvent} that publishes the value produced by a
     * {@link Supplier} through a {@link CompletableFuture}.
     *
     * <p>An event can optionally be created as "blocking": after invoking the supplier,
     * {@link #run()} parks on an internal latch until {@link #release()} is called, which
     * lets a test hold the executing thread at a known point. A second latch records that
     * the supplier has been invoked so tests can wait for it via
     * {@link #awaitExecution(long, TimeUnit)}.
     *
     * <p>NOTE: the decompiler widened this class from {@code private} to {@code public};
     * the wider modifier is kept here so nothing that compiled before stops compiling.
     *
     * @param <T> type of the value produced by the supplier and exposed by the future
     */
    public static class FutureEvent<T> implements CoordinatorEvent {
        /** Key returned by {@link #key()}. */
        private final TopicPartition key;
        /** Completed with the supplier's value by {@link #run()}, or exceptionally by {@link #complete(Throwable)}. */
        private final CompletableFuture<T> future;
        /** Produces the value of this event; invoked once per {@link #run()} call. */
        private final Supplier<T> supplier;
        /** Whether {@link #run()} waits for {@link #release()} before completing the future. */
        private final boolean block;
        /** Opened by {@link #release()}; only awaited when {@link #block} is set. */
        private final CountDownLatch latch;
        /** Opened by {@link #run()} right after the supplier returned. */
        private final CountDownLatch executed;

        /**
         * Creates a non-blocking event.
         *
         * @param key      key reported by {@link #key()}
         * @param supplier computes the value the future is completed with
         */
        FutureEvent(TopicPartition key, Supplier<T> supplier) {
            this(key, supplier, false);
        }

        /**
         * Creates an event.
         *
         * @param key      key reported by {@link #key()}
         * @param supplier computes the value the future is completed with
         * @param block    when {@code true}, {@link #run()} waits for {@link #release()}
         *                 between invoking the supplier and completing the future
         */
        FutureEvent(TopicPartition key, Supplier<T> supplier, boolean block) {
            this.key = key;
            this.future = new CompletableFuture<>();
            this.supplier = supplier;
            this.block = block;
            this.latch = new CountDownLatch(1);
            this.executed = new CountDownLatch(1);
        }

        /**
         * Invokes the supplier, signals that execution happened, optionally waits for
         * {@link #release()}, and finally completes the future with the supplied value.
         */
        public void run() {
            T result = this.supplier.get();
            this.executed.countDown();
            if (this.block) {
                try {
                    this.latch.await();
                } catch (InterruptedException e) {
                    // Do not lose the interruption: restore the flag for the calling
                    // thread and still complete the future below so waiters are not stuck.
                    Thread.currentThread().interrupt();
                }
            }
            this.future.complete(result);
        }

        /**
         * Fails the future with the given error.
         *
         * @param th the error the future is completed exceptionally with
         */
        public void complete(Throwable th) {
            this.future.completeExceptionally(th);
        }

        /**
         * Returns the key this event was created with.
         *
         * <p>The decompiler had renamed this method to {@code m47key} ("renamed from: key"),
         * which left the class without the {@code key()} accessor; the original name is restored.
         *
         * @return the key passed to the constructor
         */
        public TopicPartition key() {
            return this.key;
        }

        /**
         * @return the future that carries this event's outcome
         */
        public CompletableFuture<T> future() {
            return this.future;
        }

        /** Lets a blocking event proceed past its wait in {@link #run()}. */
        public void release() {
            this.latch.countDown();
        }

        /**
         * Waits until the supplier has been invoked by {@link #run()}.
         *
         * @param timeout  maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @return {@code true} if the supplier was invoked before the timeout elapsed
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean awaitExecution(long timeout, TimeUnit timeUnit) throws InterruptedException {
            return this.executed.await(timeout, timeUnit);
        }

        @Override
        public String toString() {
            return "FutureEvent(key=" + this.key + ")";
        }
    }

    /**
     * Smoke test: a processor can be constructed and closed right away without throwing.
     */
    @Test
    public void testCreateAndClose() throws Exception {
        MultiThreadedEventProcessor eventProcessor =
            new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2);
        eventProcessor.close();
    }

    /**
     * Enqueues six non-blocking events spread over three keys and verifies that every
     * one of them completes normally and that the shared counter was incremented once
     * per event.
     *
     * <p>The processor is managed with try-with-resources so it is closed on both the
     * success and the failure path.
     */
    @Test
    public void testEventsAreProcessed() throws Exception {
        try (MultiThreadedEventProcessor eventProcessor =
                 new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2)) {
            // Each event bumps this counter exactly once when it runs.
            AtomicInteger numEventsExecuted = new AtomicInteger(0);

            List<FutureEvent<Integer>> events = Arrays.asList(
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet)
            );

            events.forEach(eventProcessor::enqueue);

            // Wait (bounded) for every event's future to complete.
            CompletableFuture.allOf(events
                .stream()
                .map(FutureEvent::future)
                .toArray(CompletableFuture[]::new)
            ).get(10L, TimeUnit.SECONDS);

            events.forEach(event -> {
                Assertions.assertTrue(event.future.isDone());
                Assertions.assertFalse(event.future.isCompletedExceptionally());
            });

            Assertions.assertEquals(events.size(), numEventsExecuted.get());
        }
    }

    /**
     * Enqueues six blocking events alternating between two keys ({@code foo-0} and
     * {@code foo-1}) and releases them one at a time, asserting the counter value each
     * event observed.
     *
     * <p>The expected values (1 or 2 for the first event of each key, then 3, 4, 5, 6 in
     * the order the test releases them) only hold if a later event for a key does not
     * start before the earlier event for that key has finished. Presumably that is the
     * per-key ordering guarantee of the processor; confirm against
     * {@code MultiThreadedEventProcessor}.
     *
     * <p>The processor is managed with try-with-resources so it is closed on both the
     * success and the failure path.
     */
    @Test
    public void testProcessingGuarantees() throws Exception {
        try (MultiThreadedEventProcessor eventProcessor =
                 new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2)) {
            AtomicInteger numEventsExecuted = new AtomicInteger(0);

            // Even indexes use key foo-0, odd indexes use key foo-1; all are blocking.
            List<FutureEvent<Integer>> events = Arrays.asList(
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true),
                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet, true),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true),
                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet, true),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true),
                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet, true)
            );

            events.forEach(eventProcessor::enqueue);

            // Events 0 and 1 (different keys) must both have started; which one
            // incremented the counter first is not deterministic.
            Assertions.assertTrue(events.get(0).awaitExecution(5L, TimeUnit.SECONDS));
            Assertions.assertTrue(events.get(1).awaitExecution(5L, TimeUnit.SECONDS));

            // Finish event 0; it saw either 1 or 2.
            events.get(0).release();
            int firstResult = events.get(0).future.get(5L, TimeUnit.SECONDS);
            Assertions.assertTrue(firstResult == 1 || firstResult == 2, "Expected 1 or 2 but was " + firstResult);

            // Event 2 (foo-0) is expected to run next and observe 3.
            Assertions.assertTrue(events.get(2).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(2).release();
            Assertions.assertEquals(3, events.get(2).future.get(5L, TimeUnit.SECONDS));

            // Event 4 (foo-0) is expected to start while event 1 (foo-1) is still held.
            Assertions.assertTrue(events.get(4).awaitExecution(5L, TimeUnit.SECONDS));

            // Finish event 1; it saw either 1 or 2.
            events.get(1).release();
            int secondResult = events.get(1).future.get(5L, TimeUnit.SECONDS);
            Assertions.assertTrue(secondResult == 1 || secondResult == 2, "Expected 1 or 2 but was " + secondResult);

            // Event 3 (foo-1) starts once event 1 is done.
            Assertions.assertTrue(events.get(3).awaitExecution(5L, TimeUnit.SECONDS));

            // Event 4 incremented before event 3 did, hence 4 and then 5.
            events.get(4).release();
            Assertions.assertEquals(4, events.get(4).future.get(5L, TimeUnit.SECONDS));

            events.get(3).release();
            Assertions.assertEquals(5, events.get(3).future.get(5L, TimeUnit.SECONDS));

            // Event 5 (foo-1) is the last one and observes 6.
            Assertions.assertTrue(events.get(5).awaitExecution(5L, TimeUnit.SECONDS));
            events.get(5).release();
            Assertions.assertEquals(6, events.get(5).future.get(5L, TimeUnit.SECONDS));

            events.forEach(event -> {
                Assertions.assertTrue(event.future.isDone());
                Assertions.assertFalse(event.future.isCompletedExceptionally());
            });

            Assertions.assertEquals(events.size(), numEventsExecuted.get());
        }
    }

    /**
     * Verifies that enqueueing an event on an already closed processor throws
     * {@link RejectedExecutionException}.
     */
    @Test
    public void testEventsAreRejectedWhenClosed() throws Exception {
        MultiThreadedEventProcessor eventProcessor =
            new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 2);
        eventProcessor.close();

        Assertions.assertThrows(
            RejectedExecutionException.class,
            () -> eventProcessor.enqueue(new FutureEvent<>(new TopicPartition("foo", 0), () -> 0))
        );
    }

    /**
     * Verifies what happens to pending events when shutdown begins while an event is
     * still executing.
     *
     * <p>Scenario: a blocking event is enqueued and the test waits until it has run its
     * supplier; six more events with the same key are enqueued behind it and must not be
     * done yet. After {@code beginShutdown()}:
     * <ul>
     *   <li>new enqueue attempts throw {@link RejectedExecutionException};</li>
     *   <li>the in-flight blocking event, once released, completes normally;</li>
     *   <li>each queued event's future fails with an {@link ExecutionException} whose
     *       cause is a {@link RejectedExecutionException};</li>
     *   <li>the counter stays at 1, i.e. none of the queued events ran.</li>
     * </ul>
     *
     * <p>The processor is created with {@code 1} as its third constructor argument
     * (presumably the thread count; confirm against {@code MultiThreadedEventProcessor})
     * and is managed with try-with-resources so it is closed on every path.
     */
    @Test
    public void testEventsAreDrainedWhenClosed() throws Exception {
        try (MultiThreadedEventProcessor eventProcessor =
                 new MultiThreadedEventProcessor(new LogContext(), "event-processor-", 1)) {
            AtomicInteger numEventsExecuted = new AtomicInteger(0);

            // Held inside run() until release() is called further down.
            FutureEvent<Integer> blockingEvent = new FutureEvent<>(
                new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet, true);

            List<FutureEvent<Integer>> events = Arrays.asList(
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet)
            );

            // Start the blocking event and wait until its supplier has bumped the counter.
            eventProcessor.enqueue(blockingEvent);
            TestUtils.waitForCondition(() -> numEventsExecuted.get() > 0, "Blocking event not executed.");

            // Queue the remaining events behind it; none may be done yet.
            events.forEach(eventProcessor::enqueue);
            events.forEach(event -> Assertions.assertFalse(event.future.isDone()));

            // Initiate the shutdown.
            eventProcessor.beginShutdown();

            // Enqueuing a new event is rejected from now on.
            Assertions.assertThrows(
                RejectedExecutionException.class,
                () -> eventProcessor.enqueue(blockingEvent)
            );

            // The in-flight event is allowed to finish normally.
            // The wait is 15000 milliseconds; the decompiled code paired 15000 with SECONDS.
            blockingEvent.release();
            blockingEvent.future.get(15000L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue(blockingEvent.future.isDone());
            Assertions.assertFalse(blockingEvent.future.isCompletedExceptionally());

            // Every queued event must have been failed with a RejectedExecutionException.
            // The decompiled lambda body was empty (so assertThrows could never pass);
            // reading the future is the call that surfaces the ExecutionException.
            events.forEach(event -> {
                ExecutionException failure = Assertions.assertThrows(
                    ExecutionException.class,
                    () -> event.future.get(15000L, TimeUnit.MILLISECONDS)
                );
                Assertions.assertEquals(RejectedExecutionException.class, failure.getCause().getClass());
            });

            // Only the blocking event ever executed.
            Assertions.assertEquals(1, numEventsExecuted.get());
        }
    }
}
