package org.apache.hugegraph.computer.core.sender;

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Whitebox;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/sender/MessageQueueTest.class */
public class MessageQueueTest {
    @Test
    public void testPutAndTake() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Objects.requireNonNull(atomicInteger);
        MessageQueue messageQueue = new MessageQueue(atomicInteger::incrementAndGet);
        QueuedMessage queuedMessage = new QueuedMessage(1, MessageType.VERTEX, ByteBuffer.allocate(4));
        messageQueue.put(queuedMessage);
        Assert.assertEquals(1L, atomicInteger.get());
        messageQueue.put(new QueuedMessage(2, MessageType.EDGE, ByteBuffer.allocate(4)));
        Assert.assertEquals(2L, atomicInteger.get());
        BlockingQueue blockingQueue = (BlockingQueue) Whitebox.getInternalState(messageQueue, "queue");
        Assert.assertEquals(2L, blockingQueue.size());
        Assert.assertEquals(queuedMessage.partitionId(), messageQueue.peek().partitionId());
        Assert.assertEquals(2L, blockingQueue.size());
        Assert.assertEquals(queuedMessage.partitionId(), messageQueue.take().partitionId());
        Assert.assertEquals(1L, blockingQueue.size());
        Assert.assertEquals(r0.partitionId(), messageQueue.take().partitionId());
        Assert.assertEquals(0L, blockingQueue.size());
        messageQueue.put(queuedMessage);
        Assert.assertEquals(3L, atomicInteger.get());
    }
}
