package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest;
import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Tests for the handshake that happens when an {@link AkkaRpcService} connects to an RPC endpoint
 * hosted by another service: a successful connection between services of the same version, and the
 * {@link HandshakeException} expected for a version mismatch or a wrong gateway type.
 */
class AkkaRpcActorHandshakeTest {
    private static AkkaRpcService akkaRpcService1;
    private static AkkaRpcService akkaRpcService2;
    private static WrongVersionAkkaRpcService wrongVersionAkkaRpcService;

    /** Creates one actor system per RPC service; all three are shared by every test. */
    @BeforeAll
    static void setupClass() {
        ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
        ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
        ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();

        AkkaRpcServiceConfiguration configuration =
                AkkaRpcServiceConfiguration.defaultConfiguration();
        akkaRpcService1 = new AkkaRpcService(actorSystem1, configuration);
        akkaRpcService2 = new AkkaRpcService(actorSystem2, configuration);
        wrongVersionAkkaRpcService =
                new WrongVersionAkkaRpcService(
                        wrongVersionActorSystem,
                        AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    /** Closes all RPC services and blocks until every one of them has terminated. */
    @AfterAll
    static void teardownClass() throws Exception {
        List<CompletableFuture<?>> terminationFutures = new ArrayList<>(3);
        terminationFutures.add(akkaRpcService1.closeAsync());
        terminationFutures.add(akkaRpcService2.closeAsync());
        terminationFutures.add(wrongVersionAkkaRpcService.closeAsync());

        FutureUtils.waitForAll(terminationFutures).get();
    }

    /** A second service of the same version can connect to the endpoint and call it. */
    @Test
    void testVersionMatchBetweenRpcComponents() throws Exception {
        final int expectedValue = 42;
        AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint =
                new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        rpcEndpoint.setFoobar(expectedValue);
        rpcEndpoint.start();

        try {
            AkkaRpcActorTest.DummyRpcGateway gateway =
                    akkaRpcService2
                            .connect(
                                    rpcEndpoint.getAddress(),
                                    AkkaRpcActorTest.DummyRpcGateway.class)
                            .get();

            Assertions.assertThat(gateway.foobar().get()).isEqualTo(expectedValue);
        } finally {
            // Always stop the endpoint, whether the assertion passed or not.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {rpcEndpoint});
        }
    }

    /**
     * Connecting through {@link WrongVersionAkkaRpcService} (which reports version {@code -1}) is
     * expected to fail with a {@link HandshakeException}.
     */
    @Test
    void testVersionMismatchBetweenRpcComponents() throws Exception {
        AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint =
                new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        rpcEndpoint.start();

        try {
            // The connect attempt must run inside the lambda so that AssertJ sees the failure;
            // an empty lambda never throws and makes the assertion fail unconditionally.
            Assertions.assertThatThrownBy(
                            () ->
                                    wrongVersionAkkaRpcService
                                            .connect(
                                                    rpcEndpoint.getAddress(),
                                                    AkkaRpcActorTest.DummyRpcGateway.class)
                                            .get())
                    .extracting(ExceptionUtils::stripExecutionException)
                    .isInstanceOf(HandshakeException.class);
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {rpcEndpoint});
        }
    }

    /**
     * Connecting with {@link WrongRpcGateway}, a gateway type requested in place of the
     * endpoint's own, is expected to fail with a {@link HandshakeException}.
     */
    @Test
    void testWrongGatewayEndpointConnection() throws Exception {
        AkkaRpcActorTest.DummyRpcEndpoint rpcEndpoint =
                new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        rpcEndpoint.start();

        CompletableFuture<WrongRpcGateway> futureGateway =
                akkaRpcService2.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);

        try {
            // Waiting on the connect future is what surfaces the handshake failure.
            Assertions.assertThatThrownBy(futureGateway::get)
                    .extracting(ExceptionUtils::stripExecutionException)
                    .isInstanceOf(HandshakeException.class);
        } finally {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {rpcEndpoint});
        }
    }

    /** Gateway type used by the wrong-gateway test when connecting to the dummy endpoint. */
    public interface WrongRpcGateway extends RpcGateway {
        CompletableFuture<Boolean> barfoo();

        void tell(String str);
    }

    /** {@link AkkaRpcService} whose {@link #getVersion()} always returns {@code -1}. */
    public static class WrongVersionAkkaRpcService extends AkkaRpcService {
        WrongVersionAkkaRpcService(
                ActorSystem actorSystem, AkkaRpcServiceConfiguration akkaRpcServiceConfiguration) {
            super(actorSystem, akkaRpcServiceConfiguration);
        }

        // NOTE(review): presumably -1 never equals the version reported by a regular
        // AkkaRpcService, which is what should trigger the handshake failure — confirm.
        @Override
        protected int getVersion() {
            return -1;
        }
    }
}
