package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.class */
public class ClientTransportErrorHandlingTest {
    @Test
    public void testExceptionOnWrite() throws Exception {
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol((ResultPartitionProvider) Mockito.mock(ResultPartitionProvider.class), (TaskEventPublisher) Mockito.mock(TaskEventDispatcher.class)) { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.1
            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[0];
            }
        }, NettyTestUtil.createConfig());
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        NetworkClientHandler clientHandler = getClientHandler(connect);
        connect.pipeline().addFirst(new ChannelHandler[]{new ChannelOutboundHandlerAdapter() { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.2
            int writeNum = 0;

            public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
                if (this.writeNum >= 1) {
                    throw new RuntimeException("Expected test exception.");
                }
                this.writeNum++;
                channelHandlerContext.write(obj, channelPromise);
            }
        }});
        NettyPartitionRequestClient nettyPartitionRequestClient = new NettyPartitionRequestClient(connect, clientHandler, (ConnectionID) Mockito.mock(ConnectionID.class), (PartitionRequestClientFactory) Mockito.mock(PartitionRequestClientFactory.class));
        RemoteInputChannel[] remoteInputChannelArr = {createRemoteInputChannel(), createRemoteInputChannel()};
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ((RemoteInputChannel) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.3
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m118answer(InvocationOnMock invocationOnMock) throws Throwable {
                countDownLatch.countDown();
                return null;
            }
        }).when(remoteInputChannelArr[1])).onError((Throwable) Matchers.isA(LocalTransportException.class));
        nettyPartitionRequestClient.requestSubpartition(new ResultPartitionID(), 0, remoteInputChannelArr[0], 0);
        nettyPartitionRequestClient.requestSubpartition(new ResultPartitionID(), 0, remoteInputChannelArr[1], 0);
        if (!countDownLatch.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
            Assert.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about the channel error.");
        }
        ((RemoteInputChannel) Mockito.verify(remoteInputChannelArr[0], Mockito.times(0))).onError((Throwable) Matchers.any(LocalTransportException.class));
        NettyTestUtil.shutdown(initServerAndClient);
    }

    @Test
    public void testWrappingOfRemoteErrorMessage() throws Exception {
        EmbeddedChannel createEmbeddedChannel = createEmbeddedChannel();
        NetworkClientHandler clientHandler = getClientHandler(createEmbeddedChannel);
        RemoteInputChannel[] remoteInputChannelArr = {createRemoteInputChannel(), createRemoteInputChannel()};
        for (RemoteInputChannel remoteInputChannel : remoteInputChannelArr) {
            Mockito.when(remoteInputChannel.getInputChannelId()).thenReturn(new InputChannelID());
            clientHandler.addInputChannel(remoteInputChannel);
        }
        createEmbeddedChannel.pipeline().fireChannelRead(new NettyMessage.ErrorResponse(new RuntimeException("Expected test exception"), remoteInputChannelArr[0].getInputChannelId()));
        try {
            createEmbeddedChannel.checkException();
        } catch (Exception e) {
            Assert.fail("The exception reached the end of the pipeline and was not handled correctly by the last handler.");
        }
        ((RemoteInputChannel) Mockito.verify(remoteInputChannelArr[0], Mockito.times(1))).onError((Throwable) Matchers.isA(RemoteTransportException.class));
        ((RemoteInputChannel) Mockito.verify(remoteInputChannelArr[1], Mockito.never())).onError((Throwable) Matchers.any(Throwable.class));
        createEmbeddedChannel.pipeline().fireChannelRead(new NettyMessage.ErrorResponse(new RuntimeException("Expected test exception")));
        try {
            createEmbeddedChannel.checkException();
        } catch (Exception e2) {
            Assert.fail("The exception reached the end of the pipeline and was not handled correctly by the last handler.");
        }
        ((RemoteInputChannel) Mockito.verify(remoteInputChannelArr[0], Mockito.times(2))).onError((Throwable) Matchers.isA(RemoteTransportException.class));
        ((RemoteInputChannel) Mockito.verify(remoteInputChannelArr[1], Mockito.times(1))).onError((Throwable) Matchers.isA(RemoteTransportException.class));
    }

    @Test
    public void testExceptionOnRemoteClose() throws Exception {
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol((ResultPartitionProvider) Mockito.mock(ResultPartitionProvider.class), (TaskEventPublisher) Mockito.mock(TaskEventDispatcher.class)) { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.4
            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[]{new ChannelInboundHandlerAdapter() { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.4.1
                    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                        channelHandlerContext.channel().close();
                    }
                }};
            }
        }, NettyTestUtil.createConfig());
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        NetworkClientHandler clientHandler = getClientHandler(connect);
        RemoteInputChannel[] remoteInputChannelArr = {createRemoteInputChannel(), createRemoteInputChannel()};
        final CountDownLatch countDownLatch = new CountDownLatch(remoteInputChannelArr.length);
        Answer<Void> answer = new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.5
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m119answer(InvocationOnMock invocationOnMock) throws Throwable {
                countDownLatch.countDown();
                return null;
            }
        };
        for (RemoteInputChannel remoteInputChannel : remoteInputChannelArr) {
            ((RemoteInputChannel) Mockito.doAnswer(answer).when(remoteInputChannel)).onError((Throwable) Matchers.any(Throwable.class));
            clientHandler.addInputChannel(remoteInputChannel);
        }
        connect.writeAndFlush(Unpooled.buffer().writerIndex(16));
        if (!countDownLatch.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
            Assert.fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about remote connection close.");
        }
        for (RemoteInputChannel remoteInputChannel2 : remoteInputChannelArr) {
            ((RemoteInputChannel) Mockito.verify(remoteInputChannel2)).onError((Throwable) Matchers.isA(RemoteTransportException.class));
        }
        NettyTestUtil.shutdown(initServerAndClient);
    }

    @Test
    public void testExceptionCaught() throws Exception {
        EmbeddedChannel createEmbeddedChannel = createEmbeddedChannel();
        NetworkClientHandler clientHandler = getClientHandler(createEmbeddedChannel);
        RemoteInputChannel[] remoteInputChannelArr = {createRemoteInputChannel(), createRemoteInputChannel()};
        for (RemoteInputChannel remoteInputChannel : remoteInputChannelArr) {
            Mockito.when(remoteInputChannel.getInputChannelId()).thenReturn(new InputChannelID());
            clientHandler.addInputChannel(remoteInputChannel);
        }
        createEmbeddedChannel.pipeline().fireExceptionCaught(new Exception());
        try {
            createEmbeddedChannel.checkException();
        } catch (Exception e) {
            Assert.fail("The exception reached the end of the pipeline and was not handled correctly by the last handler.");
        }
        for (RemoteInputChannel remoteInputChannel2 : remoteInputChannelArr) {
            ((RemoteInputChannel) Mockito.verify(remoteInputChannel2)).onError((Throwable) Matchers.isA(LocalTransportException.class));
        }
    }

    @Test
    public void testConnectionResetByPeer() throws Throwable {
        EmbeddedChannel createEmbeddedChannel = createEmbeddedChannel();
        RemoteInputChannel addInputChannel = addInputChannel(getClientHandler(createEmbeddedChannel));
        final Throwable[] thArr = new Throwable[1];
        ((RemoteInputChannel) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.io.network.netty.ClientTransportErrorHandlingTest.6
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m120answer(InvocationOnMock invocationOnMock) throws Throwable {
                Throwable th = (Throwable) invocationOnMock.getArguments()[0];
                try {
                    Assert.assertEquals(RemoteTransportException.class, th.getClass());
                    Assert.assertNotEquals("Connection reset by peer", th.getMessage());
                    Assert.assertEquals(IOException.class, th.getCause().getClass());
                    Assert.assertEquals("Connection reset by peer", th.getCause().getMessage());
                    return null;
                } catch (Throwable th2) {
                    thArr[0] = th2;
                    return null;
                }
            }
        }).when(addInputChannel)).onError((Throwable) Matchers.any(Throwable.class));
        createEmbeddedChannel.pipeline().fireExceptionCaught(new IOException("Connection reset by peer"));
        Assert.assertNull(thArr[0]);
    }

    @Test
    public void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
        EmbeddedChannel createEmbeddedChannel = createEmbeddedChannel();
        ((RemoteInputChannel) Mockito.doThrow(new Throwable[]{new RuntimeException("Expected test exception")}).when(addInputChannel(getClientHandler(createEmbeddedChannel)))).onError((Throwable) Matchers.any(Throwable.class));
        createEmbeddedChannel.pipeline().fireExceptionCaught(new Exception());
        Assert.assertFalse(createEmbeddedChannel.isActive());
    }

    private EmbeddedChannel createEmbeddedChannel() {
        return new EmbeddedChannel(new NettyProtocol((ResultPartitionProvider) Mockito.mock(ResultPartitionProvider.class), (TaskEventPublisher) Mockito.mock(TaskEventDispatcher.class)).getClientChannelHandlers());
    }

    private RemoteInputChannel addInputChannel(NetworkClientHandler networkClientHandler) throws IOException {
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel();
        networkClientHandler.addInputChannel(createRemoteInputChannel);
        return createRemoteInputChannel;
    }

    private NetworkClientHandler getClientHandler(Channel channel) {
        return channel.pipeline().get(NetworkClientHandler.class);
    }

    private RemoteInputChannel createRemoteInputChannel() {
        return (RemoteInputChannel) Mockito.when(((RemoteInputChannel) Mockito.mock(RemoteInputChannel.class)).getInputChannelId()).thenReturn(new InputChannelID()).getMock();
    }
}
