package org.apache.flink.runtime.rpc.akka;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.class */
public class AkkaRpcActorOversizedResponseMessageTest extends TestLogger {
    private static final Time TIMEOUT = Time.seconds(10);
    private static final int FRAMESIZE = 32000;
    private static final String OVERSIZED_PAYLOAD = new String(new byte[FRAMESIZE]);
    private static final String PAYLOAD = "Hello";
    private static RpcService rpcService1;
    private static RpcService rpcService2;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest$MessageRpcEndpoint.class */
    public static class MessageRpcEndpoint extends RpcEndpoint implements MessageRpcGateway {

        @Nonnull
        private final String message;

        MessageRpcEndpoint(RpcService rpcService, @Nonnull String str) {
            super(rpcService);
            this.message = str;
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorOversizedResponseMessageTest.MessageRpcGateway
        public CompletableFuture<String> messageAsync() {
            return CompletableFuture.completedFuture(this.message);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorOversizedResponseMessageTest.MessageRpcGateway
        public String messageSync() throws RpcException {
            return this.message;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest$MessageRpcGateway.class */
    public interface MessageRpcGateway extends RpcGateway {
        CompletableFuture<String> messageAsync();

        String messageSync() throws RpcException;
    }

    @BeforeClass
    public static void setupClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(AkkaOptions.FRAMESIZE, "32000 b");
        rpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
        rpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        RpcUtils.terminateRpcServices(TIMEOUT, new RpcService[]{rpcService1, rpcService2});
    }

    @Test
    public void testOverSizedResponseMsgAsync() throws Exception {
        try {
            runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
            Assert.fail("Expected the RPC to fail.");
        } catch (ExecutionException e) {
            Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent()), Matchers.is(true));
        }
    }

    @Test
    public void testNormalSizedResponseMsgAsync() throws Exception {
        Assert.assertThat((String) runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync), Matchers.is(Matchers.equalTo(PAYLOAD)));
    }

    @Test
    public void testNormalSizedResponseMsgSync() throws Exception {
        Assert.assertThat((String) runRemoteMessageResponseTest(PAYLOAD, (v0) -> {
            return v0.messageSync();
        }), Matchers.is(Matchers.equalTo(PAYLOAD)));
    }

    @Test
    public void testOverSizedResponseMsgSync() throws Exception {
        try {
            runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, (v0) -> {
                return v0.messageSync();
            });
            Assert.fail("Expected the RPC to fail.");
        } catch (RpcException e) {
            Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent()), Matchers.is(true));
        }
    }

    @Test
    public void testLocalOverSizedResponseMsgSync() throws Exception {
        Assert.assertThat((String) runLocalMessageResponseTest(OVERSIZED_PAYLOAD, (v0) -> {
            return v0.messageSync();
        }), Matchers.is(Matchers.equalTo(OVERSIZED_PAYLOAD)));
    }

    @Test
    public void testLocalOverSizedResponseMsgAsync() throws Exception {
        Assert.assertThat((String) runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync), Matchers.is(Matchers.equalTo(OVERSIZED_PAYLOAD)));
    }

    private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
        return messageRpcGateway.messageAsync().get();
    }

    private <T> T runRemoteMessageResponseTest(String str, FunctionWithException<MessageRpcGateway, T, Exception> functionWithException) throws Exception {
        MessageRpcEndpoint messageRpcEndpoint = new MessageRpcEndpoint(rpcService1, str);
        try {
            messageRpcEndpoint.start();
            T t = (T) functionWithException.apply((MessageRpcGateway) rpcService2.connect(messageRpcEndpoint.getAddress(), MessageRpcGateway.class).get());
            RpcUtils.terminateRpcEndpoint(messageRpcEndpoint, TIMEOUT);
            return t;
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(messageRpcEndpoint, TIMEOUT);
            throw th;
        }
    }

    private <T> T runLocalMessageResponseTest(String str, FunctionWithException<MessageRpcGateway, T, Exception> functionWithException) throws Exception {
        MessageRpcEndpoint messageRpcEndpoint = new MessageRpcEndpoint(rpcService1, str);
        try {
            messageRpcEndpoint.start();
            T t = (T) functionWithException.apply((MessageRpcGateway) rpcService1.connect(messageRpcEndpoint.getAddress(), MessageRpcGateway.class).get());
            RpcUtils.terminateRpcEndpoint(messageRpcEndpoint, TIMEOUT);
            return t;
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(messageRpcEndpoint, TIMEOUT);
            throw th;
        }
    }
}
