package org.apache.giraph.comm;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.factories.TestMessageValueFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/giraph/comm/RequestTest.class */
/**
 * Round-trip tests for worker requests: each test sends one request from a
 * {@link NettyClient} to a {@link NettyServer} started in {@link #setUp()},
 * stops both endpoints, and then inspects the {@link ServerData} the server
 * handler was constructed with.
 */
public class RequestTest {
    /** Configuration shared by the client, the server and the requests. */
    private ImmutableClassesGiraphConfiguration conf;
    /** Server-side state inspected by the assertions. */
    private ServerData<IntWritable, IntWritable, IntWritable> serverData;
    /** Server receiving the requests. */
    private NettyServer server;
    /** Client sending the requests. */
    private NettyClient client;
    /** Worker info carrying the server address; its task id is the request destination. */
    private WorkerInfo workerInfo;

    /**
     * Builds the configuration, starts the server, and connects the client
     * to it before every test.
     */
    @Before
    @SuppressWarnings({"rawtypes", "unchecked"}) // Mapper.Context and conf are used as raw types
    public void setUp() {
        GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
        GiraphConstants.COMPUTATION_CLASS.set(giraphConfiguration, IntNoOpComputation.class);
        this.conf = new ImmutableClassesGiraphConfiguration(giraphConfiguration);

        Mapper.Context context = Mockito.mock(Mapper.Context.class);
        Mockito.when(context.getConfiguration()).thenReturn(this.conf);

        this.serverData = MockUtils.createNewServerData(this.conf, context);
        this.serverData.prepareSuperstep();

        this.workerInfo = new WorkerInfo();
        this.server = new NettyServer(this.conf,
                new WorkerRequestServerHandler.Factory(this.serverData),
                this.workerInfo, context, new MockExceptionHandler());
        this.server.start();
        // The address is only known once the server has been started.
        this.workerInfo.setInetSocketAddress(this.server.getMyAddress(), this.server.getLocalHostOrIp());

        this.client = new NettyClient(context, this.conf, new WorkerInfo(), new MockExceptionHandler());
        this.server.setFlowControl(this.client.getFlowControl());
        this.client.connectAllAddresses(Lists.newArrayList(this.workerInfo));
    }

    /**
     * Stops the client and then the server. The server is stopped even when
     * stopping the client throws.
     */
    private void stopServices() {
        try {
            this.client.stop();
        } finally {
            this.server.stop();
        }
    }

    /**
     * Sends a partition holding vertices with ids 0..9 and checks that the
     * server's partition store received it.
     */
    @Test
    @SuppressWarnings("unchecked") // conf is a raw type, so its factory methods return raw types
    public void sendVertexPartition() {
        final int partitionId = 13;
        Partition<IntWritable, IntWritable, IntWritable> partition =
                this.conf.createPartition(partitionId, null);
        for (int i = 0; i < 10; i++) {
            Vertex<IntWritable, IntWritable, IntWritable> vertex = this.conf.createVertex();
            vertex.initialize(new IntWritable(i), new IntWritable(i));
            partition.putVertex(vertex);
        }

        SendVertexRequest<IntWritable, IntWritable, IntWritable> request =
                new SendVertexRequest<IntWritable, IntWritable, IntWritable>(partition);
        try {
            this.client.sendWritableRequest(this.workerInfo.getTaskId(), request);
            this.client.waitAllRequests();
        } finally {
            stopServices();
        }

        PartitionStore<IntWritable, IntWritable, IntWritable> partitionStore =
                this.serverData.getPartitionStore();
        Assert.assertTrue(partitionStore.hasPartition(partitionId));

        int idSum = 0;
        Partition<IntWritable, IntWritable, IntWritable> received =
                partitionStore.removePartition(partitionId);
        for (Vertex<IntWritable, IntWritable, IntWritable> vertex : received) {
            idSum += vertex.getId().get();
        }
        partitionStore.putPartition(received);
        // 0 + 1 + ... + 9
        Assert.assertEquals(45, idSum);
        partitionStore.shutdown();
    }

    /**
     * Sends, for every vertex id i in 1..6, the messages 0..i-1 under
     * partition 0 and checks the id and message sums seen by the server's
     * incoming message store.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // PairList and the request are kept raw
    public void sendWorkerMessagesRequest() {
        final int partitionId = 0;
        PairList dataToSend = new PairList();
        dataToSend.initialize();
        ByteArrayVertexIdMessages<IntWritable, IntWritable> vertexIdMessages =
                new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
                        new TestMessageValueFactory<IntWritable>(IntWritable.class));
        vertexIdMessages.setConf(this.conf);
        vertexIdMessages.initialize();
        dataToSend.add(partitionId, vertexIdMessages);
        for (int i = 1; i < 7; i++) {
            IntWritable vertexId = new IntWritable(i);
            for (int j = 0; j < i; j++) {
                vertexIdMessages.add(vertexId, new IntWritable(j));
            }
        }

        SendWorkerMessagesRequest request = new SendWorkerMessagesRequest(dataToSend);
        request.setConf(this.conf);
        try {
            this.client.sendWritableRequest(this.workerInfo.getTaskId(), request);
            this.client.waitAllRequests();
        } finally {
            stopServices();
        }

        int keySum = 0;
        int messageSum = 0;
        for (IntWritable vertexId
                : this.serverData.getIncomingMessageStore().getPartitionDestinationVertices(partitionId)) {
            keySum += vertexId.get();
            Iterable<?> messages = this.serverData.getIncomingMessageStore().getVertexMessages(vertexId);
            synchronized (messages) {
                for (Object message : messages) {
                    messageSum += ((IntWritable) message).get();
                }
            }
        }
        // 1 + 2 + ... + 6
        Assert.assertEquals(21, keySum);
        // 0 + 1 + 3 + 6 + 10 + 15
        Assert.assertEquals(35, messageSum);
    }

    /**
     * Sends a single message with value 1 addressed to vertex ids 1..7 and
     * checks the id and message sums seen by the server's incoming message
     * store.
     *
     * @throws IOException if serializing the target ids fails
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // conf and the request are raw types
    public void sendWorkerIndividualMessagesRequest() throws IOException {
        final int targetCount = 7;
        ByteArrayOneMessageToManyIds<IntWritable, IntWritable> data =
                new ByteArrayOneMessageToManyIds<IntWritable, IntWritable>(
                        new TestMessageValueFactory<IntWritable>(IntWritable.class));
        data.setConf(this.conf);
        data.initialize();

        // Serialize the target ids 1..7 back to back into one buffer.
        ExtendedDataOutput output = this.conf.createExtendedDataOutput();
        for (int i = 1; i <= targetCount; i++) {
            new IntWritable(i).write(output);
        }
        data.add(output.getByteArray(), output.getPos(), targetCount, new IntWritable(1));

        SendWorkerOneMessageToManyRequest request =
                new SendWorkerOneMessageToManyRequest(data, this.conf);
        try {
            this.client.sendWritableRequest(this.workerInfo.getTaskId(), request);
            this.client.waitAllRequests();
        } finally {
            stopServices();
        }

        int keySum = 0;
        int messageSum = 0;
        for (IntWritable vertexId
                : this.serverData.getIncomingMessageStore().getPartitionDestinationVertices(0)) {
            keySum += vertexId.get();
            Iterable<?> messages = this.serverData.getIncomingMessageStore().getVertexMessages(vertexId);
            synchronized (messages) {
                for (Object message : messages) {
                    messageSum += ((IntWritable) message).get();
                }
            }
        }
        // 1 + 2 + ... + 7
        Assert.assertEquals(28, keySum);
        // seven targets, each receiving the value 1
        Assert.assertEquals(7, messageSum);
    }

    /**
     * Sends mutations for vertex ids 0..10 under partition 19 (3 added
     * vertices, 2 vertex removals, 5 added edges and 7 edge removals each)
     * and checks what the server recorded for that partition.
     */
    @Test
    @SuppressWarnings("unchecked") // conf is a raw type, so createVertex() returns a raw Vertex
    public void sendPartitionMutationsRequest() {
        final int partitionId = 19;
        Map<IntWritable, VertexMutations<IntWritable, IntWritable, IntWritable>> vertexIdMutations =
                Maps.newHashMap();
        for (int i = 0; i < 11; i++) {
            VertexMutations<IntWritable, IntWritable, IntWritable> mutations =
                    new VertexMutations<IntWritable, IntWritable, IntWritable>();
            for (int j = 0; j < 3; j++) {
                Vertex<IntWritable, IntWritable, IntWritable> vertex = this.conf.createVertex();
                vertex.initialize(new IntWritable(i), new IntWritable(j));
                mutations.addVertex(vertex);
            }
            for (int j = 0; j < 2; j++) {
                mutations.removeVertex();
            }
            for (int j = 0; j < 5; j++) {
                mutations.addEdge(EdgeFactory.create(new IntWritable(i), new IntWritable(2 * j)));
            }
            for (int j = 0; j < 7; j++) {
                mutations.removeEdge(new IntWritable(j));
            }
            vertexIdMutations.put(new IntWritable(i), mutations);
        }

        SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable> request =
                new SendPartitionMutationsRequest<IntWritable, IntWritable, IntWritable>(
                        partitionId, vertexIdMutations);
        GiraphMetrics.init(this.conf);
        try {
            this.client.sendWritableRequest(this.workerInfo.getTaskId(), request);
            this.client.waitAllRequests();
        } finally {
            stopServices();
        }

        Map<IntWritable, VertexMutations<IntWritable, IntWritable, IntWritable>> receivedMutations =
                this.serverData.getPartitionMutations().get(partitionId);
        int keySum = 0;
        for (Map.Entry<IntWritable, VertexMutations<IntWritable, IntWritable, IntWritable>> entry
                : receivedMutations.entrySet()) {
            VertexMutations<IntWritable, IntWritable, IntWritable> mutations = entry.getValue();
            synchronized (mutations) {
                keySum += entry.getKey().get();

                int vertexValueSum = 0;
                for (Vertex<IntWritable, IntWritable, IntWritable> vertex : mutations.getAddedVertexList()) {
                    vertexValueSum += vertex.getValue().get();
                }
                // 0 + 1 + 2
                Assert.assertEquals(3, vertexValueSum);
                Assert.assertEquals(2, mutations.getRemovedVertexCount());

                int addedEdgeValueSum = 0;
                for (Edge<IntWritable, IntWritable> edge : mutations.getAddedEdgeList()) {
                    addedEdgeValueSum += edge.getValue().get();
                }
                // 0 + 2 + 4 + 6 + 8
                Assert.assertEquals(20, addedEdgeValueSum);
            }
        }
        // 0 + 1 + ... + 10
        Assert.assertEquals(55, keySum);
    }
}
