package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.class */
public class CreditBasedPartitionRequestClientHandlerTest {

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest$TestRemoteInputChannelForError.class */
    private static class TestRemoteInputChannelForError extends RemoteInputChannel {
        private final String expectedMessage;

        TestRemoteInputChannelForError(SingleInputGate singleInputGate, String str) {
            super(singleInputGate, 0, new ResultPartitionID(), InputChannelBuilder.STUB_CONNECTION_ID, new TestingConnectionManager(), 0, 100, 2, new SimpleCounter(), new SimpleCounter(), ChannelStateWriter.NO_OP);
            this.expectedMessage = str;
        }

        public void onBuffer(Buffer buffer, int i, int i2) throws IOException {
            buffer.recycleBuffer();
            throw new IOException(this.expectedMessage);
        }
    }

    /**
     * Feeds a buffer response to the handler for a mocked channel whose buffer provider returns no
     * buffer, reports itself as destroyed and rejects buffer listeners. The test has no explicit
     * assertions: it passes when {@code channelRead} returns within the timeout without throwing.
     */
    @Test(timeout = 60000)
    public void testReleaseInputChannelDuringDecode() throws Exception {
        // Buffer provider stubbed to hand out nothing and to report itself as destroyed.
        final BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(null);
        Mockito.when(bufferProvider.isDestroyed()).thenReturn(true);
        Mockito.when(bufferProvider.addBufferListener(Matchers.any(BufferListener.class)))
                .thenReturn(false);

        final RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        final NettyMessage.BufferResponse bufferResponse =
                createBufferResponse(
                        TestBufferFactory.createBuffer(32768),
                        0,
                        inputChannel.getInputChannelId(),
                        2,
                        new NetworkBufferAllocator(handler));

        handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);
    }

    /**
     * Sends a zero-sized buffer response to the handler and verifies that the mocked channel is
     * notified through {@code onEmptyBuffer(0, 2)} exactly once and never through {@code onError}.
     */
    @Test
    public void testReceiveEmptyBuffer() throws Exception {
        // Mocked channel backed by a mocked buffer provider.
        final BufferProvider provider = Mockito.mock(BufferProvider.class);
        Mockito.when(provider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));

        final RemoteInputChannel channel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(channel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(channel.getBufferProvider()).thenReturn(provider);

        // The payload under test: a buffer with no data.
        final Buffer emptyBuffer = TestBufferFactory.createBuffer(0);

        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(channel);

        final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
        final NettyMessage.BufferResponse response =
                createBufferResponse(
                        emptyBuffer,
                        0,
                        channel.getInputChannelId(),
                        2,
                        new NetworkBufferAllocator(handler));
        handler.channelRead(ctx, response);

        Mockito.verify(channel, Mockito.never()).onError(Matchers.any(Throwable.class));
        Mockito.verify(channel, Mockito.times(1)).onEmptyBuffer(0, 2);
    }

    /**
     * Sends a 32-byte buffer response to the handler for a real remote input channel and verifies
     * that the channel queued exactly one buffer and recorded a sender backlog of 2.
     */
    @Test
    public void testReceiveBuffer() throws Exception {
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        // Must be the concrete type: the handler and the assertions below use
        // RemoteInputChannel-specific API.
        final RemoteInputChannel inputChannel =
                InputChannelBuilder.newBuilder().buildRemoteChannel(inputGate);
        try {
            inputGate.setInputChannels(new InputChannel[] {inputChannel});
            inputGate.setBufferPool(networkBufferPool.createBufferPool(8, 8));
            inputGate.setupChannels();

            final CreditBasedPartitionRequestClientHandler handler =
                    new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);

            final NettyMessage.BufferResponse bufferResponse =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(32),
                            0,
                            inputChannel.getInputChannelId(),
                            2,
                            new NetworkBufferAllocator(handler));
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

            Assert.assertEquals(1L, inputChannel.getNumberOfQueuedBuffers());
            Assert.assertEquals(2L, inputChannel.getSenderBacklog());
        } finally {
            // Release exactly once, whether or not the assertions passed.
            releaseResource(inputGate, networkBufferPool);
        }
    }

    /**
     * Sends an LZ4-compressed buffer response to the handler and verifies that the response is
     * flagged as compressed and that the buffer polled from the channel is still compressed.
     */
    @Test
    public void testReceiveCompressedBuffer() throws Exception {
        final BufferCompressor compressor = new BufferCompressor(1024, "LZ4");
        final BufferDecompressor decompressor = new BufferDecompressor(1024, "LZ4");
        // Declared as the concrete pool type: createBufferPool and releaseResource need it.
        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 1024);
        final SingleInputGate inputGate =
                new SingleInputGateBuilder()
                        .setBufferDecompressor(decompressor)
                        .setSegmentProvider(networkBufferPool)
                        .build();
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(
                        inputGate, (PartitionRequestClient) null);
        try {
            inputGate.setInputChannels(new InputChannel[] {inputChannel});
            inputGate.setBufferPool(networkBufferPool.createBufferPool(8, 8));
            inputGate.setupChannels();

            final CreditBasedPartitionRequestClientHandler handler =
                    new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);

            final Buffer compressedBuffer =
                    compressor.compressToOriginalBuffer(TestBufferFactory.createBuffer(1024));
            final NettyMessage.BufferResponse bufferResponse =
                    createBufferResponse(
                            compressedBuffer,
                            0,
                            inputChannel.getInputChannelId(),
                            2,
                            new NetworkBufferAllocator(handler));
            Assert.assertTrue(bufferResponse.isCompressed);

            handler.channelRead(null, bufferResponse);

            final Buffer receivedBuffer = inputChannel.getNextReceivedBuffer();
            Assert.assertNotNull(receivedBuffer);
            Assert.assertTrue(receivedBuffer.isCompressed());
            receivedBuffer.recycleBuffer();
        } finally {
            // Release exactly once, whether or not the assertions passed.
            releaseResource(inputGate, networkBufferPool);
        }
    }

    /**
     * Sends a buffer response to a spied remote channel that has no available buffers. Verifies
     * that the decoded response carries no buffer and that the channel's {@code onError} is
     * invoked exactly once.
     */
    @Test
    public void testThrowExceptionForNoAvailableBuffer() throws Exception {
        final SingleInputGate gate = InputChannelTestUtils.createSingleInputGate(1);
        final RemoteInputChannel spiedChannel =
                Mockito.spy(InputChannelBuilder.newBuilder().buildRemoteChannel(gate));

        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(spiedChannel);

        Assert.assertEquals(
                "There should be no buffers available in the channel.",
                0L,
                spiedChannel.getNumberOfAvailableBuffers());

        final NettyMessage.BufferResponse response =
                createBufferResponse(
                        TestBufferFactory.createBuffer(32768),
                        0,
                        spiedChannel.getInputChannelId(),
                        2,
                        new NetworkBufferAllocator(handler));
        // With no buffer to allocate from, the decoded response holds no buffer.
        Assert.assertNull(response.getBuffer());

        handler.channelRead(Mockito.mock(ChannelHandlerContext.class), response);

        Mockito.verify(spiedChannel, Mockito.times(1))
                .onError(Matchers.any(IllegalStateException.class));
    }

    /**
     * Delivers an {@code ErrorResponse} wrapping a {@link PartitionNotFoundException} to the
     * handler and verifies that the addressed channel's {@code onFailedPartitionRequest} is called
     * exactly once.
     */
    @Test
    public void testReceivePartitionNotFoundException() throws Exception {
        final BufferProvider provider = Mockito.mock(BufferProvider.class);
        Mockito.when(provider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));

        final RemoteInputChannel channel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(channel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(channel.getBufferProvider()).thenReturn(provider);

        // Error message addressed to the mocked channel.
        final PartitionNotFoundException cause =
                new PartitionNotFoundException(new ResultPartitionID());
        final NettyMessage.ErrorResponse partitionNotFound =
                new NettyMessage.ErrorResponse(cause, channel.getInputChannelId());

        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(channel);

        final ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(ctx.channel()).thenReturn(Mockito.mock(Channel.class));

        // Activate the handler before feeding it the error.
        handler.channelActive(ctx);
        handler.channelRead(ctx, partitionNotFound);

        Mockito.verify(channel, Mockito.times(1)).onFailedPartitionRequest();
    }

    /**
     * Calls {@code cancelRequestFor} on a handler that has never been activated, once with
     * {@code null} and once with a registered channel's id. The test passes when neither call
     * throws.
     */
    @Test
    public void testCancelBeforeActive() throws Exception {
        final RemoteInputChannel channel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(channel.getInputChannelId()).thenReturn(new InputChannelID());

        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(channel);

        // Neither a null id nor a known id may throw before the handler is active.
        handler.cancelRequestFor((InputChannelID) null);
        final InputChannelID knownId = channel.getInputChannelId();
        handler.cancelRequestFor(knownId);
    }

    /**
     * Exercises credit announcement through an {@link EmbeddedChannel} with two remote input
     * channels.
     *
     * <p>Checks, in order: both partition requests are written with an initial credit of 2; after
     * one buffer response per channel, an {@code AddCredit} with credit 2 is written for each;
     * while the channel is blocked (not writable) no credit message is written; after flushing,
     * the pending {@code AddCredit} with credit 1 is written and no unannounced credit remains.
     */
    @Test
    public void testNotifyCreditAvailable() throws Exception {
        // Declared as the concrete type: channelRead and NetworkBufferAllocator require it.
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final NetworkBufferAllocator allocator = new NetworkBufferAllocator(handler);
        final EmbeddedChannel channel = new EmbeddedChannel(handler);
        final PartitionRequestClient client =
                new NettyPartitionRequestClient(
                        channel,
                        handler,
                        Mockito.mock(ConnectionID.class),
                        Mockito.mock(PartitionRequestClientFactory.class));

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(2, networkBufferPool);
        final RemoteInputChannel[] inputChannels = {
            InputChannelTestUtils.createRemoteInputChannel(inputGate, client),
            InputChannelTestUtils.createRemoteInputChannel(inputGate, client)
        };
        try {
            inputGate.setInputChannels(inputChannels);
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.setupChannels();

            inputChannels[0].requestSubpartition(0);
            inputChannels[1].requestSubpartition(0);

            // Both partition requests must have been written, each announcing a credit of 2.
            Assert.assertTrue(channel.isWritable());
            final Object firstRequest = channel.readOutbound();
            Assert.assertThat(
                    firstRequest,
                    org.hamcrest.Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(
                    inputChannels[0].getInputChannelId(),
                    ((NettyMessage.PartitionRequest) firstRequest).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) firstRequest).credit);

            final Object secondRequest = channel.readOutbound();
            Assert.assertThat(
                    secondRequest,
                    org.hamcrest.Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(
                    inputChannels[1].getInputChannelId(),
                    ((NettyMessage.PartitionRequest) secondRequest).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) secondRequest).credit);

            // One buffer response per channel; afterwards each channel has 2 unannounced credits.
            final NettyMessage.BufferResponse responseForFirst =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(32),
                            0,
                            inputChannels[0].getInputChannelId(),
                            1,
                            allocator);
            final NettyMessage.BufferResponse responseForSecond =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(32),
                            0,
                            inputChannels[1].getInputChannelId(),
                            1,
                            allocator);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), responseForFirst);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), responseForSecond);

            Assert.assertEquals(2L, inputChannels[0].getUnannouncedCredit());
            Assert.assertEquals(2L, inputChannels[1].getUnannouncedCredit());

            channel.runPendingTasks();

            // The unannounced credits are written as one AddCredit message per channel.
            final Object firstCredit = channel.readOutbound();
            Assert.assertThat(
                    firstCredit, org.hamcrest.Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(
                    inputChannels[0].getInputChannelId(),
                    ((NettyMessage.AddCredit) firstCredit).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.AddCredit) firstCredit).credit);

            final Object secondCredit = channel.readOutbound();
            Assert.assertThat(
                    secondCredit, org.hamcrest.Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(
                    inputChannels[1].getInputChannelId(),
                    ((NettyMessage.AddCredit) secondCredit).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.AddCredit) secondCredit).credit);
            Assert.assertNull(channel.readOutbound());

            // Block the channel, then deliver one more buffer to the first input channel.
            final ByteBuf channelBlockingBuffer = PartitionRequestQueueTest.blockChannel(channel);
            final NettyMessage.BufferResponse responseWhileBlocked =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(32),
                            1,
                            inputChannels[0].getInputChannelId(),
                            1,
                            allocator);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), responseWhileBlocked);

            Assert.assertEquals(1L, inputChannels[0].getUnannouncedCredit());
            Assert.assertEquals(0L, inputChannels[1].getUnannouncedCredit());

            channel.runPendingTasks();

            // Nothing may be written while the channel is not writable.
            Assert.assertFalse(channel.isWritable());
            Assert.assertNull(channel.readOutbound());

            // Flushing makes the channel writable again and lets the pending credit through.
            channel.flush();
            Assert.assertSame(channelBlockingBuffer, channel.readOutbound());
            Assert.assertTrue(channel.isWritable());

            final Object creditAfterFlush = channel.readOutbound();
            Assert.assertThat(
                    creditAfterFlush,
                    org.hamcrest.Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(1L, ((NettyMessage.AddCredit) creditAfterFlush).credit);
            Assert.assertEquals(0L, inputChannels[0].getUnannouncedCredit());
            Assert.assertEquals(0L, inputChannels[1].getUnannouncedCredit());
            Assert.assertNull(channel.readOutbound());
        } finally {
            // Release exactly once, whether or not the assertions passed.
            releaseResource(inputGate, networkBufferPool);
            channel.close();
        }
    }

    /**
     * Verifies that no credit message is written for a channel once its input gate is closed: the
     * only outbound message after closing is a {@code CloseRequest}, although the channel still
     * held 2 unannounced credits.
     */
    @Test
    public void testNotifyCreditAvailableAfterReleased() throws Exception {
        // Declared as the concrete type: channelRead and NetworkBufferAllocator require it.
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final EmbeddedChannel channel = new EmbeddedChannel(handler);
        final PartitionRequestClient client =
                new NettyPartitionRequestClient(
                        channel,
                        handler,
                        Mockito.mock(ConnectionID.class),
                        Mockito.mock(PartitionRequestClientFactory.class));

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(inputGate, client);
        try {
            inputGate.setInputChannels(new InputChannel[] {inputChannel});
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.setupChannels();

            inputChannel.requestSubpartition(0);

            // The partition request announces the initial credit of 2.
            final Object partitionRequest = channel.readOutbound();
            Assert.assertThat(
                    partitionRequest,
                    org.hamcrest.Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) partitionRequest).credit);

            final NettyMessage.BufferResponse bufferResponse =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(32),
                            0,
                            inputChannel.getInputChannelId(),
                            1,
                            new NetworkBufferAllocator(handler));
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

            Assert.assertEquals(2L, inputChannel.getUnannouncedCredit());

            // Close the gate while credits are still unannounced.
            inputGate.close();

            Assert.assertThat(
                    channel.readOutbound(),
                    org.hamcrest.Matchers.instanceOf(NettyMessage.CloseRequest.class));

            // Running the pending tasks must not produce a credit message.
            channel.runPendingTasks();
            Assert.assertNull(channel.readOutbound());
        } finally {
            // Release exactly once, whether or not the assertions passed.
            releaseResource(inputGate, networkBufferPool);
            channel.close();
        }
    }

    /**
     * Scenario: the buffer response is decoded before the input gate is closed, and the channel is
     * not removed from the handler.
     */
    @Test
    public void testReadBufferResponseBeforeReleasingChannel() throws Exception {
        final boolean removeChannel = false;
        final boolean decodeBeforeClose = true;
        testReadBufferResponseWithReleasingOrRemovingChannel(removeChannel, decodeBeforeClose);
    }

    /**
     * Scenario: the buffer response is decoded before the input gate is closed, and the channel is
     * then removed from the handler.
     */
    @Test
    public void testReadBufferResponseBeforeRemovingChannel() throws Exception {
        final boolean removeChannel = true;
        final boolean decodeBeforeClose = true;
        testReadBufferResponseWithReleasingOrRemovingChannel(removeChannel, decodeBeforeClose);
    }

    /**
     * Scenario: the input gate is closed before the buffer response is decoded, and the channel is
     * not removed from the handler.
     */
    @Test
    public void testReadBufferResponseAfterReleasingChannel() throws Exception {
        final boolean removeChannel = false;
        final boolean decodeBeforeClose = false;
        testReadBufferResponseWithReleasingOrRemovingChannel(removeChannel, decodeBeforeClose);
    }

    /**
     * Scenario: the input gate is closed and the channel removed from the handler before the
     * buffer response is decoded.
     */
    @Test
    public void testReadBufferResponseAfterRemovingChannel() throws Exception {
        final boolean removeChannel = true;
        final boolean decodeBeforeClose = false;
        testReadBufferResponseWithReleasingOrRemovingChannel(removeChannel, decodeBeforeClose);
    }

    /**
     * Verifies that a failure inside one channel's {@code onBuffer} does not put the handler into
     * an error state ({@code checkError()} must not throw), while the failure is still reported
     * through {@link SingleInputGate#getNext()} with the original message.
     */
    @Test
    public void testDoNotFailHandlerOnSingleChannelFailure() throws Exception {
        final String expectedMessage = "test exception on buffer";

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 1024);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        // Declared as RemoteInputChannel because addInputChannel requires that type.
        final RemoteInputChannel inputChannel =
                new TestRemoteInputChannelForError(inputGate, expectedMessage);
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        try {
            inputGate.setInputChannels(new InputChannel[] {inputChannel});
            inputGate.setupChannels();
            inputGate.requestPartitions();
            handler.addInputChannel(inputChannel);

            final NettyMessage.BufferResponse bufferResponse =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(1024),
                            0,
                            inputChannel.getInputChannelId(),
                            1,
                            new NetworkBufferAllocator(handler));
            // The channel's onBuffer throws while this response is handled.
            handler.channelRead(null, bufferResponse);

            // The handler itself must not be marked as failed by the channel's exception.
            handler.checkError();

            try {
                inputGate.getNext();
                // Without this the test would pass even if the error was silently dropped.
                Assert.fail("Expected SingleInputGate#getNext to rethrow the channel failure.");
            } catch (IOException e) {
                Assert.assertEquals(expectedMessage, e.getMessage());
            }
        } finally {
            releaseResource(inputGate, networkBufferPool);
        }
    }

    /**
     * Shared scenario for reading a buffer response around the release or removal of its channel.
     *
     * <p>The input gate is closed (and the channel optionally removed from the handler) either
     * before or after the buffer response is decoded. In both cases the channel must not queue the
     * buffer and a {@code CancelPartitionRequest} for the channel must be written. If the response
     * was decoded first, its buffer must exist and be recycled; otherwise it must carry no buffer.
     *
     * @param z whether the channel is also removed from the handler after the gate is closed
     * @param z2 whether the buffer response is decoded before ({@code true}) or after
     *     ({@code false}) the gate is closed
     */
    private void testReadBufferResponseWithReleasingOrRemovingChannel(boolean z, boolean z2) throws Exception {
        final boolean removeChannel = z;
        final boolean decodeBeforeClose = z2;

        final NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 1024);
        final SingleInputGate inputGate =
                InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        // Concrete types are required by addInputChannel/removeInputChannel and the assertions.
        final RemoteInputChannel inputChannel =
                new InputChannelBuilder().buildRemoteChannel(inputGate);
        final CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(handler);

        // One try/finally around the whole scenario so a failing assertion cannot leak resources.
        try {
            inputGate.setInputChannels(new InputChannel[] {inputChannel});
            inputGate.setupChannels();
            handler.addInputChannel(inputChannel);

            if (!decodeBeforeClose) {
                inputGate.close();
                if (removeChannel) {
                    handler.removeInputChannel(inputChannel);
                }
            }

            final NettyMessage.BufferResponse bufferResponse =
                    createBufferResponse(
                            TestBufferFactory.createBuffer(1024),
                            0,
                            inputChannel.getInputChannelId(),
                            1,
                            new NetworkBufferAllocator(handler));

            if (decodeBeforeClose) {
                inputGate.close();
                if (removeChannel) {
                    handler.removeInputChannel(inputChannel);
                }
            }

            handler.channelRead(null, bufferResponse);

            Assert.assertEquals(0L, inputChannel.getNumberOfQueuedBuffers());
            if (decodeBeforeClose) {
                // A buffer was allocated during decoding and must have been given back.
                Assert.assertNotNull(bufferResponse.getBuffer());
                Assert.assertTrue(bufferResponse.getBuffer().isRecycled());
            } else {
                Assert.assertNull(bufferResponse.getBuffer());
            }

            embeddedChannel.runScheduledPendingTasks();
            final NettyMessage.CancelPartitionRequest cancelRequest =
                    (NettyMessage.CancelPartitionRequest) embeddedChannel.readOutbound();
            Assert.assertNotNull(cancelRequest);
            Assert.assertEquals(inputChannel.getInputChannelId(), cancelRequest.receiverId);
        } finally {
            releaseResource(inputGate, networkBufferPool);
            embeddedChannel.close();
        }
    }

    /**
     * Closes the input gate and tears down the network buffer pool.
     *
     * <p>The pool is destroyed even when closing the gate throws, so a failing close cannot leak
     * the pool; the exception still propagates to the caller.
     *
     * @param singleInputGate gate to close
     * @param networkBufferPool pool whose buffer pools are destroyed before the pool itself
     * @throws IOException if closing the gate fails
     */
    private static void releaseResource(SingleInputGate singleInputGate, NetworkBufferPool networkBufferPool) throws IOException {
        try {
            singleInputGate.close();
        } finally {
            try {
                networkBufferPool.destroyAllBufferPools();
            } finally {
                networkBufferPool.destroy();
            }
        }
    }

    /**
     * Builds a {@link NettyMessage.BufferResponse} by serializing a response and parsing it back
     * with {@link NettyMessage.BufferResponse#readFrom}.
     *
     * @param buffer payload of the response
     * @param i second constructor argument of {@code BufferResponse}; presumably the sequence
     *     number (the tests pass 0 for a channel's first buffer and 1 for its second) - confirm
     *     against {@code NettyMessage.BufferResponse}
     * @param inputChannelID id of the receiving input channel
     * @param i2 fourth constructor argument of {@code BufferResponse}; presumably the sender
     *     backlog - confirm against {@code NettyMessage.BufferResponse}
     * @param networkBufferAllocator allocator passed to {@code readFrom}
     * @return the response parsed from the serialized bytes
     * @throws IOException if serialization fails
     */
    private static NettyMessage.BufferResponse createBufferResponse(Buffer buffer, int i, InputChannelID inputChannelID, int i2, NetworkBufferAllocator networkBufferAllocator) throws IOException {
        final NettyMessage.BufferResponse response =
                new NettyMessage.BufferResponse(buffer, i, inputChannelID, i2);
        final ByteBuf serialized = response.write(UnpooledByteBufAllocator.DEFAULT);

        // Skip the frame header so that readFrom starts at the message body; skipBytes
        // advances the reader index without allocating a throwaway buffer.
        serialized.skipBytes(NettyMessage.FRAME_HEADER_LENGTH);

        return NettyMessage.BufferResponse.readFrom(serialized, networkBufferAllocator);
    }
}
