/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.network.ExtendedChannelPromise;
import org.apache.spark.network.TestManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.server.ChunkFetchRequestHandler;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Drives a {@link ChunkFetchRequestHandler} through a mocked Netty channel and checks
 * which response (if any) is written for each of five consecutive chunk requests.
 */
public class ChunkFetchRequestHandlerSuite {
    /**
     * Registers a five-buffer stream (the third buffer is {@code null}) and requests the
     * chunks in order, asserting after each request what was written to the channel:
     * success for chunks 0, 1 and 3, a failure for the {@code null} chunk 2, and a channel
     * close with no additional write for chunk 4.
     */
    @Test
    public void handleChunkFetchRequest() throws Exception {
        NoOpRpcHandler rpcHandler = new NoOpRpcHandler();
        OneForOneStreamManager streamManager = (OneForOneStreamManager) rpcHandler.getStreamManager();
        Channel channel = Mockito.mock(Channel.class);
        ChannelHandlerContext context = Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(context.channel()).thenAnswer(invocation -> channel);

        // Every writeAndFlush on the mocked channel is recorded here, paired with the promise
        // handed back to the caller, so the test can inspect responses and complete writes.
        ArrayList<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs = new ArrayList<>();
        Mockito.when(channel.writeAndFlush(Mockito.any())).thenAnswer(invocation -> {
            Object response = invocation.getArguments()[0];
            ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel);
            responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture));
            return channelFuture;
        });

        // Prepare the stream; index 2 is deliberately null.
        // NOTE(review): the element type is TestManagedBuffer as emitted by the decompiler; if
        // registerStream requires an iterator over a wider buffer type, widen this declaration
        // — confirm against OneForOneStreamManager.
        ArrayList<TestManagedBuffer> managedBuffers = new ArrayList<>();
        managedBuffers.add(new TestManagedBuffer(10));
        managedBuffers.add(new TestManagedBuffer(20));
        managedBuffers.add(null);
        managedBuffers.add(new TestManagedBuffer(30));
        managedBuffers.add(new TestManagedBuffer(40));
        long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);

        TransportClient reverseClient = Mockito.mock(TransportClient.class);
        // The third argument (2) is presumably the limit on chunks being transferred at once
        // — TODO confirm against ChunkFetchRequestHandler's constructor.
        ChunkFetchRequestHandler requestHandler =
            new ChunkFetchRequestHandler(reverseClient, rpcHandler.getStreamManager(), 2L, false);

        // Chunk 0: answered with the first buffer.
        fetchChunk(requestHandler, context, streamId, 0);
        Assert.assertEquals(1, responseAndPromisePairs.size());
        assertSuccess(managedBuffers.get(0), responseAndPromisePairs.get(0));

        // Chunk 1: answered with the second buffer.
        fetchChunk(requestHandler, context, streamId, 1);
        Assert.assertEquals(2, responseAndPromisePairs.size());
        assertSuccess(managedBuffers.get(1), responseAndPromisePairs.get(1));

        // Complete the write of the first response before issuing further requests.
        responseAndPromisePairs.get(0).getRight().finish(true);

        // Chunk 2: the registered buffer is null, so a failure response is written.
        fetchChunk(requestHandler, context, streamId, 2);
        Assert.assertEquals(3, responseAndPromisePairs.size());
        Object thirdResponse = responseAndPromisePairs.get(2).getLeft();
        Assert.assertTrue(thirdResponse instanceof ChunkFetchFailure);
        ChunkFetchFailure chunkFetchFailure = (ChunkFetchFailure) thirdResponse;
        // Only the first line of the error string is compared; the rest is not checked.
        Assert.assertEquals(
            "java.lang.IllegalStateException: Chunk was not found",
            chunkFetchFailure.errorString.split("\\r?\\n")[0]);

        // Chunk 3: answered with the fourth buffer.
        fetchChunk(requestHandler, context, streamId, 3);
        Assert.assertEquals(4, responseAndPromisePairs.size());
        assertSuccess(managedBuffers.get(3), responseAndPromisePairs.get(3));

        // Chunk 4: the channel is closed exactly once and nothing further is written.
        fetchChunk(requestHandler, context, streamId, 4);
        Mockito.verify(channel, Mockito.times(1)).close();
        Assert.assertEquals(4, responseAndPromisePairs.size());
    }

    /** Feeds a {@link ChunkFetchRequest} for the given stream and chunk index into the handler. */
    private static void fetchChunk(
            ChunkFetchRequestHandler handler,
            ChannelHandlerContext context,
            long streamId,
            int chunkIndex) throws Exception {
        handler.channelRead(context, new ChunkFetchRequest(new StreamChunkId(streamId, chunkIndex)));
    }

    /** Asserts that the recorded response is a {@link ChunkFetchSuccess} whose body equals the expected buffer. */
    private static void assertSuccess(
            TestManagedBuffer expectedBody,
            Pair<Object, ExtendedChannelPromise> responseAndPromise) {
        Object response = responseAndPromise.getLeft();
        Assert.assertTrue(response instanceof ChunkFetchSuccess);
        Assert.assertEquals(expectedBody, ((ChunkFetchSuccess) response).body());
    }
}

