package org.apache.hugegraph.computer.core.receiver.edge;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.computer.core.combiner.MergeNewPropertiesCombiner;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.edge.Edge;
import org.apache.hugegraph.computer.core.graph.edge.Edges;
import org.apache.hugegraph.computer.core.graph.id.BytesId;
import org.apache.hugegraph.computer.core.graph.id.Id;
import org.apache.hugegraph.computer.core.graph.properties.Properties;
import org.apache.hugegraph.computer.core.graph.value.LongValue;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.io.BytesOutput;
import org.apache.hugegraph.computer.core.io.IOFactory;
import org.apache.hugegraph.computer.core.network.buffer.NetworkBuffer;
import org.apache.hugegraph.computer.core.receiver.ReceiverUtil;
import org.apache.hugegraph.computer.core.sort.flusher.PeekableIterator;
import org.apache.hugegraph.computer.core.sort.sorting.RecvSortManager;
import org.apache.hugegraph.computer.core.sort.sorting.SortManager;
import org.apache.hugegraph.computer.core.store.EntryIterator;
import org.apache.hugegraph.computer.core.store.FileManager;
import org.apache.hugegraph.computer.core.store.SuperstepFileGenerator;
import org.apache.hugegraph.computer.core.store.entry.EntriesUtil;
import org.apache.hugegraph.computer.core.store.entry.EntryOutputImpl;
import org.apache.hugegraph.computer.core.store.entry.KvEntry;
import org.apache.hugegraph.computer.core.store.entry.KvEntryWriter;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.testutil.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/receiver/edge/EdgeMessageRecvPartitionTest.class */
/**
 * Tests for {@link EdgeMessageRecvPartition}.
 *
 * <p>Each test feeds serialized edge buffers into a partition and then reads
 * the partition's iterator back, checking vertex ids, edge target ids and
 * edge properties.
 *
 * <p>The fixture data is always the same shape: {@value #VERTEX_COUNT} source
 * vertices with ids {@code 0..9}, where vertex {@code i} has
 * {@value #EDGES_PER_VERTEX} out-edges targeting {@code i + 1} and
 * {@code i + 2}.
 */
public class EdgeMessageRecvPartitionTest extends UnitTestBase {

    /** Number of source vertices written by the buffer helpers (ids 0..9). */
    private static final long VERTEX_COUNT = 10L;

    /** Out-edges per vertex; vertex {@code i} targets {@code i + 1 .. i + 2}. */
    private static final int EDGES_PER_VERTEX = 2;

    private Config config;
    private EdgeMessageRecvPartition partition;
    private FileManager fileManager;
    private SortManager sortManager;

    /**
     * Builds a fresh config, wipes the two local data directories and creates
     * the file manager, sort manager and partition under test.
     */
    @Before
    public void setup() {
        this.config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.JOB_PARTITIONS_COUNT, "1",
                ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
                ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "100",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        FileUtils.deleteQuietly(new File("data_dir1"));
        FileUtils.deleteQuietly(new File("data_dir2"));
        this.fileManager = new FileManager();
        this.fileManager.init(this.config);
        this.sortManager = new RecvSortManager(context());
        this.sortManager.init(this.config);
        this.partition = new EdgeMessageRecvPartition(
                context(),
                new SuperstepFileGenerator(this.fileManager, -1),
                this.sortManager);
    }

    /** Closes the file manager and sort manager created for the test. */
    @After
    public void teardown() {
        this.fileManager.close(this.config);
        this.sortManager.close(this.config);
    }

    /**
     * Writes the ten vertex buffers once and expects to read back exactly the
     * same ten vertices, each edge carrying the single property {@code p1}.
     */
    @Test
    public void testEdgeMessageRecvPartition() throws IOException {
        Assert.assertEquals("edge", this.partition.type());
        addTenEdgeBuffer(this.partition::addBuffer);
        checkTenEdges(this.partition.iterator());
        this.fileManager.close(this.config);
    }

    /**
     * Writes the same ten vertex buffers twice and expects the result to be
     * indistinguishable from writing them once: ten vertices, two edges each,
     * one {@code p1} property per edge.
     */
    @Test
    public void testOverwriteCombiner() throws IOException {
        Assert.assertEquals("edge", this.partition.type());
        addTenEdgeBuffer(this.partition::addBuffer);
        addTenEdgeBuffer(this.partition::addBuffer);
        checkTenEdges(this.partition.iterator());
        this.fileManager.close(this.config);
    }

    /**
     * Rebuilds the partition with {@link MergeNewPropertiesCombiner} as the
     * edge properties combiner, writes every edge twice (first with
     * {@code p1}, then with {@code p2}) and expects each edge read back to
     * carry both properties.
     */
    @Test
    public void testNotOverwritePropertiesCombiner() throws IOException {
        this.config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.JOB_ID, "local_001",
                ComputerOptions.JOB_WORKERS_COUNT, "1",
                ComputerOptions.JOB_PARTITIONS_COUNT, "1",
                ComputerOptions.WORKER_DATA_DIRS, "[data_dir1, data_dir2]",
                ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT, "10000",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "5",
                ComputerOptions.WORKER_EDGE_PROPERTIES_COMBINER_CLASS,
                MergeNewPropertiesCombiner.class.getName(),
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false");
        FileUtils.deleteQuietly(new File("data_dir1"));
        FileUtils.deleteQuietly(new File("data_dir2"));
        this.fileManager = new FileManager();
        this.fileManager.init(this.config);
        // NOTE(review): the sort manager from setup() is reused here and was
        // initialised with the earlier config - confirm that is intended.
        this.partition = new EdgeMessageRecvPartition(
                context(),
                new SuperstepFileGenerator(this.fileManager, -1),
                this.sortManager);
        Assert.assertEquals("edge", this.partition.type());
        addTenDuplicateEdgeBuffer(this.partition::addBuffer);
        checkTenEdgesWithCombinedProperties(this.partition.iterator());
    }

    /**
     * Serializes ten vertices (ids 0..9), each with two out-edges to
     * {@code id + 1} and {@code id + 2} carrying property
     * {@code p1 = id}, and hands every resulting buffer to {@code consumer}.
     *
     * @param consumer receives one network buffer per vertex
     * @throws IOException if serializing a vertex fails
     */
    public static void addTenEdgeBuffer(Consumer<NetworkBuffer> consumer) throws IOException {
        writeTenVertexBuffers(consumer, "p1", 1L);
    }

    /**
     * Writes every edge twice: a first pass with property {@code p1 = id},
     * then a second pass over the same vertices and targets with property
     * {@code p2 = 2 * id}.
     *
     * @param consumer receives one network buffer per vertex per pass
     * @throws IOException if serializing a vertex fails
     */
    private static void addTenDuplicateEdgeBuffer(Consumer<NetworkBuffer> consumer) throws IOException {
        writeTenVertexBuffers(consumer, "p1", 1L);
        writeTenVertexBuffers(consumer, "p2", 2L);
    }

    /**
     * Asserts that {@code peekableIterator} yields exactly ten vertex entries
     * with ids 0..9 in order, each with edges to {@code id + 1} and
     * {@code id + 2} whose only property is {@code p1 = id}.
     *
     * @param peekableIterator the partition iterator to verify
     * @throws IOException if reading an entry fails
     */
    public static void checkTenEdges(PeekableIterator<KvEntry> peekableIterator) throws IOException {
        for (long i = 0L; i < VERTEX_COUNT; i++) {
            Assert.assertTrue(peekableIterator.hasNext());
            KvEntry vertexEntry = (KvEntry) peekableIterator.next();
            checkVertexEntry(vertexEntry, i, false);
        }
        Assert.assertFalse(peekableIterator.hasNext());
    }

    /**
     * Same as {@link #checkTenEdges(PeekableIterator)} but expects every edge
     * to carry two properties: {@code p1 = id} and {@code p2 = 2 * id}.
     *
     * @param it the partition iterator to verify
     * @throws IOException if reading an entry fails
     */
    private static void checkTenEdgesWithCombinedProperties(Iterator<KvEntry> it) throws IOException {
        for (long i = 0L; i < VERTEX_COUNT; i++) {
            Assert.assertTrue(it.hasNext());
            KvEntry vertexEntry = it.next();
            checkVertexEntry(vertexEntry, i, true);
        }
        Assert.assertFalse(it.hasNext());
    }

    /**
     * Serializes the ten fixture vertices, giving every edge a single
     * property {@code propertyKey = valueFactor * sourceId}.
     */
    private static void writeTenVertexBuffers(Consumer<NetworkBuffer> consumer,
                                              String propertyKey,
                                              long valueFactor) throws IOException {
        for (long i = 0L; i < VERTEX_COUNT; i++) {
            Vertex vertex = graphFactory().createVertex();
            vertex.id(BytesId.of(i));
            Edges edges = graphFactory().createEdges(EDGES_PER_VERTEX);
            // Bounded loop: targets are i + 1 and i + 2.
            for (long j = i + 1L; j <= i + EDGES_PER_VERTEX; j++) {
                Edge edge = graphFactory().createEdge();
                edge.targetId(BytesId.of(j));
                Properties properties = graphFactory().createProperties();
                properties.put(propertyKey, new LongValue(valueFactor * i));
                edge.properties(properties);
                edges.add(edge);
            }
            vertex.edges(edges);
            ReceiverUtil.consumeBuffer(writeEdges(vertex), consumer);
        }
    }

    /**
     * Verifies one vertex entry: its key is {@code sourceId} and its first
     * two sub-entries are the edges to {@code sourceId + 1} and
     * {@code sourceId + 2} with the expected properties.
     *
     * @param combined when true expect both {@code p1} and {@code p2},
     *                 otherwise only {@code p1}
     */
    private static void checkVertexEntry(KvEntry vertexEntry, long sourceId,
                                         boolean combined) throws IOException {
        Assert.assertEquals(BytesId.of(sourceId), ReceiverUtil.readId(vertexEntry.key()));
        EntryIterator edgeIter = EntriesUtil.subKvIterFromEntry(vertexEntry);
        long expectedSize = combined ? 2L : 1L;
        for (long j = sourceId + 1L; j <= sourceId + EDGES_PER_VERTEX; j++) {
            Assert.assertTrue(edgeIter.hasNext());
            KvEntry edgeEntry = (KvEntry) edgeIter.next();
            Assert.assertEquals(BytesId.of(j), ReceiverUtil.readId(edgeEntry.key()));
            Properties properties = graphFactory().createProperties();
            ReceiverUtil.readValue(edgeEntry.value(), properties);
            Assert.assertEquals(expectedSize, properties.size());
            Assert.assertEquals(new LongValue(sourceId), properties.get("p1"));
            if (combined) {
                Assert.assertEquals(new LongValue(2 * sourceId), properties.get("p2"));
            }
        }
    }

    /**
     * Serializes a vertex as one entry: the vertex id is the key and each
     * edge is a sub key/value of (target id, edge properties).
     *
     * @param vertex the vertex whose id and edges are written
     * @return the serialized bytes
     * @throws IOException if writing to the output fails
     */
    private static byte[] writeEdges(Vertex vertex) throws IOException {
        BytesOutput bytesOutput = IOFactory.createBytesOutput(32);
        EntryOutputImpl entryOutput = new EntryOutputImpl(bytesOutput);
        Id id = vertex.id();
        KvEntryWriter entryWriter = entryOutput.writeEntry(out -> {
            id.write(out);
        });
        for (Edge edge : vertex.edges()) {
            Id targetId = edge.targetId();
            entryWriter.writeSubKv(out -> {
                targetId.write(out);
            }, out -> {
                edge.properties().write(out);
            });
        }
        entryWriter.writeFinish();
        return bytesOutput.toByteArray();
    }
}
