package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.class */
public class AkkaRpcActorTest extends TestLogger {
    private static Time timeout = Time.milliseconds(10000);
    private static AkkaRpcService akkaRpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$AsyncOperationGateway.class */
    /**
     * Gateway declaring a single asynchronous operation that returns an integer result.
     */
    interface AsyncOperationGateway extends RpcGateway {
        /**
         * Invokes the asynchronous operation.
         *
         * @param time value passed as the {@link RpcTimeout}-annotated argument (presumably the
         *     timeout applied to this call; the annotation's handling is not visible here)
         * @return future carrying the integer result of the operation
         */
        CompletableFuture<Integer> asyncOperation(@RpcTimeout Time time);
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$AsynchronousOnStopEndpoint.class */
    /**
     * Endpoint whose {@code onStop()} returns a future supplied by the test, so the test
     * decides when that future completes.
     */
    static class AsynchronousOnStopEndpoint extends RpcEndpoint {
        /** Future handed back by {@link #onStop()}; checked for {@code null} at construction. */
        private final CompletableFuture<Void> onStopFuture;

        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         * @param completableFuture future to return from {@link #onStop()}; must not be
         *     {@code null}
         */
        public AsynchronousOnStopEndpoint(RpcService rpcService, CompletableFuture<Void> completableFuture) {
            super(rpcService);
            onStopFuture = Preconditions.checkNotNull(completableFuture);
        }

        /**
         * Returns the future given at construction time.
         *
         * @return the externally controlled future
         */
        public CompletableFuture<Void> onStop() {
            return onStopFuture;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DummyRpcEndpoint.class */
    /**
     * Endpoint implementing {@link DummyRpcGateway} by returning a mutable integer value that
     * starts out as 42.
     */
    static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
        /** Value reported by {@link #foobar()}; volatile because it is set via {@link #setFoobar(int)}. */
        private volatile int foobar = 42;

        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         */
        public DummyRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Returns the current value wrapped in an already completed future.
         *
         * @return completed future holding the value of {@code foobar} at call time
         */
        @Override
        public CompletableFuture<Integer> foobar() {
            final Integer currentValue = foobar;
            return CompletableFuture.completedFuture(currentValue);
        }

        /**
         * Replaces the value returned by subsequent {@link #foobar()} calls.
         *
         * @param i the new value
         */
        public void setFoobar(int i) {
            foobar = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DummyRpcGateway.class */
    /**
     * Gateway declaring a single call that yields an integer.
     */
    interface DummyRpcGateway extends RpcGateway {
        /**
         * Requests the integer value.
         *
         * @return future carrying the value
         */
        CompletableFuture<Integer> foobar();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$ExceptionalEndpoint.class */
    /**
     * Endpoint whose {@link #doStuff()} fails synchronously by throwing a
     * {@link RuntimeException} instead of returning a future.
     */
    private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway {
        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         */
        protected ExceptionalEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Always throws; never returns a future.
         *
         * @throws RuntimeException on every call, with a fixed message the tests match on
         */
        @Override
        public CompletableFuture<Integer> doStuff() {
            final RuntimeException failure = new RuntimeException("my super specific test exception");
            throw failure;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$ExceptionalFutureEndpoint.class */
    /**
     * Endpoint whose {@link #doStuff()} returns a future that is completed exceptionally from
     * a separate thread after a short delay.
     */
    private static class ExceptionalFutureEndpoint extends RpcEndpoint implements ExceptionalGateway {
        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         */
        protected ExceptionalFutureEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Starts a helper thread that sleeps briefly and then fails the returned future with
         * an {@link Exception} carrying the message {@code "some test"}.
         *
         * @return a future that is not yet complete when returned and later fails
         */
        @Override
        public CompletableFuture<Integer> doStuff() {
            final CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
            new Thread(() -> {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Do not drop the interrupt silently: restore the flag for whoever owns
                    // this thread. The future is still failed below so no caller is left waiting.
                    Thread.currentThread().interrupt();
                }
                completableFuture.completeExceptionally(new Exception("some test"));
            }).start();
            return completableFuture;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$ExceptionalGateway.class */
    /**
     * Gateway implemented by the endpoints in this test class whose call fails, either by
     * throwing directly or by failing the returned future.
     */
    private interface ExceptionalGateway extends RpcGateway {
        /**
         * Performs the (failing) operation.
         *
         * @return future for the integer result
         */
        CompletableFuture<Integer> doStuff();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$FailingOnStopEndpoint.class */
    /**
     * Endpoint whose {@code onStop()} returns a future that is already completed exceptionally
     * with an {@link OnStopException}.
     */
    private static class FailingOnStopEndpoint extends RpcEndpoint implements RpcGateway {

        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         * @param str second argument forwarded to the {@link RpcEndpoint} constructor
         *     (presumably the endpoint id)
         */
        protected FailingOnStopEndpoint(RpcService rpcService, String str) {
            super(rpcService, str);
        }

        /**
         * Returns an exceptionally completed future.
         *
         * @return future failed with an {@link OnStopException}
         */
        public CompletableFuture<Void> onStop() {
            final OnStopException failure = new OnStopException("Test exception.");
            return FutureUtils.completedExceptionally(failure);
        }

        /** Marker exception used to recognise the failure produced by {@link #onStop()}. */
        private static class OnStopException extends FlinkException {
            private static final long serialVersionUID = 6701096588415871592L;

            /**
             * @param str the exception message
             */
            public OnStopException(String str) {
                super(str);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$MainThreadExecutorOnStopEndpoint.class */
    /**
     * Endpoint whose {@code onStop()} schedules an empty task on the executor returned by
     * {@code getMainThreadExecutor()} and returns the future of that task.
     */
    private static class MainThreadExecutorOnStopEndpoint extends RpcEndpoint {
        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         */
        protected MainThreadExecutorOnStopEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Submits a no-op to the main thread executor.
         *
         * @return future that completes once the no-op has run on that executor
         */
        public CompletableFuture<Void> onStop() {
            final Runnable noOp = () -> { };
            return CompletableFuture.runAsync(noOp, getMainThreadExecutor());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$OnStartEndpoint.class */
    /**
     * Endpoint that records when {@code onStart()} is invoked and optionally fails it with a
     * supplied exception.
     */
    private static final class OnStartEndpoint extends RpcEndpoint {
        /** Counted down once at the beginning of {@link #onStart()}. */
        private final CountDownLatch countDownLatch = new CountDownLatch(1);

        /** Exception to rethrow from {@link #onStart()}, or {@code null} for a successful start. */
        @Nullable
        private final Exception exception;

        /**
         * Creates the endpoint and registers a callback that calls {@code closeAsync()} once the
         * termination future completes, whether normally or exceptionally.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         * @param exc exception to throw from {@link #onStart()}, or {@code null}
         */
        OnStartEndpoint(RpcService rpcService, @Nullable Exception exc) {
            super(rpcService);
            exception = exc;
            getTerminationFuture().whenComplete((ignoredResult, ignoredFailure) -> closeAsync());
        }

        /**
         * Signals the latch and then rethrows the configured exception via
         * {@code ExceptionUtils.tryRethrowException}.
         *
         * @throws Exception the configured exception, if any
         */
        public void onStart() throws Exception {
            countDownLatch.countDown();
            ExceptionUtils.tryRethrowException(exception);
        }

        /**
         * Blocks until {@link #onStart()} has been entered.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void awaitUntilOnStartCalled() throws InterruptedException {
            countDownLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$SimpleRpcEndpoint.class */
    /**
     * Minimal endpoint that adds no behavior of its own; it only forwards its constructor
     * arguments to {@link RpcEndpoint}.
     */
    private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {
        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         * @param str second argument forwarded to the {@link RpcEndpoint} constructor
         *     (presumably the endpoint id; confirm against {@code RpcEndpoint})
         */
        protected SimpleRpcEndpoint(RpcService rpcService, String str) {
            super(rpcService, str);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$TerminatingAfterOnStopFutureCompletionEndpoint.class */
    /**
     * Endpoint with a blocking {@link #asyncOperation(Time)} and an {@code onStop()} future
     * controlled by the test. It counts invocations of the operation and exposes latches so
     * the test can observe entry into the operation and release it.
     */
    private static class TerminatingAfterOnStopFutureCompletionEndpoint extends RpcEndpoint implements AsyncOperationGateway {
        /** Future returned from {@link #onStop()}. */
        private final CompletableFuture<Void> onStopFuture;

        /** Holds {@link #asyncOperation(Time)} until the test releases it. */
        private final OneShotLatch blockAsyncOperation = new OneShotLatch();

        /** Triggered when {@link #asyncOperation(Time)} is entered. */
        private final OneShotLatch enterAsyncOperation = new OneShotLatch();

        /** Number of times {@link #asyncOperation(Time)} has been entered. */
        private final AtomicInteger asyncOperationCounter = new AtomicInteger(0);

        /**
         * Creates the endpoint.
         *
         * @param rpcService service forwarded to the {@link RpcEndpoint} constructor
         * @param completableFuture future to return from {@link #onStop()}
         */
        protected TerminatingAfterOnStopFutureCompletionEndpoint(RpcService rpcService, CompletableFuture<Void> completableFuture) {
            super(rpcService);
            onStopFuture = completableFuture;
        }

        /**
         * Increments the call counter, signals entry, then blocks until
         * {@link #triggerUnblockAsyncOperation()} is called.
         *
         * @param time unused by this implementation
         * @return completed future holding 42
         * @throws FlinkRuntimeException wrapping the {@link InterruptedException} if the wait
         *     is interrupted
         */
        @Override
        public CompletableFuture<Integer> asyncOperation(Time time) {
            asyncOperationCounter.incrementAndGet();
            enterAsyncOperation.trigger();
            try {
                blockAsyncOperation.await();
            } catch (InterruptedException interrupted) {
                throw new FlinkRuntimeException(interrupted);
            }
            return CompletableFuture.completedFuture(42);
        }

        /**
         * Returns the future given at construction time.
         *
         * @return the externally controlled future
         */
        public CompletableFuture<Void> onStop() {
            return onStopFuture;
        }

        /**
         * Blocks until {@link #asyncOperation(Time)} has been entered at least once.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        void awaitEnterAsyncOperation() throws InterruptedException {
            enterAsyncOperation.await();
        }

        /** Releases any caller blocked inside {@link #asyncOperation(Time)}. */
        void triggerUnblockAsyncOperation() {
            blockAsyncOperation.trigger();
        }

        /**
         * Reports how often the operation was entered.
         *
         * @return number of {@link #asyncOperation(Time)} invocations so far
         */
        int getNumberAsyncOperationCalls() {
            return asyncOperationCounter.get();
        }
    }

    /**
     * Creates the {@link TestingRpcService} shared by all tests in this class and stores it in
     * the static {@code akkaRpcService} field.
     */
    @BeforeClass
    public static void setup() {
        akkaRpcService = new TestingRpcService();
    }

    /**
     * Terminates the shared RPC service via {@code RpcUtils.terminateRpcService}, passing the
     * class-level {@code timeout}.
     *
     * @throws InterruptedException declared by the termination helper
     * @throws ExecutionException declared by the termination helper
     * @throws TimeoutException declared by the termination helper
     */
    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        RpcUtils.terminateRpcService(akkaRpcService, timeout);
    }

    /**
     * Checks that a gateway obtained by connecting to an endpoint's address reports that same
     * address.
     */
    @Test
    public void testAddressResolution() throws Exception {
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        final String expectedAddress = endpoint.getAddress();

        final CompletableFuture<DummyRpcGateway> gatewayFuture =
                akkaRpcService.connect(endpoint.getAddress(), DummyRpcGateway.class);
        final DummyRpcGateway gateway = gatewayFuture.get(timeout.getSize(), timeout.getUnit());

        Assert.assertEquals(expectedAddress, gateway.getAddress());
    }

    /**
     * Checks that connecting to the address {@code "foobar"} fails with an
     * {@link ExecutionException} caused by an {@link RpcConnectionException}.
     */
    @Test
    public void testFailingAddressResolution() throws Exception {
        final CompletableFuture<DummyRpcGateway> gatewayFuture =
                akkaRpcService.connect("foobar", DummyRpcGateway.class);

        try {
            gatewayFuture.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("The rpc connection resolution should have failed.");
        } catch (ExecutionException expected) {
            final Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof RpcConnectionException);
        }
    }

    /**
     * Checks that a call made before the endpoint is started fails with an
     * {@link AkkaRpcException}, and that the same call succeeds with the updated value once
     * the endpoint has been started.
     */
    @Test
    public void testMessageDiscarding() throws Exception {
        final int expectedValue = 1337;

        final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        final DummyRpcGateway rpcGateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);

        // The endpoint has not been started yet, so this call is expected to fail.
        try {
            rpcGateway.foobar().get(timeout.getSize(), timeout.getUnit());
            Assert.fail("Expected an AkkaRpcException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof AkkaRpcException);
        }

        // Change the value and start the endpoint so that the next call is processed.
        rpcEndpoint.setFoobar(expectedValue);
        rpcEndpoint.start();

        try {
            final Integer actualValue = rpcGateway.foobar().get(timeout.getSize(), timeout.getUnit());
            Assert.assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
        } finally {
            // Single cleanup path: runs exactly once whether the assertion passes or fails.
            RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
        }
    }

    /**
     * Checks that the termination future of a started endpoint is initially incomplete and
     * completes after {@code closeAsync()} is invoked from the RPC service's executor.
     */
    @Test(timeout = 5000)
    public void testRpcEndpointTerminationFuture() throws Exception {
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        endpoint.start();

        final CompletableFuture<Void> terminationFuture = endpoint.getTerminationFuture();
        Assert.assertFalse(terminationFuture.isDone());

        CompletableFuture.runAsync(endpoint::closeAsync, akkaRpcService.getExecutor());

        // Bounded by the JUnit timeout declared on this test.
        terminationFuture.get();
    }

    /**
     * Checks that an exception thrown directly by the endpoint method reaches the caller as
     * the cause of an {@link ExecutionException}, with its type and message intact.
     */
    @Test
    public void testExceptionPropagation() throws Exception {
        final ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService);
        rpcEndpoint.start();

        try {
            final ExceptionalGateway rpcGateway = rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
            final CompletableFuture<Integer> result = rpcGateway.doStuff();

            try {
                result.get(timeout.getSize(), timeout.getUnit());
                Assert.fail("this should fail with an exception");
            } catch (ExecutionException e) {
                final Throwable cause = e.getCause();
                Assert.assertEquals(RuntimeException.class, cause.getClass());
                Assert.assertEquals("my super specific test exception", cause.getMessage());
            }
        } finally {
            // Stop the endpoint so it does not stay registered with the shared RPC service.
            RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
        }
    }

    /**
     * Checks that an exception used to fail the future returned by the endpoint method
     * reaches the caller as the cause of an {@link ExecutionException}, with its type and
     * message intact.
     */
    @Test
    public void testExceptionPropagationFuturePiping() throws Exception {
        final ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
        rpcEndpoint.start();

        try {
            final ExceptionalGateway rpcGateway = rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
            final CompletableFuture<Integer> result = rpcGateway.doStuff();

            try {
                result.get(timeout.getSize(), timeout.getUnit());
                Assert.fail("this should fail with an exception");
            } catch (ExecutionException e) {
                final Throwable cause = e.getCause();
                Assert.assertEquals(Exception.class, cause.getClass());
                Assert.assertEquals("some test", cause.getMessage());
            }
        } finally {
            // Stop the endpoint so it does not stay registered with the shared RPC service.
            RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
        }
    }

    /**
     * Checks that a failure reported by {@code onStop()} is propagated through the future
     * returned by {@code closeAsync()}.
     */
    @Test
    public void testOnStopExceptionPropagation() throws Exception {
        final FailingOnStopEndpoint rpcEndpoint = new FailingOnStopEndpoint(akkaRpcService, "FailingOnStopEndpoint");
        rpcEndpoint.start();

        final CompletableFuture<Void> terminationFuture = rpcEndpoint.closeAsync();

        try {
            // Bounded wait so a termination that never happens fails the test instead of hanging it.
            terminationFuture.get(timeout.getSize(), timeout.getUnit());
            // Without this the test would pass even if the onStop failure were lost.
            Assert.fail("Expected the termination future to be completed with the onStop exception.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof FailingOnStopEndpoint.OnStopException);
        }
    }

    /**
     * Starts a {@link SimpleRpcEndpoint}, closes it and waits for the close future; the test
     * fails if that future completes exceptionally.
     */
    @Test
    public void testOnStopExecutedByMainThread() throws Exception {
        final SimpleRpcEndpoint endpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint");
        endpoint.start();

        final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();

        // get() rethrows any failure raised while stopping the endpoint.
        terminationFuture.get();
    }

    /**
     * Checks that stopping a dedicated RPC service completes the termination future of an
     * endpoint running on it. Uses its own actor system so the shared service is unaffected.
     */
    @Test
    public void testActorTerminationWhenServiceShutdown() throws Exception {
        final ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem();
        final AkkaRpcService rpcService =
                new AkkaRpcService(rpcActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());

        try {
            final SimpleRpcEndpoint rpcEndpoint =
                    new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName());
            rpcEndpoint.start();

            final CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();

            rpcService.stopService();

            terminationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } finally {
            // Single cleanup path: the actor system is shut down exactly once, pass or fail.
            rpcActorSystem.terminate();
            Await.ready(rpcActorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout));
        }
    }

    /**
     * Checks that the close future of an endpoint stays incomplete while its {@code onStop()}
     * future is pending and can be awaited once that future has been completed.
     */
    @Test
    public void testActorTerminationWithAsynchronousOnStopAction() throws Exception {
        final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
        final AsynchronousOnStopEndpoint endpoint = new AsynchronousOnStopEndpoint(akkaRpcService, onStopFuture);

        try {
            endpoint.start();

            final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();
            Assert.assertFalse(terminationFuture.isDone());

            onStopFuture.complete(null);

            // Completing the onStop future is what allows the endpoint to finish terminating.
            terminationFuture.get();
        } finally {
            // Single cleanup path: runs exactly once whether the body passes or fails.
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Checks that an endpoint whose {@code onStop()} schedules work on the main thread
     * executor can still be closed: the close future must complete.
     */
    @Test
    public void testMainThreadExecutionOnStop() throws Exception {
        final MainThreadExecutorOnStopEndpoint endpoint = new MainThreadExecutorOnStopEndpoint(akkaRpcService);

        try {
            endpoint.start();

            final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();
            terminationFuture.get();
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Checks the interplay between a pending {@code onStop()} future and queued calls: with
     * two calls sent after {@code closeAsync()}, the first one runs to completion, the
     * endpoint terminates only after it has finished, and the second call is never executed.
     */
    @Test
    public void testOnStopFutureCompletionDirectlyTerminatesAkkaRpcActor() throws Exception {
        final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
        final TerminatingAfterOnStopFutureCompletionEndpoint endpoint =
                new TerminatingAfterOnStopFutureCompletionEndpoint(akkaRpcService, onStopFuture);

        try {
            endpoint.start();

            final AsyncOperationGateway asyncOperationGateway = endpoint.getSelfGateway(AsyncOperationGateway.class);

            final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();
            Assert.assertThat(terminationFuture.isDone(), Matchers.is(false));

            final CompletableFuture<Integer> firstAsyncOperationFuture = asyncOperationGateway.asyncOperation(timeout);
            final CompletableFuture<Integer> secondAsyncOperationFuture = asyncOperationGateway.asyncOperation(timeout);

            endpoint.awaitEnterAsyncOperation();

            // Complete the stop operation while the first call is still blocked inside the endpoint.
            onStopFuture.complete(null);

            // Termination must not finish before the first call has returned.
            Assert.assertThat(terminationFuture.isDone(), Matchers.is(false));

            endpoint.triggerUnblockAsyncOperation();

            Assert.assertThat(firstAsyncOperationFuture.get(), Matchers.is(42));

            terminationFuture.get();

            // Only the first call was executed; the second one is left incomplete.
            Assert.assertThat(endpoint.getNumberAsyncOperationCalls(), Matchers.is(1));
            Assert.assertThat(secondAsyncOperationFuture.isDone(), Matchers.is(false));
        } finally {
            // Single cleanup path: runs exactly once whether the body passes or fails.
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Checks that starting an endpoint leads to its {@code onStart()} being invoked; the test
     * blocks until the endpoint signals that invocation.
     */
    @Test
    public void testOnStartIsCalledWhenRpcEndpointStarts() throws Exception {
        // No exception configured: onStart() is expected to succeed.
        final OnStartEndpoint endpoint = new OnStartEndpoint(akkaRpcService, null);

        try {
            endpoint.start();
            endpoint.awaitUntilOnStartCalled();
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint, timeout);
        }
    }

    /**
     * Checks that an exception thrown from {@code onStart()} fails the endpoint's termination
     * future and that the original exception can be found in the resulting cause chain.
     */
    @Test
    public void testOnStartFails() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");
        final OnStartEndpoint endpoint = new OnStartEndpoint(akkaRpcService, testException);

        endpoint.start();
        endpoint.awaitUntilOnStartCalled();

        final CompletableFuture<Void> terminationFuture = endpoint.getTerminationFuture();

        try {
            terminationFuture.get();
            Assert.fail("Expected that the rpc endpoint failed onStart and thus has terminated.");
        } catch (ExecutionException e) {
            final boolean containsTestException =
                    ExceptionUtils.findThrowable(e, candidate -> candidate.equals(testException)).isPresent();
            Assert.assertThat(containsTestException, Matchers.is(true));
        }
    }
}
