package org.apache.flink.runtime.rpc.pekko;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcActorTest;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/PekkoRpcActorHandshakeTest.class */
class PekkoRpcActorHandshakeTest {
    private static PekkoRpcService rpcService1;
    private static PekkoRpcService rpcService2;
    private static WrongVersionPekkoRpcService wrongVersionRpcService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/PekkoRpcActorHandshakeTest$WrongRpcGateway.class */
    public interface WrongRpcGateway extends RpcGateway {
        CompletableFuture<Boolean> barfoo();

        void tell(String str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/PekkoRpcActorHandshakeTest$WrongVersionPekkoRpcService.class */
    public static class WrongVersionPekkoRpcService extends PekkoRpcService {
        WrongVersionPekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration pekkoRpcServiceConfiguration) {
            super(actorSystem, pekkoRpcServiceConfiguration);
        }

        protected int getVersion() {
            return -1;
        }
    }

    PekkoRpcActorHandshakeTest() {
    }

    @BeforeAll
    static void setupClass() {
        ActorSystem createDefaultActorSystem = PekkoUtils.createDefaultActorSystem();
        ActorSystem createDefaultActorSystem2 = PekkoUtils.createDefaultActorSystem();
        ActorSystem createDefaultActorSystem3 = PekkoUtils.createDefaultActorSystem();
        PekkoRpcServiceConfiguration defaultConfiguration = PekkoRpcServiceConfiguration.defaultConfiguration();
        rpcService1 = new PekkoRpcService(createDefaultActorSystem, defaultConfiguration);
        rpcService2 = new PekkoRpcService(createDefaultActorSystem2, defaultConfiguration);
        wrongVersionRpcService = new WrongVersionPekkoRpcService(createDefaultActorSystem3, PekkoRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterAll
    static void teardownClass() throws Exception {
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(rpcService1.closeAsync());
        arrayList.add(rpcService2.closeAsync());
        arrayList.add(wrongVersionRpcService.closeAsync());
        FutureUtils.waitForAll(arrayList).get();
    }

    @Test
    void testVersionMatchBetweenRpcComponents() throws Exception {
        PekkoRpcActorTest.DummyRpcEndpoint dummyRpcEndpoint = new PekkoRpcActorTest.DummyRpcEndpoint(rpcService1);
        dummyRpcEndpoint.setFoobar(42);
        dummyRpcEndpoint.start();
        try {
            Assertions.assertThat(((PekkoRpcActorTest.DummyRpcGateway) rpcService2.connect(dummyRpcEndpoint.getAddress(), PekkoRpcActorTest.DummyRpcGateway.class).get()).foobar().get()).isEqualTo(42);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
            throw th;
        }
    }

    @Test
    void testVersionMismatchBetweenRpcComponents() throws Exception {
        PekkoRpcActorTest.DummyRpcEndpoint dummyRpcEndpoint = new PekkoRpcActorTest.DummyRpcEndpoint(rpcService1);
        dummyRpcEndpoint.start();
        try {
            Assertions.assertThatThrownBy(() -> {
            }).extracting(ExceptionUtils::stripExecutionException).isInstanceOf(HandshakeException.class);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
            throw th;
        }
    }

    @Test
    void testWrongGatewayEndpointConnection() throws Exception {
        PekkoRpcActorTest.DummyRpcEndpoint dummyRpcEndpoint = new PekkoRpcActorTest.DummyRpcEndpoint(rpcService1);
        dummyRpcEndpoint.start();
        CompletableFuture connect = rpcService2.connect(dummyRpcEndpoint.getAddress(), WrongRpcGateway.class);
        try {
            Assertions.assertThatThrownBy(() -> {
            }).extracting(ExceptionUtils::stripExecutionException).isInstanceOf(HandshakeException.class);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
            throw th;
        }
    }
}
