package org.apache.hugegraph.computer.core.receiver.vertex;

import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.computer.core.combiner.MergeNewPropertiesCombiner;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.id.BytesId;
import org.apache.hugegraph.computer.core.graph.properties.Properties;
import org.apache.hugegraph.computer.core.graph.value.LongValue;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.io.BytesOutput;
import org.apache.hugegraph.computer.core.io.IOFactory;
import org.apache.hugegraph.computer.core.io.RandomAccessInput;
import org.apache.hugegraph.computer.core.io.StreamGraphInput;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.receiver.ReceiverUtil;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.FileManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;
import org.apache.hugegraph.computer.core.store.entry.EntryOutputImpl;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;
import org.apache.hugegraph.computer.core.store.entry.Pointer;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.testutil.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/receiver/vertex/VertexMessageRecvPartitionTest.class */
/**
 * Unit tests for {@link VertexMessageRecvPartition}.
 *
 * <p>Each test feeds serialized vertex entries into the partition through
 * {@code addBuffer} and then inspects the entries produced by
 * {@code iterator()}. The static helpers {@link #addTenVertexBuffer} and
 * {@link #checkPartitionIterator} are public so that other tests can reuse
 * them.
 */
public class VertexMessageRecvPartitionTest extends UnitTestBase {

    /** Number of distinct vertex ids (0 to 9) written and verified by the helpers. */
    private static final long VERTEX_COUNT = 10L;

    private Config config;
    private VertexMessageRecvPartition partition;
    private FileManager fileManager;
    private SortManager sortManager;

    /**
     * Creates a fresh config (received-buffers limit of "20"), wipes the two
     * data directories, and builds the file manager, sort manager and the
     * partition under test.
     */
    @Before
    public void setup() {
        this.config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.JOB_PARTITIONS_COUNT, "1",
                ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
                ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "20",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "5",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        FileUtils.deleteQuietly(new File("data_dir1"));
        FileUtils.deleteQuietly(new File("data_dir2"));

        this.fileManager = new FileManager();
        this.fileManager.init(this.config);
        this.sortManager = new RecvSortManager(context());
        this.sortManager.init(this.config);
        // NOTE(review): -1 is presumably the "input" superstep id - confirm
        // against the project's constants.
        this.partition = new VertexMessageRecvPartition(
                context(),
                new SuperstepFileGenerator(this.fileManager, -1),
                this.sortManager);
    }

    /**
     * Closes whichever file manager and config are currently held, then the
     * sort manager.
     */
    @After
    public void teardown() {
        this.fileManager.close(this.config);
        this.sortManager.close(this.config);
    }

    /**
     * Checks the partition's type and initial byte count, then adds ten
     * vertices and expects the iterator to yield exactly ids 0 to 9 in order.
     */
    @Test
    public void testVertexMessageRecvPartition() throws IOException {
        Assert.assertEquals("vertex", this.partition.type());
        Assert.assertEquals(0L, this.partition.totalBytes());
        addTenVertexBuffer(this.partition::addBuffer);

        PeekableIterator<KvEntry> it = this.partition.iterator();
        checkPartitionIterator(it);
        Assert.assertFalse(it.hasNext());
    }

    /**
     * Adds the same ten vertices twice (received-buffers limit of "1000") and
     * expects the first ten iterator entries to be ids 0 to 9 in order.
     */
    @Test
    public void testOverwriteCombiner() throws IOException {
        this.config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.JOB_PARTITIONS_COUNT, "1",
                ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
                ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "1000",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "5",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        FileUtils.deleteQuietly(new File("data_dir1"));
        FileUtils.deleteQuietly(new File("data_dir2"));

        // NOTE(review): this replaces the FileManager created in setup()
        // without closing it, and the new one is closed both at the end of
        // this test and again in teardown() - confirm close() tolerates that.
        this.fileManager = new FileManager();
        this.fileManager.init(this.config);
        this.partition = new VertexMessageRecvPartition(
                context(),
                new SuperstepFileGenerator(this.fileManager, -1),
                this.sortManager);

        addTenVertexBuffer(this.partition::addBuffer);
        addTenVertexBuffer(this.partition::addBuffer);
        checkPartitionIterator(this.partition.iterator());

        this.fileManager.close(this.config);
    }

    /**
     * Configures {@link MergeNewPropertiesCombiner} as the vertex properties
     * combiner, adds every vertex twice with different properties ("p1", then
     * "p2"), and expects each resulting entry to carry both properties.
     */
    @Test
    public void testMergePropertiesCombiner() throws IOException {
        this.config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.JOB_PARTITIONS_COUNT, "1",
                ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
                ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "5",
                ComputerOptions.WORKER_VERTEX_PROPERTIES_COMBINER_CLASS,
                MergeNewPropertiesCombiner.class.getName(),
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        FileUtils.deleteQuietly(new File("data_dir1"));
        FileUtils.deleteQuietly(new File("data_dir2"));

        // NOTE(review): same FileManager replacement / double close pattern
        // as in testOverwriteCombiner - confirm it is intentional.
        this.fileManager = new FileManager();
        this.fileManager.init(this.config);
        this.partition = new VertexMessageRecvPartition(
                context(),
                new SuperstepFileGenerator(this.fileManager, -1),
                this.sortManager);

        addTwentyDuplicateVertexBuffer(this.partition::addBuffer);
        checkTenVertexWithMergedProperties(this.partition.iterator());

        this.fileManager.close(this.config);
    }

    /**
     * Adds two zero-filled 2-byte buffers and expects {@code iterator()} to
     * throw a {@link ComputerException} reporting the failed merge.
     */
    @Test
    public void testMergeBuffersFailed() {
        addTwoEmptyBuffer(this.partition::addBuffer);

        Assert.assertThrows(ComputerException.class, () -> {
            this.partition.iterator();
        }, e -> {
            Assert.assertContains("Failed to merge 2 buffers to file",
                                  e.getMessage());
        });
    }

    /** A partition that received no buffers yields an empty iterator. */
    @Test
    public void testEmptyIterator() {
        Assert.assertFalse(this.partition.iterator().hasNext());
    }

    /**
     * Serializes ten vertices with ids 0 to 9 and empty properties, handing
     * each one to {@code consumer} as its own buffer.
     *
     * @param consumer receiver of the generated buffers
     * @throws IOException if serializing a vertex fails
     */
    public static void addTenVertexBuffer(Consumer<NetworkBuffer> consumer)
                                          throws IOException {
        // The counter stays a long so BytesId.of(long) is the overload used.
        for (long id = 0L; id < VERTEX_COUNT; id++) {
            Vertex vertex = graphFactory().createVertex();
            vertex.id(BytesId.of(id));
            vertex.properties(graphFactory().createProperties());
            ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
        }
    }

    /**
     * Emits twenty buffers: first ids 0 to 9 with property {@code p1 = id},
     * then ids 0 to 9 again with property {@code p2 = 2 * id}.
     *
     * @param consumer receiver of the generated buffers
     * @throws IOException if serializing a vertex fails
     */
    private static void addTwentyDuplicateVertexBuffer(
                        Consumer<NetworkBuffer> consumer) throws IOException {
        for (long id = 0L; id < VERTEX_COUNT; id++) {
            Vertex vertex = graphFactory().createVertex();
            vertex.id(BytesId.of(id));
            Properties properties = graphFactory().createProperties();
            properties.put("p1", new LongValue(id));
            vertex.properties(properties);
            ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
        }

        for (long id = 0L; id < VERTEX_COUNT; id++) {
            Vertex vertex = graphFactory().createVertex();
            vertex.id(BytesId.of(id));
            Properties properties = graphFactory().createProperties();
            properties.put("p2", new LongValue(2L * id));
            vertex.properties(properties);
            ReceiverUtil.consumeBuffer(writeVertex(vertex), consumer);
        }
    }

    /**
     * Hands two zero-filled 2-byte arrays to {@code consumer}; used to
     * provoke a merge failure.
     *
     * @param consumer receiver of the generated buffers
     */
    private static void addTwoEmptyBuffer(Consumer<NetworkBuffer> consumer) {
        for (int i = 0; i < 2; i++) {
            ReceiverUtil.consumeBuffer(new byte[2], consumer);
        }
    }

    /**
     * Serializes a vertex as one entry: the key is the vertex id, the value
     * is the label (UTF) followed by the properties.
     *
     * @param vertex vertex to serialize
     * @return the bytes of the written entry
     * @throws IOException if writing fails
     */
    private static byte[] writeVertex(Vertex vertex) throws IOException {
        BytesOutput bytesOutput = IOFactory.createBytesOutput(32);
        EntryOutputImpl entryOutput = new EntryOutputImpl(bytesOutput);
        entryOutput.writeEntry(keyOut -> {
            vertex.id().write(keyOut);
        }, valueOut -> {
            valueOut.writeUTF(vertex.label());
            vertex.properties().write(valueOut);
        });
        return bytesOutput.toByteArray();
    }

    /**
     * Expects the next ten entries to be ids 0 to 9 in order, each with an
     * empty label and exactly two properties: {@code p1 = id} and
     * {@code p2 = 2 * id}.
     *
     * @param peekableIterator iterator over the partition's entries
     * @throws IOException if reading an entry value fails
     */
    private static void checkTenVertexWithMergedProperties(
                        PeekableIterator<KvEntry> peekableIterator)
                        throws IOException {
        for (long id = 0L; id < VERTEX_COUNT; id++) {
            Assert.assertTrue(peekableIterator.hasNext());
            KvEntry entry = peekableIterator.next();
            Assert.assertEquals(BytesId.of(id),
                                ReceiverUtil.readId(entry.key()));

            Pointer value = entry.value();
            RandomAccessInput input = value.input();
            // Remember the current position so it can be restored once the
            // value has been decoded from its own offset.
            long savedPosition = input.position();
            input.seek(value.offset());
            Assert.assertEquals("", StreamGraphInput.readLabel(input));
            Properties properties = graphFactory().createProperties();
            properties.read(input);
            input.seek(savedPosition);

            Assert.assertEquals(2L, properties.size());
            Assert.assertEquals(new LongValue(id), properties.get("p1"));
            Assert.assertEquals(new LongValue(2L * id), properties.get("p2"));
        }
    }

    /**
     * Expects the next ten entries of the iterator to have keys 0 to 9 in
     * order. Does not check whether further entries follow.
     *
     * @param peekableIterator iterator over the partition's entries
     * @throws IOException if reading an entry key fails
     */
    public static void checkPartitionIterator(
                       PeekableIterator<KvEntry> peekableIterator)
                       throws IOException {
        for (long id = 0L; id < VERTEX_COUNT; id++) {
            Assert.assertTrue(peekableIterator.hasNext());
            KvEntry entry = peekableIterator.next();
            Assert.assertEquals(BytesId.of(id),
                                ReceiverUtil.readId(entry.key()));
        }
    }
}
