package org.apache.hadoop.hdds.server.events;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/hadoop/hdds/server/events/TestEventQueue.class */
public class TestEventQueue {
    private static final Event<Long> EVENT1 = new TypedEvent(Long.class, "SCM_EVENT1");
    private static final Event<Long> EVENT2 = new TypedEvent(Long.class, "SCM_EVENT2");
    private EventQueue queue;
    private AtomicLong eventTotal = new AtomicLong();

    /* loaded from: input_file:org/apache/hadoop/hdds/server/events/TestEventQueue$TestHandler.class */
    public class TestHandler implements EventHandler {
        public TestHandler() {
        }

        public void onMessage(Object obj, EventPublisher eventPublisher) {
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            TestEventQueue.this.eventTotal.getAndAdd(((Long) obj).longValue());
        }
    }

    @BeforeEach
    public void startEventQueue() {
        DefaultMetricsSystem.initialize(getClass().getSimpleName());
        this.queue = new EventQueue();
    }

    @AfterEach
    public void stopEventQueue() {
        DefaultMetricsSystem.shutdown();
        this.queue.close();
    }

    @Test
    public void simpleEvent() {
        long[] jArr = new long[2];
        this.queue.addHandler(EVENT1, (l, eventPublisher) -> {
            jArr[0] = l.longValue();
        });
        this.queue.fireEvent(EVENT1, 11L);
        this.queue.processAll(1000L);
        Assertions.assertEquals(11L, jArr[0]);
    }

    @Test
    public void simpleEventWithFixedThreadPoolExecutor() throws Exception {
        TestHandler testHandler = new TestHandler();
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ArrayList arrayList = new ArrayList();
        arrayList.add(linkedBlockingQueue);
        this.queue.addHandler(EVENT1, new FixedThreadPoolWithAffinityExecutor(EventQueue.getExecutorName(EVENT1, testHandler), testHandler, arrayList, this.queue, Long.class, FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(arrayList), new ConcurrentHashMap()), testHandler);
        this.queue.fireEvent(EVENT1, 11L);
        this.queue.fireEvent(EVENT1, 11L);
        this.queue.fireEvent(EVENT1, 12L);
        this.queue.fireEvent(EVENT1, 13L);
        this.queue.fireEvent(EVENT1, 14L);
        this.queue.fireEvent(EVENT1, 15L);
        this.queue.fireEvent(EVENT1, 16L);
        this.queue.fireEvent(EVENT1, 17L);
        this.queue.fireEvent(EVENT1, 18L);
        this.queue.fireEvent(EVENT1, 19L);
        this.queue.fireEvent(EVENT1, 20L);
        EventExecutor eventExecutor = (EventExecutor) this.queue.getExecutorAndHandler(EVENT1).keySet().iterator().next();
        Assertions.assertEquals(11L, eventExecutor.queuedEvents());
        Thread.currentThread();
        Thread.sleep(500L);
        Assertions.assertTrue(eventExecutor.scheduledEvents() >= 1 && eventExecutor.scheduledEvents() <= 10);
        this.queue.processAll(60000L);
        Assertions.assertEquals(11L, eventExecutor.scheduledEvents());
        Assertions.assertEquals(166, this.eventTotal.intValue());
        Assertions.assertEquals(11L, eventExecutor.successfulEvents());
        this.eventTotal.set(0L);
        eventExecutor.close();
    }

    @Test
    public void multipleSubscriber() {
        long[] jArr = new long[2];
        this.queue.addHandler(EVENT2, (l, eventPublisher) -> {
            jArr[0] = l.longValue();
        });
        this.queue.addHandler(EVENT2, (l2, eventPublisher2) -> {
            jArr[1] = l2.longValue();
        });
        this.queue.fireEvent(EVENT2, 23L);
        this.queue.processAll(1000L);
        Assertions.assertEquals(23L, jArr[0]);
        Assertions.assertEquals(23L, jArr[1]);
    }
}
