package org.apache.hugegraph.computer.core.receiver;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.testutil.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/receiver/MessageRecvBuffersTest.class */
/**
 * Tests for {@link MessageRecvBuffers}: adding buffers up to the size
 * threshold, and the {@code waitSorted()} / {@code signalSorted()} /
 * {@code prepareSort()} hand-off, including the timeout and interrupt paths.
 */
public class MessageRecvBuffersTest {
    /** Max wait time (the tests expect it to be reported in ms) passed to every buffers instance. */
    private static final long WAIT_TIMEOUT = 100;

    /** Size threshold passed to every buffers instance under test. */
    private static final long THRESHOLD = 1024L;

    /** Size in bytes of each mock buffer added by the tests. */
    private static final int BUFFER_SIZE = 100;

    /**
     * Upper bound for latch waits, so that a regression fails the test
     * instead of hanging the whole test run.
     */
    private static final long LATCH_TIMEOUT_SECONDS = 10L;

    /** Message fragment expected when {@code waitSorted()} times out. */
    private static final String TIMEOUT_MESSAGE =
            "Buffers have not been sorted in " + WAIT_TIMEOUT + " ms";

    /** Message fragment expected when {@code waitSorted()} is interrupted. */
    private static final String INTERRUPTED_MESSAGE =
            "Interrupted while waiting buffers to be sorted";

    /**
     * Walks through two fill cycles: add buffers below the threshold, check
     * that waiting times out, cross the threshold, signal sorted, then
     * {@code prepareSort()} and repeat.
     */
    @Test
    public void testBufferToBuffers() {
        MessageRecvBuffers buffers = new MessageRecvBuffers(THRESHOLD, WAIT_TIMEOUT);
        // A fresh instance must not block or throw here.
        buffers.waitSorted();

        addMockBuffers(buffers, 10);
        Assert.assertFalse(buffers.full());
        Assert.assertEquals(1000L, buffers.totalBytes());
        assertWaitSortedTimesOut(buffers);

        // The 11th buffer pushes the total (1100) past the threshold (1024).
        addMockBufferToBuffers(buffers, BUFFER_SIZE);
        Assert.assertTrue(buffers.full());
        Assert.assertEquals(11L, buffers.buffers().size());

        // Signalling must not drop the buffered data.
        buffers.signalSorted();
        Assert.assertEquals(11L, buffers.buffers().size());
        Assert.assertEquals(1100L, buffers.totalBytes());

        // Once signalled, repeated waits return without throwing.
        buffers.waitSorted();
        buffers.waitSorted();

        buffers.prepareSort();
        Assert.assertEquals(0L, buffers.buffers().size());
        Assert.assertEquals(0L, buffers.totalBytes());
        buffers.waitSorted();

        // Second cycle behaves like the first one.
        addMockBuffers(buffers, 10);
        Assert.assertEquals(1000L, buffers.totalBytes());
        Assert.assertFalse(buffers.full());
        assertWaitSortedTimesOut(buffers);

        addMockBufferToBuffers(buffers, BUFFER_SIZE);
        Assert.assertTrue(buffers.full());
        Assert.assertEquals(11L, buffers.buffers().size());
    }

    /**
     * Runs {@code waitSorted()} and {@code signalSorted()} on two pool
     * threads and requires both to complete.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the workers
     */
    @Test
    public void testSortBuffer() throws InterruptedException {
        MessageRecvBuffers buffers = new MessageRecvBuffers(THRESHOLD, WAIT_TIMEOUT);
        addMockBuffers(buffers, 10);

        CountDownLatch finished = new CountDownLatch(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.submit(() -> {
                buffers.waitSorted();
                finished.countDown();
            });
            executor.submit(() -> {
                buffers.signalSorted();
                finished.countDown();
            });
            // Bounded wait: if waitSorted() throws on the worker the latch
            // never reaches zero, and an unbounded await() would hang.
            Assert.assertTrue(finished.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        } finally {
            // Always release the pool threads, also when the assertion fails.
            executor.shutdownNow();
        }
    }

    /**
     * Checks that {@code waitSorted()} throws a {@link ComputerException}
     * carrying the timeout message when nobody signals.
     */
    @Test
    public void testWaitSortTimeout() {
        MessageRecvBuffers buffers = new MessageRecvBuffers(THRESHOLD, WAIT_TIMEOUT);
        addMockBuffers(buffers, 10);
        assertWaitSortedTimesOut(buffers);
    }

    /**
     * Interrupts a thread that calls {@code waitSorted()} and checks, through
     * the thread's uncaught-exception handler, that the failure message
     * mentions the interruption.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *                              waiting for the handler
     */
    @Test
    public void testSortInterrupt() throws InterruptedException {
        MessageRecvBuffers buffers = new MessageRecvBuffers(THRESHOLD, WAIT_TIMEOUT);
        addMockBuffers(buffers, 10);

        AtomicBoolean messageMatched = new AtomicBoolean(false);
        CountDownLatch handlerDone = new CountDownLatch(1);
        Thread waiter = new Thread(() -> buffers.waitSorted());
        waiter.setUncaughtExceptionHandler((failedThread, cause) -> {
            try {
                Assert.assertContains(INTERRUPTED_MESSAGE, cause.getMessage());
                messageMatched.set(true);
            } finally {
                // Release the test thread whether or not the assertion held;
                // a failed assertion leaves messageMatched false.
                handlerDone.countDown();
            }
        });
        waiter.start();
        waiter.interrupt();

        // Bounded wait: fail instead of hanging if the handler never runs.
        Assert.assertTrue(handlerDone.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        Assert.assertTrue(messageMatched.get());
    }

    /**
     * Creates a zero-filled byte array of {@code i} bytes, hands it to
     * {@code ReceiverUtil.consumeBuffer} and adds the resulting network
     * buffer to {@code messageRecvBuffers}.
     *
     * @param messageRecvBuffers the buffers instance to add to
     * @param i                  the size in bytes of the mock buffer
     */
    public static void addMockBufferToBuffers(MessageRecvBuffers messageRecvBuffers, int i) {
        ReceiverUtil.consumeBuffer(new byte[i], networkBuffer -> {
            messageRecvBuffers.addBuffer(networkBuffer);
        });
    }

    /** Adds {@code count} mock buffers of {@link #BUFFER_SIZE} bytes each. */
    private static void addMockBuffers(MessageRecvBuffers buffers, int count) {
        for (int added = 0; added < count; added++) {
            addMockBufferToBuffers(buffers, BUFFER_SIZE);
        }
    }

    /**
     * Asserts that {@code waitSorted()} throws a {@link ComputerException}
     * whose message contains {@link #TIMEOUT_MESSAGE}.
     */
    private static void assertWaitSortedTimesOut(MessageRecvBuffers buffers) {
        Assert.assertThrows(ComputerException.class, () -> {
            buffers.waitSorted();
        }, e -> {
            Assert.assertContains(TIMEOUT_MESSAGE, e.getMessage());
        });
    }
}
