package org.apache.kafka.coordinator.group.runtime;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.coordinator.group.runtime.EventAccumulator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.class */
public class EventAccumulatorTest {

    /* loaded from: input_file:org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest$MockEvent.class */
    private class MockEvent implements EventAccumulator.Event<Integer> {
        int key;
        int value;

        MockEvent(int i, int i2) {
            this.key = i;
            this.value = i2;
        }

        /* renamed from: key, reason: merged with bridge method [inline-methods] */
        public Integer m42key() {
            return Integer.valueOf(this.key);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            MockEvent mockEvent = (MockEvent) obj;
            return this.key == mockEvent.key && this.value == mockEvent.value;
        }

        public int hashCode() {
            return (31 * this.key) + this.value;
        }

        public String toString() {
            return "MockEvent(key=" + this.key + ", value=" + this.value + ')';
        }
    }

    @Test
    public void testBasicOperations() {
        EventAccumulator eventAccumulator = new EventAccumulator();
        Assertions.assertEquals(0, eventAccumulator.size());
        Assertions.assertNull(eventAccumulator.poll(0L, TimeUnit.MICROSECONDS));
        List asList = Arrays.asList(new MockEvent(1, 0), new MockEvent(1, 1), new MockEvent(1, 2), new MockEvent(2, 0), new MockEvent(2, 1), new MockEvent(2, 3), new MockEvent(3, 0), new MockEvent(3, 1), new MockEvent(3, 2));
        eventAccumulator.getClass();
        asList.forEach((v1) -> {
            r1.add(v1);
        });
        Assertions.assertEquals(9, eventAccumulator.size());
        HashSet hashSet = new HashSet();
        for (int i = 0; i < asList.size(); i++) {
            MockEvent mockEvent = (MockEvent) eventAccumulator.poll(0L, TimeUnit.MICROSECONDS);
            Assertions.assertNotNull(mockEvent);
            hashSet.add(mockEvent);
            Assertions.assertEquals((asList.size() - 1) - i, eventAccumulator.size());
            eventAccumulator.done(mockEvent);
        }
        Assertions.assertNull(eventAccumulator.poll(0L, TimeUnit.MICROSECONDS));
        Assertions.assertEquals(new HashSet(asList), hashSet);
        Assertions.assertEquals(0, eventAccumulator.size());
        eventAccumulator.close();
    }

    @Test
    public void testKeyConcurrentProcessingAndOrdering() {
        EventAccumulator eventAccumulator = new EventAccumulator();
        MockEvent mockEvent = new MockEvent(1, 0);
        MockEvent mockEvent2 = new MockEvent(1, 1);
        MockEvent mockEvent3 = new MockEvent(1, 2);
        eventAccumulator.add(mockEvent);
        eventAccumulator.add(mockEvent2);
        eventAccumulator.add(mockEvent3);
        Assertions.assertEquals(3, eventAccumulator.size());
        MockEvent mockEvent4 = (MockEvent) eventAccumulator.poll(0L, TimeUnit.MICROSECONDS);
        Assertions.assertEquals(mockEvent, mockEvent4);
        Assertions.assertNull(eventAccumulator.poll(0L, TimeUnit.MICROSECONDS));
        eventAccumulator.done(mockEvent4);
        MockEvent mockEvent5 = (MockEvent) eventAccumulator.poll(0L, TimeUnit.MICROSECONDS);
        Assertions.assertEquals(mockEvent2, mockEvent5);
        Assertions.assertNull(eventAccumulator.poll(0L, TimeUnit.MICROSECONDS));
        eventAccumulator.done(mockEvent5);
        MockEvent mockEvent6 = (MockEvent) eventAccumulator.poll(0L, TimeUnit.MICROSECONDS);
        Assertions.assertEquals(mockEvent3, mockEvent6);
        Assertions.assertNull(eventAccumulator.poll(0L, TimeUnit.MICROSECONDS));
        eventAccumulator.done(mockEvent6);
        eventAccumulator.close();
    }

    @Test
    public void testDoneUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException {
        EventAccumulator eventAccumulator = new EventAccumulator();
        MockEvent mockEvent = new MockEvent(1, 0);
        MockEvent mockEvent2 = new MockEvent(1, 1);
        MockEvent mockEvent3 = new MockEvent(1, 2);
        eventAccumulator.getClass();
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(eventAccumulator::poll);
        eventAccumulator.getClass();
        CompletableFuture supplyAsync2 = CompletableFuture.supplyAsync(eventAccumulator::poll);
        eventAccumulator.getClass();
        CompletableFuture supplyAsync3 = CompletableFuture.supplyAsync(eventAccumulator::poll);
        List asList = Arrays.asList(supplyAsync, supplyAsync2, supplyAsync3);
        Assertions.assertFalse(supplyAsync.isDone());
        Assertions.assertFalse(supplyAsync2.isDone());
        Assertions.assertFalse(supplyAsync3.isDone());
        eventAccumulator.add(mockEvent);
        eventAccumulator.add(mockEvent2);
        eventAccumulator.add(mockEvent3);
        Assertions.assertEquals(mockEvent, CompletableFuture.anyOf((CompletableFuture[]) asList.toArray(new CompletableFuture[0])).get(5L, TimeUnit.SECONDS));
        List list = (List) asList.stream().filter(completableFuture -> {
            return !completableFuture.isDone();
        }).collect(Collectors.toList());
        Assertions.assertEquals(2, list.size());
        eventAccumulator.done(mockEvent);
        Assertions.assertEquals(mockEvent2, CompletableFuture.anyOf((CompletableFuture[]) list.toArray(new CompletableFuture[0])).get(5L, TimeUnit.SECONDS));
        List list2 = (List) list.stream().filter(completableFuture2 -> {
            return !completableFuture2.isDone();
        }).collect(Collectors.toList());
        Assertions.assertEquals(1, list2.size());
        eventAccumulator.done(mockEvent2);
        Assertions.assertEquals(mockEvent3, CompletableFuture.anyOf((CompletableFuture[]) list2.toArray(new CompletableFuture[0])).get(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(0, ((List) list2.stream().filter(completableFuture3 -> {
            return !completableFuture3.isDone();
        }).collect(Collectors.toList())).size());
        eventAccumulator.done(mockEvent3);
        Assertions.assertEquals(0, eventAccumulator.size());
        eventAccumulator.close();
    }

    @Test
    public void testCloseUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException {
        EventAccumulator eventAccumulator = new EventAccumulator();
        eventAccumulator.getClass();
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(eventAccumulator::poll);
        eventAccumulator.getClass();
        CompletableFuture supplyAsync2 = CompletableFuture.supplyAsync(eventAccumulator::poll);
        eventAccumulator.getClass();
        CompletableFuture supplyAsync3 = CompletableFuture.supplyAsync(eventAccumulator::poll);
        Assertions.assertFalse(supplyAsync.isDone());
        Assertions.assertFalse(supplyAsync2.isDone());
        Assertions.assertFalse(supplyAsync3.isDone());
        eventAccumulator.close();
        Assertions.assertNull(supplyAsync.get(5L, TimeUnit.SECONDS));
        Assertions.assertNull(supplyAsync2.get(5L, TimeUnit.SECONDS));
        Assertions.assertNull(supplyAsync3.get(5L, TimeUnit.SECONDS));
    }
}
