/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.local.LocalChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.spark.network.TestManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.BaseResponseCallback;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.MergedBlockMetaResponseCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallback;
import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.ChunkFetchFailure;
import org.apache.spark.network.protocol.ChunkFetchSuccess;
import org.apache.spark.network.protocol.MergedBlockMetaSuccess;
import org.apache.spark.network.protocol.ResponseMessage;
import org.apache.spark.network.protocol.RpcFailure;
import org.apache.spark.network.protocol.RpcResponse;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamFailure;
import org.apache.spark.network.protocol.StreamResponse;
import org.apache.spark.network.util.TransportFrameDecoder;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link TransportResponseHandler}.
 *
 * <p>Each test registers one or more callbacks on a handler backed by a {@link LocalChannel},
 * feeds response messages (or a failure signal) into the handler, and then checks both the
 * callback invocations and the value reported by {@code numOutstandingRequests()}.
 */
public class TransportResponseHandlerSuite {
    /**
     * A {@link ChunkFetchSuccess} for a registered chunk invokes {@code onSuccess} once with the
     * chunk index and drops the outstanding-request count from 1 to 0.
     */
    @Test
    public void handleSuccessfulFetch() throws Exception {
        StreamChunkId streamChunkId = new StreamChunkId(1L, 0);
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        ChunkReceivedCallback callback = Mockito.mock(ChunkReceivedCallback.class);

        handler.addFetchRequest(streamChunkId, callback);
        Assert.assertEquals(1, handler.numOutstandingRequests());

        handler.handle(new ChunkFetchSuccess(streamChunkId, new TestManagedBuffer(123)));
        Mockito.verify(callback, Mockito.times(1)).onSuccess(Mockito.eq(0), Mockito.any());
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * A {@link ChunkFetchFailure} for a registered chunk invokes {@code onFailure} once with the
     * chunk index and drops the outstanding-request count from 1 to 0.
     */
    @Test
    public void handleFailedFetch() throws Exception {
        StreamChunkId streamChunkId = new StreamChunkId(1L, 0);
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        ChunkReceivedCallback callback = Mockito.mock(ChunkReceivedCallback.class);

        handler.addFetchRequest(streamChunkId, callback);
        Assert.assertEquals(1, handler.numOutstandingRequests());

        handler.handle(new ChunkFetchFailure(streamChunkId, "some error msg"));
        Mockito.verify(callback, Mockito.times(1)).onFailure(Mockito.eq(0), Mockito.any());
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * With three fetches registered, one success followed by {@code exceptionCaught} results in
     * one {@code onSuccess} (chunk 0), one {@code onFailure} each for chunks 1 and 2, and no
     * outstanding requests.
     */
    @Test
    public void clearAllOutstandingRequests() throws Exception {
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        ChunkReceivedCallback callback = Mockito.mock(ChunkReceivedCallback.class);

        handler.addFetchRequest(new StreamChunkId(1L, 0), callback);
        handler.addFetchRequest(new StreamChunkId(1L, 1), callback);
        handler.addFetchRequest(new StreamChunkId(1L, 2), callback);
        Assert.assertEquals(3, handler.numOutstandingRequests());

        handler.handle(new ChunkFetchSuccess(new StreamChunkId(1L, 0), new TestManagedBuffer(12)));
        handler.exceptionCaught(new Exception("duh duh duhhhh"));

        // Chunk 0 completed before the exception; chunks 1 and 2 were still pending.
        Mockito.verify(callback, Mockito.times(1)).onSuccess(Mockito.eq(0), Mockito.any());
        Mockito.verify(callback, Mockito.times(1)).onFailure(Mockito.eq(1), Mockito.any());
        Mockito.verify(callback, Mockito.times(1)).onFailure(Mockito.eq(2), Mockito.any());
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * An {@link RpcResponse} carrying a request id that was never registered leaves the
     * outstanding count at 1; the response with the registered id invokes {@code onSuccess} once
     * and clears the outstanding request.
     */
    @Test
    public void handleSuccessfulRPC() throws Exception {
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        RpcResponseCallback callback = Mockito.mock(RpcResponseCallback.class);

        handler.addRpcRequest(12345L, callback);
        Assert.assertEquals(1, handler.numOutstandingRequests());

        // Response for an id that was never registered.
        handler.handle(new RpcResponse(54321L, new NioManagedBuffer(ByteBuffer.allocate(7))));
        Assert.assertEquals(1, handler.numOutstandingRequests());

        ByteBuffer resp = ByteBuffer.allocate(10);
        handler.handle(new RpcResponse(12345L, new NioManagedBuffer(resp)));
        Mockito.verify(callback, Mockito.times(1)).onSuccess(Mockito.eq(ByteBuffer.allocate(10)));
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * An {@link RpcFailure} carrying a request id that was never registered leaves the
     * outstanding count at 1; the failure with the registered id invokes {@code onFailure} once
     * and clears the outstanding request.
     */
    @Test
    public void handleFailedRPC() throws Exception {
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        RpcResponseCallback callback = Mockito.mock(RpcResponseCallback.class);

        handler.addRpcRequest(12345L, callback);
        Assert.assertEquals(1, handler.numOutstandingRequests());

        // Failure for an id that was never registered.
        handler.handle(new RpcFailure(54321L, "uh-oh!"));
        Assert.assertEquals(1, handler.numOutstandingRequests());

        handler.handle(new RpcFailure(12345L, "oh no"));
        Mockito.verify(callback, Mockito.times(1)).onFailure(Mockito.any());
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * Tracks the outstanding count across a stream's lifetime: it stays at 1 after the
     * {@link StreamResponse} is handled, drops to 0 after {@code deactivateStream()}, and a
     * {@link StreamFailure} for a newly registered callback drops it from 1 to 0 directly.
     */
    @Test
    public void testActiveStreams() throws Exception {
        LocalChannel c = new LocalChannel();
        c.pipeline().addLast("frameDecoder", new TransportFrameDecoder());
        TransportResponseHandler handler = new TransportResponseHandler(c);

        StreamResponse response = new StreamResponse("stream", 1234L, null);
        StreamCallback cb = Mockito.mock(StreamCallback.class);
        handler.addStreamCallback("stream", cb);
        Assert.assertEquals(1, handler.numOutstandingRequests());
        handler.handle(response);
        Assert.assertEquals(1, handler.numOutstandingRequests());
        handler.deactivateStream();
        Assert.assertEquals(0, handler.numOutstandingRequests());

        StreamFailure failure = new StreamFailure("stream", "uh-oh");
        handler.addStreamCallback("stream", cb);
        Assert.assertEquals(1, handler.numOutstandingRequests());
        handler.handle(failure);
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * {@code channelInactive()} fails a pending stream callback with an {@link IOException}
     * tagged with the stream id.
     */
    @Test
    public void failOutstandingStreamCallbackOnClose() throws Exception {
        LocalChannel c = new LocalChannel();
        c.pipeline().addLast("frameDecoder", new TransportFrameDecoder());
        TransportResponseHandler handler = new TransportResponseHandler(c);

        StreamCallback cb = Mockito.mock(StreamCallback.class);
        handler.addStreamCallback("stream-1", cb);
        handler.channelInactive();

        Mockito.verify(cb).onFailure(Mockito.eq("stream-1"), Mockito.isA(IOException.class));
    }

    /**
     * {@code exceptionCaught(...)} with an {@link IOException} fails a pending stream callback
     * with an {@link IOException} tagged with the stream id.
     */
    @Test
    public void failOutstandingStreamCallbackOnException() throws Exception {
        LocalChannel c = new LocalChannel();
        c.pipeline().addLast("frameDecoder", new TransportFrameDecoder());
        TransportResponseHandler handler = new TransportResponseHandler(c);

        StreamCallback cb = Mockito.mock(StreamCallback.class);
        handler.addStreamCallback("stream-1", cb);
        handler.exceptionCaught(new IOException("Oops!"));

        Mockito.verify(cb).onFailure(Mockito.eq("stream-1"), Mockito.isA(IOException.class));
    }

    /**
     * A {@link MergedBlockMetaSuccess} carrying a request id that was never registered leaves the
     * outstanding count at 1; the one with the registered id invokes {@code onSuccess} once with
     * the chunk count and a buffer whose NIO view equals the payload that was sent.
     */
    @Test
    public void handleSuccessfulMergedBlockMeta() throws Exception {
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        MergedBlockMetaResponseCallback callback =
            Mockito.mock(MergedBlockMetaResponseCallback.class);

        handler.addRpcRequest(13L, callback);
        Assert.assertEquals(1, handler.numOutstandingRequests());

        // Response for an id that was never registered.
        handler.handle(
            new MergedBlockMetaSuccess(22L, 2, new NioManagedBuffer(ByteBuffer.allocate(7))));
        Assert.assertEquals(1, handler.numOutstandingRequests());

        ByteBuffer resp = ByteBuffer.allocate(10);
        handler.handle(new MergedBlockMetaSuccess(13L, 2, new NioManagedBuffer(resp)));

        // Typed captor: no raw type, and no downcast needed when reading the captured value.
        ArgumentCaptor<NioManagedBuffer> bufferCaptor =
            ArgumentCaptor.forClass(NioManagedBuffer.class);
        Mockito.verify(callback, Mockito.times(1))
            .onSuccess(Mockito.eq(2), bufferCaptor.capture());
        Assert.assertEquals(resp, bufferCaptor.getValue().nioByteBuffer());
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }

    /**
     * An {@link RpcFailure} carrying a request id that was never registered leaves the
     * outstanding count at 1; the failure with the registered id invokes {@code onFailure} once
     * on the merged-block-meta callback and clears the outstanding request.
     */
    @Test
    public void handleFailedMergedBlockMeta() throws Exception {
        TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());
        MergedBlockMetaResponseCallback callback =
            Mockito.mock(MergedBlockMetaResponseCallback.class);

        handler.addRpcRequest(51L, callback);
        Assert.assertEquals(1, handler.numOutstandingRequests());

        // Failure for an id that was never registered.
        handler.handle(new RpcFailure(6L, "failed"));
        Assert.assertEquals(1, handler.numOutstandingRequests());

        handler.handle(new RpcFailure(51L, "failed"));
        Mockito.verify(callback, Mockito.times(1)).onFailure(Mockito.any());
        Assert.assertEquals(0, handler.numOutstandingRequests());
    }
}

