package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.class */
/**
 * Tests for the Akka-based RPC actor: address resolution, behaviour of an endpoint before and
 * after it is started, connecting with a mismatching gateway type, endpoint termination and
 * propagation of exceptions from the endpoint to the caller.
 *
 * <p>All tests share one {@link ActorSystem} and one {@link AkkaRpcService}, which are torn down
 * in {@link #shutdown()}.
 */
public class AkkaRpcActorTest extends TestLogger {
    /** Actor system shared by all tests in this class. */
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();

    /** Timeout used for the RPC service and for every blocking {@code get} in the tests. */
    private static final Time timeout = Time.milliseconds(10000);

    /** RPC service under test, backed by {@link #actorSystem}. */
    private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, timeout);

    /** Endpoint exposing a single RPC method that returns a mutable integer value. */
    public static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
        // volatile: written by the test thread via setFoobar and read by foobar(), which is
        // invoked through the gateway (presumably on an actor thread -- hence the visibility guard).
        private volatile int _foobar;

        protected DummyRpcEndpoint(RpcService rpcService) {
            super(rpcService);
            this._foobar = 42;
        }

        /**
         * Returns the current foobar value.
         *
         * @return the value last set via {@link #setFoobar(int)}, or 42 initially
         */
        @RpcMethod
        public int foobar() {
            return this._foobar;
        }

        /**
         * Sets the value returned by {@link #foobar()}.
         *
         * @param i new value
         */
        public void setFoobar(int i) {
            this._foobar = i;
        }
    }

    /** Gateway matching {@link DummyRpcEndpoint}. */
    private interface DummyRpcGateway extends RpcGateway {
        Future<Integer> foobar();
    }

    /** Endpoint whose RPC method fails synchronously with a {@link RuntimeException}. */
    private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
        protected ExceptionalEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Always throws.
         *
         * @return never returns normally
         * @throws RuntimeException always, with the message asserted by the test
         */
        @RpcMethod
        public int doStuff() {
            throw new RuntimeException("my super specific test exception");
        }
    }

    /** Endpoint whose RPC method returns a future that is later completed exceptionally. */
    private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> {
        protected ExceptionalFutureEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Returns a future which a helper thread completes exceptionally after a short delay.
         *
         * @return a future that fails with an {@link Exception} carrying the message "some test"
         */
        @RpcMethod
        public Future<Integer> doStuff() {
            final FlinkCompletableFuture<Integer> result = new FlinkCompletableFuture<Integer>();
            new Thread() {
                @Override
                public void run() {
                    try {
                        // Delay so the future is still incomplete when doStuff() returns.
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt; restore the flag and still fail the
                        // future so that the caller is not left waiting for the timeout.
                        Thread.currentThread().interrupt();
                    }
                    result.completeExceptionally(new Exception("some test"));
                }
            }.start();
            return result;
        }
    }

    /** Gateway shared by {@link ExceptionalEndpoint} and {@link ExceptionalFutureEndpoint}. */
    private interface ExceptionalGateway extends RpcGateway {
        Future<Integer> doStuff();
    }

    /** Gateway whose methods do not exist on {@link DummyRpcEndpoint}. */
    private interface WrongRpcGateway extends RpcGateway {
        Future<Boolean> barfoo();

        void tell(String str);
    }

    /** Stops the shared RPC service and actor system after all tests have run. */
    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
        actorSystem.awaitTermination();
    }

    /**
     * Checks that a gateway obtained by connecting to an endpoint's address reports the same
     * address as the endpoint itself.
     */
    @Test
    public void testAddressResolution() throws Exception {
        DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        try {
            DummyRpcGateway gateway = akkaRpcService
                    .connect(endpoint.getAddress(), DummyRpcGateway.class)
                    .get(timeout.getSize(), timeout.getUnit());

            Assert.assertEquals(endpoint.getAddress(), gateway.getAddress());
        } finally {
            endpoint.shutDown();
        }
    }

    /**
     * Checks that connecting to an address that does not resolve fails with a
     * {@link RpcConnectionException}.
     */
    @Test
    public void testFailingAddressResolution() throws Exception {
        try {
            akkaRpcService.connect("foobar", DummyRpcGateway.class).get(timeout.getSize(), timeout.getUnit());
            Assert.fail("The rpc connection resolution should have failed.");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    "Expected an RpcConnectionException but got " + e.getCause(),
                    e.getCause() instanceof RpcConnectionException);
        }
    }

    /**
     * Checks that an RPC call made before the endpoint is started fails with an
     * {@link AkkaRpcException}, and that calls succeed once the endpoint has been started.
     */
    @Test
    public void testMessageDiscarding() throws Exception {
        DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        try {
            DummyRpcGateway gateway = endpoint.getSelf();

            // The endpoint has not been started yet, so this call must fail.
            try {
                gateway.foobar().get(timeout.getSize(), timeout.getUnit());
                Assert.fail("Expected an AkkaRpcException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(
                        "Expected an AkkaRpcException but got " + e.getCause(),
                        e.getCause() instanceof AkkaRpcException);
            }

            endpoint.setFoobar(1337);
            endpoint.start();

            // After start() the call goes through and observes the updated value.
            Integer actual = gateway.foobar().get(timeout.getSize(), timeout.getUnit());
            Assert.assertThat("The new foobar value should have been returned.", actual, Is.is(1337));
        } finally {
            endpoint.shutDown();
        }
    }

    /**
     * Checks that calling a value-returning method through a gateway that does not match the
     * endpoint fails with a {@link RpcConnectionException}.
     */
    @Test
    public void testWrongGatewayEndpointConnection() throws Exception {
        DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        endpoint.start();
        try {
            WrongRpcGateway wrongGateway = akkaRpcService
                    .connect(endpoint.getAddress(), WrongRpcGateway.class)
                    .get(timeout.getSize(), timeout.getUnit());

            // Method without a return value: there is no future to carry a failure back, and
            // the test expects this call itself not to throw.
            wrongGateway.tell("foobar");

            try {
                wrongGateway.barfoo().get(timeout.getSize(), timeout.getUnit());
                Assert.fail("We expected a RpcConnectionException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(
                        "Expected an RpcConnectionException but got " + e.getCause(),
                        e.getCause() instanceof RpcConnectionException);
            }
        } finally {
            endpoint.shutDown();
        }
    }

    /**
     * Checks that the termination future of an endpoint is not done while the endpoint is
     * running and completes after the endpoint has been shut down. The JUnit timeout bounds the
     * otherwise unbounded wait on the termination future.
     */
    @Test(timeout = 5000)
    public void testRpcEndpointTerminationFuture() throws Exception {
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        endpoint.start();

        Future<?> terminationFuture = endpoint.getTerminationFuture();
        Assert.assertFalse(terminationFuture.isDone());

        // Trigger the shut down asynchronously on the actor system's dispatcher.
        FlinkFuture.supplyAsync(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                endpoint.shutDown();
                return null;
            }
        }, actorSystem.dispatcher());

        // Blocks until the endpoint has terminated (or the test timeout fires).
        terminationFuture.get();
    }

    /**
     * Checks that an exception thrown directly by an RPC method reaches the caller with its
     * original type and message.
     */
    @Test
    public void testExceptionPropagation() throws Exception {
        ExceptionalEndpoint endpoint = new ExceptionalEndpoint(akkaRpcService);
        endpoint.start();
        try {
            ExceptionalGateway gateway = endpoint.getSelf();
            try {
                gateway.doStuff().get(timeout.getSize(), timeout.getUnit());
                Assert.fail("this should fail with an exception");
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Assert.assertEquals(RuntimeException.class, cause.getClass());
                Assert.assertEquals("my super specific test exception", cause.getMessage());
            }
        } finally {
            endpoint.shutDown();
        }
    }

    /**
     * Checks that an exception used to complete a future returned by an RPC method reaches the
     * caller with its original type and message.
     */
    @Test
    public void testExceptionPropagationFuturePiping() throws Exception {
        ExceptionalFutureEndpoint endpoint = new ExceptionalFutureEndpoint(akkaRpcService);
        endpoint.start();
        try {
            ExceptionalGateway gateway = endpoint.getSelf();
            try {
                gateway.doStuff().get(timeout.getSize(), timeout.getUnit());
                Assert.fail("this should fail with an exception");
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Assert.assertEquals(Exception.class, cause.getClass());
                Assert.assertEquals("some test", cause.getMessage());
            }
        } finally {
            endpoint.shutDown();
        }
    }
}
