package org.apache.spark.network.shuffle;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.UploadBlock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.class */
public class ExternalShuffleBlockHandlerSuite {
    TransportClient client = (TransportClient) Mockito.mock(TransportClient.class);
    OneForOneStreamManager streamManager;
    ExternalShuffleBlockResolver blockResolver;
    RpcHandler handler;

    @Before
    public void beforeEach() {
        this.streamManager = (OneForOneStreamManager) Mockito.mock(OneForOneStreamManager.class);
        this.blockResolver = (ExternalShuffleBlockResolver) Mockito.mock(ExternalShuffleBlockResolver.class);
        this.handler = new ExternalShuffleBlockHandler(this.streamManager, this.blockResolver);
    }

    @Test
    public void testRegisterExecutor() {
        RpcResponseCallback rpcResponseCallback = (RpcResponseCallback) Mockito.mock(RpcResponseCallback.class);
        ExecutorShuffleInfo executorShuffleInfo = new ExecutorShuffleInfo(new String[]{"/a", "/b"}, 16, "sort");
        this.handler.receive(this.client, new RegisterExecutor("app0", "exec1", executorShuffleInfo).toByteBuffer(), rpcResponseCallback);
        ((ExternalShuffleBlockResolver) Mockito.verify(this.blockResolver, Mockito.times(1))).registerExecutor("app0", "exec1", executorShuffleInfo);
        ((RpcResponseCallback) Mockito.verify(rpcResponseCallback, Mockito.times(1))).onSuccess((ByteBuffer) Matchers.any(ByteBuffer.class));
        ((RpcResponseCallback) Mockito.verify(rpcResponseCallback, Mockito.never())).onFailure((Throwable) Matchers.any(Throwable.class));
        Assert.assertEquals(1L, ((Timer) this.handler.getAllMetrics().getMetrics().get("registerExecutorRequestLatencyMillis")).getCount());
    }

    @Test
    public void testOpenShuffleBlocks() {
        RpcResponseCallback rpcResponseCallback = (RpcResponseCallback) Mockito.mock(RpcResponseCallback.class);
        NioManagedBuffer nioManagedBuffer = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
        NioManagedBuffer nioManagedBuffer2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
        Mockito.when(this.blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(nioManagedBuffer);
        Mockito.when(this.blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(nioManagedBuffer2);
        this.handler.receive(this.client, new OpenBlocks("app0", "exec1", new String[]{"b0", "b1"}).toByteBuffer(), rpcResponseCallback);
        ((RpcResponseCallback) Mockito.verify(rpcResponseCallback, Mockito.times(1))).onSuccess((ByteBuffer) ArgumentCaptor.forClass(ByteBuffer.class).capture());
        ((RpcResponseCallback) Mockito.verify(rpcResponseCallback, Mockito.never())).onFailure((Throwable) Matchers.any());
        Assert.assertEquals(2L, BlockTransferMessage.Decoder.fromByteBuffer((ByteBuffer) r0.getValue()).numChunks);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Iterator.class);
        ((OneForOneStreamManager) Mockito.verify(this.streamManager, Mockito.times(1))).registerStream(Mockito.anyString(), (Iterator) forClass.capture());
        Iterator it = (Iterator) forClass.getValue();
        Assert.assertEquals(nioManagedBuffer, it.next());
        Assert.assertEquals(nioManagedBuffer2, it.next());
        Assert.assertFalse(it.hasNext());
        ((ExternalShuffleBlockResolver) Mockito.verify(this.blockResolver, Mockito.times(1))).getBlockData("app0", "exec1", "b0");
        ((ExternalShuffleBlockResolver) Mockito.verify(this.blockResolver, Mockito.times(1))).getBlockData("app0", "exec1", "b1");
        Assert.assertEquals(1L, ((Timer) this.handler.getAllMetrics().getMetrics().get("openBlockRequestLatencyMillis")).getCount());
        Assert.assertEquals(10L, ((Meter) this.handler.getAllMetrics().getMetrics().get("blockTransferRateBytes")).getCount());
    }

    @Test
    public void testBadMessages() {
        RpcResponseCallback rpcResponseCallback = (RpcResponseCallback) Mockito.mock(RpcResponseCallback.class);
        try {
            this.handler.receive(this.client, ByteBuffer.wrap(new byte[]{18, 52, 86}), rpcResponseCallback);
            Assert.fail("Should have thrown");
        } catch (Exception e) {
        }
        try {
            this.handler.receive(this.client, new UploadBlock("a", "e", "b", new byte[1], new byte[2]).toByteBuffer(), rpcResponseCallback);
            Assert.fail("Should have thrown");
        } catch (UnsupportedOperationException e2) {
        }
        ((RpcResponseCallback) Mockito.verify(rpcResponseCallback, Mockito.never())).onSuccess((ByteBuffer) Matchers.any(ByteBuffer.class));
        ((RpcResponseCallback) Mockito.verify(rpcResponseCallback, Mockito.never())).onFailure((Throwable) Matchers.any(Throwable.class));
    }
}
