package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest;
import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.class */
public class AkkaRpcActorHandshakeTest extends TestLogger {
    private static final Time timeout = Time.seconds(10);
    private static AkkaRpcService akkaRpcService1;
    private static AkkaRpcService akkaRpcService2;
    private static WrongVersionAkkaRpcService wrongVersionAkkaRpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest$WrongRpcGateway.class */
    private interface WrongRpcGateway extends RpcGateway {
        CompletableFuture<Boolean> barfoo();

        void tell(String str);
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest$WrongVersionAkkaRpcService.class */
    private static class WrongVersionAkkaRpcService extends AkkaRpcService {
        WrongVersionAkkaRpcService(ActorSystem actorSystem, AkkaRpcServiceConfiguration akkaRpcServiceConfiguration) {
            super(actorSystem, akkaRpcServiceConfiguration);
        }

        protected int getVersion() {
            return -1;
        }
    }

    @BeforeClass
    public static void setupClass() {
        ActorSystem createDefaultActorSystem = AkkaUtils.createDefaultActorSystem();
        ActorSystem createDefaultActorSystem2 = AkkaUtils.createDefaultActorSystem();
        ActorSystem createDefaultActorSystem3 = AkkaUtils.createDefaultActorSystem();
        AkkaRpcServiceConfiguration defaultConfiguration = AkkaRpcServiceConfiguration.defaultConfiguration();
        akkaRpcService1 = new AkkaRpcService(createDefaultActorSystem, defaultConfiguration);
        akkaRpcService2 = new AkkaRpcService(createDefaultActorSystem2, defaultConfiguration);
        wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(createDefaultActorSystem3, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(akkaRpcService1.stopService());
        arrayList.add(akkaRpcService2.stopService());
        arrayList.add(wrongVersionAkkaRpcService.stopService());
        FutureUtils.waitForAll(arrayList).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testVersionMatchBetweenRpcComponents() throws Exception {
        AkkaRpcActorTest.DummyRpcEndpoint dummyRpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        dummyRpcEndpoint.setFoobar(42);
        dummyRpcEndpoint.start();
        try {
            Assert.assertThat(((AkkaRpcActorTest.DummyRpcGateway) akkaRpcService2.connect(dummyRpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class).get()).foobar().get(), Matchers.equalTo(42));
            RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, timeout);
            throw th;
        }
    }

    @Test
    public void testVersionMismatchBetweenRpcComponents() throws Exception {
        AkkaRpcActorTest.DummyRpcEndpoint dummyRpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        dummyRpcEndpoint.start();
        try {
            try {
                wrongVersionAkkaRpcService.connect(dummyRpcEndpoint.getAddress(), AkkaRpcActorTest.DummyRpcGateway.class).get();
                Assert.fail("Expected HandshakeException.");
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(HandshakeException.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, timeout);
        }
    }

    @Test
    public void testWrongGatewayEndpointConnection() throws Exception {
        AkkaRpcActorTest.DummyRpcEndpoint dummyRpcEndpoint = new AkkaRpcActorTest.DummyRpcEndpoint(akkaRpcService1);
        dummyRpcEndpoint.start();
        try {
            try {
                akkaRpcService2.connect(dummyRpcEndpoint.getAddress(), WrongRpcGateway.class).get(timeout.getSize(), timeout.getUnit());
                Assert.fail("We expected a HandshakeException.");
                RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, timeout);
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(HandshakeException.class));
                RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, timeout);
            }
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(dummyRpcEndpoint, timeout);
            throw th;
        }
    }
}
