package org.apache.hugegraph.computer.core.sender;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.graph.GraphFactory;
import org.apache.hugegraph.computer.core.graph.id.BytesId;
import org.apache.hugegraph.computer.core.graph.properties.Properties;
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;
import org.apache.hugegraph.computer.core.graph.value.IntValue;
import org.apache.hugegraph.computer.core.graph.value.ListValue;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.computer.core.graph.value.ValueType;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.io.RandomAccessInput;
import org.apache.hugegraph.computer.core.io.StreamGraphInput;
import org.apache.hugegraph.computer.core.store.entry.EntryInputImpl;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.testutil.Whitebox;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/sender/WriteBuffersTest.class */
public class WriteBuffersTest extends UnitTestBase {
    @Test
    public void testConstructor() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            new WriteBuffers(context(), 0, 20);
        }, th -> {
            Assert.assertTrue(th.getMessage().contains("The threshold of buffer must be > 0"));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            new WriteBuffers(context(), 10, -1);
        }, th2 -> {
            Assert.assertTrue(th2.getMessage().contains("The capacity of buffer must be > 0"));
        });
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            new WriteBuffers(context(), 20, 10);
        }, th3 -> {
            Assert.assertTrue(th3.getMessage().contains("The threshold must be <= capacity"));
        });
        new WriteBuffers(context(), 10, 20);
    }

    @Test
    public void testReachThreshold() throws IOException {
        WriteBuffers writeBuffers = new WriteBuffers(context(), 20, 50);
        Assert.assertFalse(writeBuffers.reachThreshold());
        Vertex createVertex = context().graphFactory().createVertex(BytesId.of(1L), new DoubleValue(0.5d));
        writeBuffers.writeVertex(createVertex);
        Assert.assertFalse(writeBuffers.reachThreshold());
        writeBuffers.writeVertex(createVertex);
        Assert.assertTrue(writeBuffers.reachThreshold());
        writeBuffers.writeVertex(createVertex);
        Assert.assertTrue(writeBuffers.reachThreshold());
    }

    @Test
    public void testIsEmpty() throws IOException {
        WriteBuffers writeBuffers = new WriteBuffers(context(), 10, 20);
        Assert.assertTrue(writeBuffers.isEmpty());
        writeBuffers.writeVertex(context().graphFactory().createVertex(BytesId.of(1L), new DoubleValue(0.5d)));
        Assert.assertFalse(writeBuffers.isEmpty());
    }

    @Test
    public void testWriteVertex() throws IOException {
        GraphFactory graphFactory = context().graphFactory();
        WriteBuffers writeBuffers = new WriteBuffers(context(), 100, 110);
        writeBuffers.writeVertex(graphFactory.createVertex(BytesId.of(1L), new DoubleValue(0.5d)));
        long position = ((WriteBuffer) Whitebox.getInternalState(writeBuffers, "writingBuffer")).output().position();
        Assert.assertGt(0L, Long.valueOf(position));
        Vertex createVertex = graphFactory.createVertex(BytesId.of(1L), new DoubleValue(0.5d));
        Properties createProperties = graphFactory.createProperties();
        createProperties.put("name", BytesId.of("marko"));
        createProperties.put("age", new IntValue(18));
        createProperties.put("city", new ListValue(ValueType.ID, ImmutableList.of(BytesId.of("wuhan"), BytesId.of("xian"))));
        createVertex.properties(createProperties);
        writeBuffers.writeVertex(createVertex);
        long position2 = ((WriteBuffer) Whitebox.getInternalState(writeBuffers, "writingBuffer")).output().position();
        Assert.assertGt(Long.valueOf(position), Long.valueOf(position2));
        Vertex createVertex2 = graphFactory.createVertex(BytesId.of(1L), new DoubleValue(0.5d));
        createVertex2.addEdge(graphFactory.createEdge(BytesId.of(2L)));
        createVertex2.addEdge(graphFactory.createEdge("knows", BytesId.of(3L)));
        createVertex2.addEdge(graphFactory.createEdge("watch", "1111", BytesId.of(4L)));
        writeBuffers.writeEdges(createVertex2);
        Assert.assertGt(Long.valueOf(position2), Long.valueOf(((WriteBuffer) Whitebox.getInternalState(writeBuffers, "writingBuffer")).output().position()));
    }

    @Test
    public void testWriteMessage() throws IOException {
        WriteBuffers writeBuffers = new WriteBuffers(context(), 50, 100);
        WriteBuffer writeBuffer = (WriteBuffer) Whitebox.getInternalState(writeBuffers, "writingBuffer");
        writeBuffers.writeMessage(BytesId.of(1L), new DoubleValue(0.85d));
        long position = writeBuffer.output().position();
        Assert.assertGt(0L, Long.valueOf(position));
        writeBuffers.writeMessage(BytesId.of(2L), new DoubleValue(0.15d));
        Assert.assertGt(Long.valueOf(position), Long.valueOf(writeBuffer.output().position()));
    }

    @Test
    public void testPrepareSorting() throws IOException, InterruptedException {
        GraphFactory graphFactory = context().graphFactory();
        WriteBuffers writeBuffers = new WriteBuffers(context(), 50, 100);
        Vertex createVertex = graphFactory.createVertex(BytesId.of(1L), new DoubleValue(0.5d));
        createVertex.addEdge(graphFactory.createEdge(BytesId.of(2L)));
        createVertex.addEdge(graphFactory.createEdge("knows", BytesId.of(3L)));
        createVertex.addEdge(graphFactory.createEdge("watch", "1111", BytesId.of(4L)));
        writeBuffers.writeEdges(createVertex);
        Assert.assertTrue(writeBuffers.reachThreshold());
        Assert.assertFalse(writeBuffers.isEmpty());
        writeBuffers.prepareSorting();
        Assert.assertFalse(writeBuffers.reachThreshold());
        Assert.assertTrue(writeBuffers.isEmpty());
        Thread thread = new Thread(() -> {
            Assert.assertThrows(ComputerException.class, () -> {
                writeBuffers.prepareSorting();
            }, th -> {
                Assert.assertTrue(th.getMessage().contains("Interrupted"));
            });
        });
        thread.start();
        Thread.sleep(100L);
        thread.interrupt();
    }

    @Test
    public void testSwitchAndFinishSorting() throws IOException, InterruptedException {
        GraphFactory graphFactory = context().graphFactory();
        WriteBuffers writeBuffers = new WriteBuffers(context(), 50, 100);
        Vertex createVertex = graphFactory.createVertex(BytesId.of(1L), new DoubleValue(0.5d));
        createVertex.addEdge(graphFactory.createEdge(BytesId.of(2L)));
        createVertex.addEdge(graphFactory.createEdge("knows", BytesId.of(3L)));
        createVertex.addEdge(graphFactory.createEdge("watch", "1111", BytesId.of(4L)));
        writeBuffers.writeEdges(createVertex);
        Assert.assertTrue(writeBuffers.reachThreshold());
        writeBuffers.switchForSorting();
        Assert.assertFalse(writeBuffers.reachThreshold());
        Assert.assertTrue(writeBuffers.isEmpty());
        writeBuffers.switchForSorting();
        Assert.assertFalse(writeBuffers.reachThreshold());
        Assert.assertTrue(writeBuffers.isEmpty());
        writeBuffers.writeEdges(createVertex);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Thread thread = new Thread(() -> {
            writeBuffers.switchForSorting();
            Assert.assertEquals(2L, atomicInteger.get());
        });
        Thread thread2 = new Thread(() -> {
            while (atomicInteger.get() < 2) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    Assert.fail(e.getMessage());
                }
                atomicInteger.incrementAndGet();
            }
            writeBuffers.finishSorting();
        });
        thread.start();
        thread2.start();
        thread.join();
        thread2.join();
    }

    @Test
    public void wrapForRead() throws IOException {
        GraphFactory graphFactory = context().graphFactory();
        WriteBuffers writeBuffers = new WriteBuffers(context(), 10, 20);
        Vertex createVertex = graphFactory.createVertex(BytesId.of(1L), new DoubleValue(0.5d));
        writeBuffers.writeVertex(createVertex);
        writeBuffers.prepareSorting();
        RandomAccessInput wrapForRead = writeBuffers.wrapForRead();
        try {
            StreamGraphInput streamGraphInput = new StreamGraphInput(context(), new EntryInputImpl(wrapForRead));
            createVertex.value((Value) null);
            Assert.assertEquals(createVertex, streamGraphInput.readVertex());
            if (wrapForRead != null) {
                wrapForRead.close();
            }
        } catch (Throwable th) {
            if (wrapForRead != null) {
                try {
                    wrapForRead.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
