package org.apache.giraph.comm;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Iterator;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.TestMessageValueFactory;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.IntNoOpComputation;
import org.apache.giraph.utils.MockUtils;
import org.apache.giraph.utils.PairList;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.codehaus.jackson.util.BufferRecycler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.python.apache.xerces.dom3.as.ASDataType;

/* loaded from: input_file:org/apache/giraph/comm/RequestFailureTest.class */
public class RequestFailureTest {
    private ImmutableClassesGiraphConfiguration conf;
    private ServerData<IntWritable, IntWritable, IntWritable> serverData;
    private NettyServer server;
    private NettyClient client;
    private Mapper.Context context;

    @Before
    public void setUp() throws IOException {
        GiraphConfiguration giraphConfiguration = new GiraphConfiguration();
        giraphConfiguration.setComputationClass(IntNoOpComputation.class);
        this.conf = new ImmutableClassesGiraphConfiguration(giraphConfiguration);
        this.context = (Mapper.Context) Mockito.mock(Mapper.Context.class);
        Mockito.when(this.context.getConfiguration()).thenReturn(this.conf);
    }

    private WritableRequest getRequest() {
        PairList pairList = new PairList();
        pairList.initialize();
        ByteArrayVertexIdMessages byteArrayVertexIdMessages = new ByteArrayVertexIdMessages(new TestMessageValueFactory(IntWritable.class));
        byteArrayVertexIdMessages.setConf(this.conf);
        byteArrayVertexIdMessages.initialize();
        pairList.add(0, byteArrayVertexIdMessages);
        for (int i = 1; i < 7; i++) {
            IntWritable intWritable = new IntWritable(i);
            for (int i2 = 0; i2 < i; i2++) {
                byteArrayVertexIdMessages.add((ByteArrayVertexIdMessages) intWritable, new IntWritable(i2));
            }
        }
        SendWorkerMessagesRequest sendWorkerMessagesRequest = new SendWorkerMessagesRequest(pairList);
        sendWorkerMessagesRequest.setConf(this.conf);
        return sendWorkerMessagesRequest;
    }

    private void checkResult(int i) {
        int i2 = 0;
        int i3 = 0;
        for (IntWritable intWritable : this.serverData.getIncomingMessageStore().getPartitionDestinationVertices(0)) {
            i2 += intWritable.get();
            Iterable vertexMessages = this.serverData.getIncomingMessageStore().getVertexMessages(intWritable);
            synchronized (vertexMessages) {
                Iterator it2 = vertexMessages.iterator();
                while (it2.hasNext()) {
                    i3 += ((IntWritable) it2.next()).get();
                }
            }
        }
        Assert.assertEquals(21L, i2);
        Assert.assertEquals(35 * i, i3);
    }

    @Test
    public void send2Requests() throws IOException {
        checkSendingTwoRequests();
    }

    @Test
    public void alreadyProcessedRequest() throws IOException {
        GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED.set(this.conf, true);
        GiraphConstants.MAX_REQUEST_MILLISECONDS.set(this.conf, ASDataType.OTHER_SIMPLE_DATATYPE);
        GiraphConstants.WAITING_REQUEST_MSECS.set(this.conf, BufferRecycler.DEFAULT_WRITE_CONCAT_BUFFER_LEN);
        checkSendingTwoRequests();
    }

    @Test
    public void resendRequest() throws IOException {
        GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(this.conf, false);
        GiraphConstants.MAX_REQUEST_MILLISECONDS.set(this.conf, ASDataType.OTHER_SIMPLE_DATATYPE);
        GiraphConstants.WAITING_REQUEST_MSECS.set(this.conf, BufferRecycler.DEFAULT_WRITE_CONCAT_BUFFER_LEN);
        checkSendingTwoRequests();
    }

    private void checkSendingTwoRequests() throws IOException {
        this.serverData = MockUtils.createNewServerData(this.conf, this.context);
        this.serverData.prepareSuperstep();
        WorkerInfo workerInfo = new WorkerInfo();
        this.server = new NettyServer(this.conf, new WorkerRequestServerHandler.Factory(this.serverData), workerInfo, this.context, new MockExceptionHandler());
        this.server.start();
        workerInfo.setInetSocketAddress(this.server.getMyAddress(), this.server.getLocalHostOrIp());
        this.client = new NettyClient(this.context, this.conf, new WorkerInfo(), new MockExceptionHandler());
        this.server.setFlowControl(this.client.getFlowControl());
        this.client.connectAllAddresses(Lists.newArrayList(workerInfo));
        WritableRequest request = getRequest();
        WritableRequest request2 = getRequest();
        this.client.sendWritableRequest(workerInfo.getTaskId(), request);
        this.client.sendWritableRequest(workerInfo.getTaskId(), request2);
        this.client.waitAllRequests();
        this.client.stop();
        this.server.stop();
        checkResult(2);
    }
}
