package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link RpcEndpoint}: obtaining self gateways for the gateway interfaces an
 * endpoint implements, and the failure raised when an unimplemented gateway is requested.
 *
 * <p>All tests share one {@link ActorSystem} and one {@link RpcService}, created in
 * {@link #setup()} and stopped in {@link #teardown()}.
 */
public class RpcEndpointTest extends TestLogger {
    /** Timeout handed to the RPC service and used when waiting for shutdown. */
    private static final Time TIMEOUT = Time.seconds(10);

    private static ActorSystem actorSystem;
    private static RpcService rpcService;

    /** Gateway exposing a single integer-valued call. */
    public interface BaseGateway extends RpcGateway {
        CompletableFuture<Integer> foobar();
    }

    /** Gateway extending {@link BaseGateway} with a second integer-valued call. */
    public interface ExtendedGateway extends BaseGateway {
        CompletableFuture<Integer> barfoo();
    }

    /** Gateway unrelated to {@link BaseGateway}; {@link BaseEndpoint} does not implement it. */
    public interface DifferentGateway extends RpcGateway {
        CompletableFuture<String> foo();
    }

    /** Endpoint implementing {@link BaseGateway}; answers {@code foobar()} with a fixed value. */
    public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {
        private final int foobarValue;

        /**
         * @param rpcService service passed to the {@link RpcEndpoint} constructor
         * @param i value returned by {@link #foobar()}
         */
        protected BaseEndpoint(RpcService rpcService, int i) {
            super(rpcService);
            this.foobarValue = i;
        }

        /** Returns an already completed future holding the value given at construction. */
        @Override
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(Integer.valueOf(this.foobarValue));
        }

        /**
         * Returns an already completed future; this endpoint has nothing to clean up.
         *
         * <p>NOTE(review): presumably overrides a hook declared on {@link RpcEndpoint}; the
         * superclass is not visible here, so no {@code @Override} is added - confirm.
         */
        public CompletableFuture<Void> postStop() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Endpoint that, in addition to what {@link BaseEndpoint} provides, implements
     * {@link ExtendedGateway} and {@link DifferentGateway}.
     */
    public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway {
        private final int barfooValue;
        private final String fooString;

        /**
         * @param rpcService service passed on to {@link BaseEndpoint}
         * @param i value returned by {@code foobar()}
         * @param i2 value returned by {@link #barfoo()}
         * @param str value returned by {@link #foo()}
         */
        protected ExtendedEndpoint(RpcService rpcService, int i, int i2, String str) {
            super(rpcService, i);
            this.barfooValue = i2;
            this.fooString = str;
        }

        /** Returns an already completed future holding the {@code barfoo} value. */
        @Override
        public CompletableFuture<Integer> barfoo() {
            return CompletableFuture.completedFuture(Integer.valueOf(this.barfooValue));
        }

        /** Returns an already completed future holding the {@code foo} string. */
        @Override
        public CompletableFuture<String> foo() {
            return CompletableFuture.completedFuture(this.fooString);
        }
    }

    /** Creates the actor system and the RPC service shared by all tests. */
    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        rpcService = new AkkaRpcService(actorSystem, TIMEOUT);
    }

    /**
     * Stops the RPC service and terminates the actor system, waiting at most {@link #TIMEOUT}
     * for both to finish.
     *
     * @throws Exception if waiting for the shutdown fails or times out
     */
    @AfterClass
    public static void teardown() throws Exception {
        FutureUtils.waitForAll(
                Arrays.asList(
                    rpcService.stopService(),
                    FutureUtils.toJava(actorSystem.terminate())))
            .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /**
     * Checks that a call through the {@link BaseGateway} self gateway returns the value the
     * endpoint was constructed with.
     */
    @Test
    public void testSelfGateway() throws Exception {
        final int expectedValue = 1337;
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
        try {
            baseEndpoint.start();
            final BaseGateway baseGateway = (BaseGateway) baseEndpoint.getSelfGateway(BaseGateway.class);
            // Box the expected value: assertEquals(int, Integer) is ambiguous between the
            // (long, long) and (Object, Object) overloads.
            Assert.assertEquals(Integer.valueOf(expectedValue), baseGateway.foobar().get());
        } finally {
            baseEndpoint.shutDown();
        }
    }

    /**
     * Checks that requesting a self gateway for {@link DifferentGateway}, which
     * {@link BaseEndpoint} does not implement, fails with a {@link RuntimeException}.
     */
    @Test(expected = RuntimeException.class)
    public void testWrongSelfGateway() throws Exception {
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 1337);
        try {
            baseEndpoint.start();
            // This request is what the test is about; it is expected to throw. Without it the
            // test would only ever reach fail(), whose AssertionError is not a RuntimeException.
            baseEndpoint.getSelfGateway(DifferentGateway.class);
            Assert.fail("Expected to fail with a RuntimeException since we requested the wrong gateway type.");
        } finally {
            baseEndpoint.shutDown();
        }
    }

    /**
     * Checks that an {@link ExtendedEndpoint} hands out self gateways for each gateway
     * interface it implements, directly or through {@link BaseEndpoint}, and that calls on
     * them return the values given at construction.
     */
    @Test
    public void testEndpointInheritance() throws Exception {
        final int foobar = 1;
        final int barfoo = 2;
        final String foo = "foobar";
        final ExtendedEndpoint extendedEndpoint = new ExtendedEndpoint(rpcService, foobar, barfoo, foo);
        try {
            extendedEndpoint.start();
            final BaseGateway baseGateway =
                (BaseGateway) extendedEndpoint.getSelfGateway(BaseGateway.class);
            final ExtendedGateway extendedGateway =
                (ExtendedGateway) extendedEndpoint.getSelfGateway(ExtendedGateway.class);
            final DifferentGateway differentGateway =
                (DifferentGateway) extendedEndpoint.getSelfGateway(DifferentGateway.class);

            // Expected integers are boxed to avoid the ambiguous assertEquals(int, Integer) call.
            Assert.assertEquals(Integer.valueOf(foobar), baseGateway.foobar().get());
            Assert.assertEquals(Integer.valueOf(foobar), extendedGateway.foobar().get());
            Assert.assertEquals(Integer.valueOf(barfoo), extendedGateway.barfoo().get());
            Assert.assertEquals(foo, differentGateway.foo().get());
        } finally {
            extendedEndpoint.shutDown();
        }
    }
}
