package org.apache.hugegraph.computer.core.receiver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.hugegraph.computer.core.combiner.DoubleValueSumCombiner;
import org.apache.hugegraph.computer.core.common.ContainerInfo;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitionTest;
import org.apache.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitionTest;
import org.apache.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitionTest;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.snapshot.SnapshotManager;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.testutil.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.class */
public class MessageRecvManagerTest extends UnitTestBase {
    private Config config;
    private FileManager fileManager;
    private SortManager sortManager;
    private MessageRecvManager receiveManager;
    private SnapshotManager snapshotManager;
    private ConnectionId connectionId;

    @Before
    public void setup() {
        this.config = UnitTestBase.updateWithRequiredOptions(ComputerOptions.JOB_ID, "local_001", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.BSP_MAX_SUPER_STEP, "1", ComputerOptions.WORKER_COMBINER_CLASS, DoubleValueSumCombiner.class.getName(), ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]", ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "100", ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT, "100", ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName(), ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        this.fileManager = new FileManager();
        this.fileManager.init(this.config);
        this.sortManager = new RecvSortManager(context());
        this.sortManager.init(this.config);
        this.receiveManager = new MessageRecvManager(context(), this.fileManager, this.sortManager);
        this.snapshotManager = new SnapshotManager(context(), (MessageSendManager) null, this.receiveManager, (ContainerInfo) null);
        this.receiveManager.init(this.config);
        this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), 0);
    }

    @After
    public void teardown() {
        this.receiveManager.close(this.config);
        this.fileManager.close(this.config);
        this.sortManager.close(this.config);
    }

    @Test
    public void testVertexAndEdgeMessage() throws IOException {
        this.receiveManager.onStarted(this.connectionId);
        this.receiveManager.onFinished(this.connectionId);
        VertexMessageRecvPartitionTest.addTenVertexBuffer(networkBuffer -> {
            this.receiveManager.handle(MessageType.VERTEX, 0, networkBuffer);
        });
        EdgeMessageRecvPartitionTest.addTenEdgeBuffer(networkBuffer2 -> {
            this.receiveManager.handle(MessageType.EDGE, 0, networkBuffer2);
        });
        this.receiveManager.onStarted(this.connectionId);
        this.receiveManager.onFinished(this.connectionId);
        this.receiveManager.waitReceivedAllMessages();
        Map vertexPartitions = this.receiveManager.vertexPartitions();
        Map edgePartitions = this.receiveManager.edgePartitions();
        Assert.assertEquals(1L, vertexPartitions.size());
        Assert.assertEquals(1L, edgePartitions.size());
        VertexMessageRecvPartitionTest.checkPartitionIterator((PeekableIterator) vertexPartitions.get(0));
        EdgeMessageRecvPartitionTest.checkTenEdges((PeekableIterator) edgePartitions.get(0));
    }

    @Test
    public void testComputeMessage() throws IOException {
        this.receiveManager.beforeSuperstep(this.config, 0);
        ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer(networkBuffer -> {
            this.receiveManager.handle(MessageType.MSG, 0, networkBuffer);
        });
        this.receiveManager.onFinished(this.connectionId);
        this.receiveManager.waitReceivedAllMessages();
        this.receiveManager.afterSuperstep(this.config, 0);
        Map messagePartitions = this.receiveManager.messagePartitions();
        Assert.assertEquals(1L, messagePartitions.size());
        ComputeMessageRecvPartitionTest.checkTenCombineMessages((PeekableIterator) messagePartitions.get(0));
    }

    @Test
    public void testOtherMessageType() {
        Assert.assertThrows(ComputerException.class, () -> {
            ReceiverUtil.consumeBuffer(new byte[100], networkBuffer -> {
                this.receiveManager.handle(MessageType.ACK, 0, networkBuffer);
            });
        }, th -> {
            Assert.assertEquals("Unable handle NetworkBuffer with type 'ACK'", th.getMessage());
        });
    }

    @Test
    public void testNotEnoughFinishMessages() {
        this.receiveManager.beforeSuperstep(this.config, 0);
        Assert.assertThrows(ComputerException.class, () -> {
            this.receiveManager.waitReceivedAllMessages();
        }, th -> {
            Assert.assertContains("finish-messages", th.getMessage());
        });
    }
}
