package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.class */
class AkkaRpcActorTest {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActorTest.class);
    private static Duration timeout = Duration.ofSeconds(10);
    private static AkkaRpcService akkaRpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$AsyncOperationGateway.class */
    interface AsyncOperationGateway extends RpcGateway {
        CompletableFuture<Integer> asyncOperation(@RpcTimeout Time time);
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$AsynchronousOnStopEndpoint.class */
    static class AsynchronousOnStopEndpoint extends RpcEndpoint {
        private final CompletableFuture<Void> onStopFuture;

        /* JADX INFO: Access modifiers changed from: protected */
        public AsynchronousOnStopEndpoint(RpcService rpcService, CompletableFuture<Void> completableFuture) {
            super(rpcService);
            this.onStopFuture = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        }

        public CompletableFuture<Void> onStop() {
            return this.onStopFuture;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DeserializationFailingObject.class */
    private static class DeserializationFailingObject implements Serializable {
        private DeserializationFailingObject() {
        }

        private void readObject(ObjectInputStream objectInputStream) throws ClassNotFoundException, IOException {
            throw new ClassNotFoundException("test exception");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DeserializatonFailingEndpoint.class */
    private static class DeserializatonFailingEndpoint extends RpcEndpoint implements DeserializatonFailingGateway {
        protected DeserializatonFailingEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.DeserializatonFailingGateway
        public CompletableFuture<DeserializationFailingObject> doStuff() {
            return CompletableFuture.completedFuture(new DeserializationFailingObject());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DeserializatonFailingGateway.class */
    private interface DeserializatonFailingGateway extends RpcGateway {
        CompletableFuture<DeserializationFailingObject> doStuff();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DummyRpcEndpoint.class */
    static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
        private volatile int foobar;

        /* JADX INFO: Access modifiers changed from: protected */
        public DummyRpcEndpoint(RpcService rpcService) {
            super(rpcService);
            this.foobar = 42;
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.DummyRpcGateway
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(Integer.valueOf(this.foobar));
        }

        public void setFoobar(int i) {
            this.foobar = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$DummyRpcGateway.class */
    public interface DummyRpcGateway extends RpcGateway {
        CompletableFuture<Integer> foobar();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$ExceptionalEndpoint.class */
    private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway {
        protected ExceptionalEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.ExceptionalGateway
        public CompletableFuture<Integer> doStuff() {
            throw new RuntimeException("my super specific test exception");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$ExceptionalFutureEndpoint.class */
    private static class ExceptionalFutureEndpoint extends RpcEndpoint implements ExceptionalGateway {
        protected ExceptionalFutureEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest$ExceptionalFutureEndpoint$1] */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.ExceptionalGateway
        public CompletableFuture<Integer> doStuff() {
            final CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
            new Thread() { // from class: org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.ExceptionalFutureEndpoint.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                    }
                    completableFuture.completeExceptionally(new Exception("some test"));
                }
            }.start();
            return completableFuture;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$ExceptionalGateway.class */
    private interface ExceptionalGateway extends RpcGateway {
        CompletableFuture<Integer> doStuff();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$FailingOnStopEndpoint.class */
    private static class FailingOnStopEndpoint extends RpcEndpoint implements RpcGateway {

        /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$FailingOnStopEndpoint$OnStopException.class */
        private static class OnStopException extends FlinkException {
            private static final long serialVersionUID = 6701096588415871592L;

            public OnStopException(String str) {
                super(str);
            }
        }

        protected FailingOnStopEndpoint(RpcService rpcService, String str) {
            super(rpcService, str);
        }

        public CompletableFuture<Void> onStop() {
            return FutureUtils.completedExceptionally(new OnStopException("Test exception."));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$MainThreadExecutorOnStopEndpoint.class */
    private static class MainThreadExecutorOnStopEndpoint extends RpcEndpoint {
        protected MainThreadExecutorOnStopEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        public CompletableFuture<Void> onStop() {
            return CompletableFuture.runAsync(() -> {
            }, getMainThreadExecutor());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$NullRespondingEndpoint.class */
    static class NullRespondingEndpoint extends RpcEndpoint implements NullRespondingGateway {
        /* JADX INFO: Access modifiers changed from: protected */
        public NullRespondingEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.DummyRpcGateway
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(null);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.NullRespondingGateway
        public Integer synchronousFoobar() {
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$NullRespondingGateway.class */
    interface NullRespondingGateway extends DummyRpcGateway {
        Integer synchronousFoobar();
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$OnStartEndpoint.class */
    private static final class OnStartEndpoint extends RpcEndpoint {
        private final CountDownLatch countDownLatch;

        @Nullable
        private final Exception exception;

        OnStartEndpoint(RpcService rpcService, @Nullable Exception exc) {
            super(rpcService);
            this.countDownLatch = new CountDownLatch(1);
            this.exception = exc;
            getTerminationFuture().whenComplete((r3, th) -> {
                closeAsync();
            });
        }

        public void onStart() throws Exception {
            this.countDownLatch.countDown();
            ExceptionUtils.tryRethrowException(this.exception);
        }

        public void awaitUntilOnStartCalled() throws InterruptedException {
            this.countDownLatch.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$OnStopCountingRpcEndpoint.class */
    private static final class OnStopCountingRpcEndpoint extends RpcEndpoint {
        private final AtomicInteger numOnStopCalls;
        private final OneShotLatch onStopHasBeenCalled;
        private final CompletableFuture<Void> onStopFuture;

        private OnStopCountingRpcEndpoint(RpcService rpcService, CompletableFuture<Void> completableFuture) {
            super(rpcService);
            this.numOnStopCalls = new AtomicInteger(0);
            this.onStopHasBeenCalled = new OneShotLatch();
            this.onStopFuture = completableFuture;
        }

        protected CompletableFuture<Void> onStop() {
            this.onStopHasBeenCalled.trigger();
            this.numOnStopCalls.incrementAndGet();
            return this.onStopFuture;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumOnStopCalls() {
            return this.numOnStopCalls.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void waitUntilOnStopHasBeenCalled() throws InterruptedException {
            this.onStopHasBeenCalled.await();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$SchedulingRpcEndpoint.class */
    private static final class SchedulingRpcEndpoint extends RpcEndpoint implements SchedulingRpcEndpointGateway {
        static final int DELAY_MILLIS = 20;

        public SchedulingRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.SchedulingRpcEndpointGateway
        public void schedule(CompletableFuture<Void> completableFuture, CompletableFuture<Void> completableFuture2, CompletableFuture<Void> completableFuture3) {
            getMainThreadExecutor().schedule(() -> {
                return Boolean.valueOf(completableFuture.complete(null));
            }, 20L, TimeUnit.MILLISECONDS);
            getMainThreadExecutor().schedule(() -> {
                completableFuture2.complete(null);
                return null;
            }, 20L, TimeUnit.MILLISECONDS);
            getMainThreadExecutor().execute(() -> {
                completableFuture3.complete(null);
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$SchedulingRpcEndpointGateway.class */
    interface SchedulingRpcEndpointGateway extends RpcGateway {
        @Local
        void schedule(CompletableFuture<Void> completableFuture, CompletableFuture<Void> completableFuture2, CompletableFuture<Void> completableFuture3);
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$SerializedValueRespondingEndpoint.class */
    static class SerializedValueRespondingEndpoint extends RpcEndpoint implements SerializedValueRespondingGateway {
        static final SerializedValue<String> SERIALIZED_VALUE;

        public SerializedValueRespondingEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.SerializedValueRespondingGateway
        public CompletableFuture<SerializedValue<String>> getSerializedValue() {
            return CompletableFuture.completedFuture(SERIALIZED_VALUE);
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.SerializedValueRespondingGateway
        public SerializedValue<String> getSerializedValueSynchronously() {
            return SERIALIZED_VALUE;
        }

        static {
            try {
                SERIALIZED_VALUE = new SerializedValue<>("string-value");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$SerializedValueRespondingGateway.class */
    public interface SerializedValueRespondingGateway extends RpcGateway {
        CompletableFuture<SerializedValue<String>> getSerializedValue();

        SerializedValue<String> getSerializedValueSynchronously();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$SimpleRpcEndpoint.class */
    public static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {
        protected SimpleRpcEndpoint(RpcService rpcService, String str) {
            super(rpcService, str);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest$TerminatingAfterOnStopFutureCompletionEndpoint.class */
    private static class TerminatingAfterOnStopFutureCompletionEndpoint extends RpcEndpoint implements AsyncOperationGateway {
        private final CompletableFuture<Void> onStopFuture;
        private final OneShotLatch blockAsyncOperation;
        private final OneShotLatch enterAsyncOperation;
        private final AtomicInteger asyncOperationCounter;

        protected TerminatingAfterOnStopFutureCompletionEndpoint(RpcService rpcService, CompletableFuture<Void> completableFuture) {
            super(rpcService);
            this.blockAsyncOperation = new OneShotLatch();
            this.enterAsyncOperation = new OneShotLatch();
            this.asyncOperationCounter = new AtomicInteger(0);
            this.onStopFuture = completableFuture;
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActorTest.AsyncOperationGateway
        public CompletableFuture<Integer> asyncOperation(Time time) {
            this.asyncOperationCounter.incrementAndGet();
            this.enterAsyncOperation.trigger();
            try {
                this.blockAsyncOperation.await();
                return CompletableFuture.completedFuture(42);
            } catch (InterruptedException e) {
                throw new FlinkRuntimeException(e);
            }
        }

        public CompletableFuture<Void> onStop() {
            return this.onStopFuture;
        }

        void awaitEnterAsyncOperation() throws InterruptedException {
            this.enterAsyncOperation.await();
        }

        void triggerUnblockAsyncOperation() {
            this.blockAsyncOperation.trigger();
        }

        int getNumberAsyncOperationCalls() {
            return this.asyncOperationCounter.get();
        }
    }

    AkkaRpcActorTest() {
    }

    @BeforeAll
    static void setup() {
        akkaRpcService = new AkkaRpcService(AkkaUtils.createLocalActorSystem(new Configuration()), AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterAll
    static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        RpcUtils.terminateRpcService(new RpcService[]{akkaRpcService});
    }

    @Test
    void testAddressResolution() throws Exception {
        DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        Assertions.assertThat(((DummyRpcGateway) akkaRpcService.connect(dummyRpcEndpoint.getAddress(), DummyRpcGateway.class).get()).getAddress()).isEqualTo(dummyRpcEndpoint.getAddress());
    }

    @Test
    void testFailingAddressResolution() throws Exception {
        CompletableFuture connect = akkaRpcService.connect("foobar", DummyRpcGateway.class);
        Assertions.assertThatThrownBy(() -> {
        }).hasCauseInstanceOf(RpcConnectionException.class);
    }

    @Test
    void testMessageDiscarding() throws Exception {
        DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        DummyRpcGateway dummyRpcGateway = (DummyRpcGateway) dummyRpcEndpoint.getSelfGateway(DummyRpcGateway.class);
        Assertions.assertThatThrownBy(() -> {
            dummyRpcGateway.foobar().get();
        }).hasCauseInstanceOf(EndpointNotStartedException.class);
        dummyRpcEndpoint.setFoobar(1337);
        dummyRpcEndpoint.start();
        try {
            Assertions.assertThat(dummyRpcGateway.foobar().get()).isEqualTo(1337);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{dummyRpcEndpoint});
            throw th;
        }
    }

    @Test
    void testRpcEndpointTerminationFuture() throws Exception {
        DummyRpcEndpoint dummyRpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        dummyRpcEndpoint.start();
        CompletableFuture terminationFuture = dummyRpcEndpoint.getTerminationFuture();
        Assertions.assertThat(terminationFuture).isNotDone();
        dummyRpcEndpoint.getClass();
        CompletableFuture.runAsync(dummyRpcEndpoint::closeAsync, akkaRpcService.getScheduledExecutor());
        terminationFuture.get();
    }

    @Test
    void testExceptionPropagation() throws Exception {
        ExceptionalEndpoint exceptionalEndpoint = new ExceptionalEndpoint(akkaRpcService);
        exceptionalEndpoint.start();
        CompletableFuture<Integer> doStuff = ((ExceptionalGateway) exceptionalEndpoint.getSelfGateway(ExceptionalGateway.class)).doStuff();
        Assertions.assertThatThrownBy(() -> {
        }).extracting(th -> {
            return th.getCause();
        }).satisfies(new ThrowingConsumer[]{th2 -> {
            Assertions.assertThat(th2).isInstanceOf(RuntimeException.class).hasMessage("my super specific test exception");
        }});
    }

    @Test
    void testExceptionPropagationFuturePiping() throws Exception {
        ExceptionalFutureEndpoint exceptionalFutureEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
        exceptionalFutureEndpoint.start();
        CompletableFuture<Integer> doStuff = ((ExceptionalGateway) exceptionalFutureEndpoint.getSelfGateway(ExceptionalGateway.class)).doStuff();
        Assertions.assertThatThrownBy(() -> {
        }).extracting(th -> {
            return th.getCause();
        }).satisfies(new ThrowingConsumer[]{th2 -> {
            Assertions.assertThat(th2).isInstanceOf(Exception.class).hasMessage("some test");
        }});
    }

    @Test
    void testResultFutureFailsOnDeserializationError() throws Exception {
        RpcService akkaRpcService2 = new AkkaRpcService(AkkaUtils.createActorSystem("serverActorSystem", AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("localhost", 0))), AkkaRpcServiceConfiguration.defaultConfiguration());
        RpcService akkaRpcService3 = new AkkaRpcService(AkkaUtils.createActorSystem("clientActorSystem", AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("localhost", 0))), AkkaRpcServiceConfiguration.defaultConfiguration());
        try {
            DeserializatonFailingEndpoint deserializatonFailingEndpoint = new DeserializatonFailingEndpoint(akkaRpcService2);
            deserializatonFailingEndpoint.start();
            Assertions.assertThat(((DeserializatonFailingGateway) akkaRpcService3.connect(((DeserializatonFailingGateway) deserializatonFailingEndpoint.getSelfGateway(DeserializatonFailingGateway.class)).getAddress(), DeserializatonFailingGateway.class).get()).doStuff()).failsWithin(Duration.ofHours(1L)).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(RpcException.class);
            RpcUtils.terminateRpcService(new RpcService[]{akkaRpcService3});
            RpcUtils.terminateRpcService(new RpcService[]{akkaRpcService2});
        } catch (Throwable th) {
            RpcUtils.terminateRpcService(new RpcService[]{akkaRpcService3});
            RpcUtils.terminateRpcService(new RpcService[]{akkaRpcService2});
            throw th;
        }
    }

    @Test
    void testOnStopExceptionPropagation() throws Exception {
        FailingOnStopEndpoint failingOnStopEndpoint = new FailingOnStopEndpoint(akkaRpcService, "FailingOnStopEndpoint");
        failingOnStopEndpoint.start();
        CompletableFuture closeAsync = failingOnStopEndpoint.closeAsync();
        closeAsync.getClass();
        Assertions.assertThatThrownBy(closeAsync::get).hasCauseInstanceOf(FailingOnStopEndpoint.OnStopException.class);
    }

    @Test
    void testOnStopExecutedByMainThread() throws Exception {
        SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService, "SimpleRpcEndpoint");
        simpleRpcEndpoint.start();
        simpleRpcEndpoint.closeAsync().get();
    }

    @Test
    void testActorTerminationWhenServiceShutdown() throws Exception {
        ActorSystem createDefaultActorSystem = AkkaUtils.createDefaultActorSystem();
        AkkaRpcService akkaRpcService2 = new AkkaRpcService(createDefaultActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
        try {
            SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService2, SimpleRpcEndpoint.class.getSimpleName());
            simpleRpcEndpoint.start();
            CompletableFuture terminationFuture = simpleRpcEndpoint.getTerminationFuture();
            akkaRpcService2.stopService();
            terminationFuture.get();
            createDefaultActorSystem.terminate();
            AkkaFutureUtils.toJava(createDefaultActorSystem.whenTerminated()).get();
        } catch (Throwable th) {
            createDefaultActorSystem.terminate();
            AkkaFutureUtils.toJava(createDefaultActorSystem.whenTerminated()).get();
            throw th;
        }
    }

    @Test
    void testActorTerminationWithAsynchronousOnStopAction() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        AsynchronousOnStopEndpoint asynchronousOnStopEndpoint = new AsynchronousOnStopEndpoint(akkaRpcService, completableFuture);
        try {
            asynchronousOnStopEndpoint.start();
            CompletableFuture closeAsync = asynchronousOnStopEndpoint.closeAsync();
            Assertions.assertThat(closeAsync).isNotDone();
            completableFuture.complete(null);
            closeAsync.get();
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{asynchronousOnStopEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{asynchronousOnStopEndpoint});
            throw th;
        }
    }

    @Test
    void testMainThreadExecutionOnStop() throws Exception {
        MainThreadExecutorOnStopEndpoint mainThreadExecutorOnStopEndpoint = new MainThreadExecutorOnStopEndpoint(akkaRpcService);
        try {
            mainThreadExecutorOnStopEndpoint.start();
            mainThreadExecutorOnStopEndpoint.closeAsync().get();
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{mainThreadExecutorOnStopEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{mainThreadExecutorOnStopEndpoint});
            throw th;
        }
    }

    @Test
    void testOnStopFutureCompletionDirectlyTerminatesAkkaRpcActor() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TerminatingAfterOnStopFutureCompletionEndpoint terminatingAfterOnStopFutureCompletionEndpoint = new TerminatingAfterOnStopFutureCompletionEndpoint(akkaRpcService, completableFuture);
        try {
            terminatingAfterOnStopFutureCompletionEndpoint.start();
            AsyncOperationGateway asyncOperationGateway = (AsyncOperationGateway) terminatingAfterOnStopFutureCompletionEndpoint.getSelfGateway(AsyncOperationGateway.class);
            CompletableFuture closeAsync = terminatingAfterOnStopFutureCompletionEndpoint.closeAsync();
            Assertions.assertThat(closeAsync).isNotDone();
            CompletableFuture<Integer> asyncOperation = asyncOperationGateway.asyncOperation(Time.fromDuration(timeout));
            CompletableFuture<Integer> asyncOperation2 = asyncOperationGateway.asyncOperation(Time.fromDuration(timeout));
            terminatingAfterOnStopFutureCompletionEndpoint.awaitEnterAsyncOperation();
            completableFuture.complete(null);
            Assertions.assertThat(closeAsync).isNotDone();
            terminatingAfterOnStopFutureCompletionEndpoint.triggerUnblockAsyncOperation();
            Assertions.assertThat(asyncOperation.get()).isEqualTo(42);
            closeAsync.get();
            Assertions.assertThat(terminatingAfterOnStopFutureCompletionEndpoint.getNumberAsyncOperationCalls()).isEqualTo(1);
            Assertions.assertThat(asyncOperation2).failsWithin(timeout).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(RecipientUnreachableException.class);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{terminatingAfterOnStopFutureCompletionEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{terminatingAfterOnStopFutureCompletionEndpoint});
            throw th;
        }
    }

    @Test
    void testOnStartIsCalledWhenRpcEndpointStarts() throws Exception {
        OnStartEndpoint onStartEndpoint = new OnStartEndpoint(akkaRpcService, null);
        try {
            onStartEndpoint.start();
            onStartEndpoint.awaitUntilOnStartCalled();
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{onStartEndpoint});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{onStartEndpoint});
            throw th;
        }
    }

    @Test
    void testOnStartFails() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception");
        OnStartEndpoint onStartEndpoint = new OnStartEndpoint(akkaRpcService, flinkException);
        onStartEndpoint.start();
        onStartEndpoint.awaitUntilOnStartCalled();
        Assertions.assertThatThrownBy(() -> {
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(flinkException.getClass(), flinkException.getMessage())});
    }

    @Test
    void callsOnStopOnlyOnce() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        OnStopCountingRpcEndpoint onStopCountingRpcEndpoint = new OnStopCountingRpcEndpoint(akkaRpcService, completableFuture);
        try {
            onStopCountingRpcEndpoint.start();
            AkkaBasedEndpoint selfGateway = onStopCountingRpcEndpoint.getSelfGateway(AkkaBasedEndpoint.class);
            selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
            selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
            onStopCountingRpcEndpoint.waitUntilOnStopHasBeenCalled();
            completableFuture.complete(null);
            onStopCountingRpcEndpoint.getTerminationFuture().get();
            Assertions.assertThat(onStopCountingRpcEndpoint.getNumOnStopCalls()).isEqualTo(1);
            completableFuture.complete(null);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{onStopCountingRpcEndpoint});
        } catch (Throwable th) {
            completableFuture.complete(null);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{onStopCountingRpcEndpoint});
            throw th;
        }
    }

    @Test
    void canReuseEndpointNameAfterTermination() throws Exception {
        SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService, "not_unique");
        Throwable th = null;
        try {
            simpleRpcEndpoint.start();
            simpleRpcEndpoint.closeAsync().join();
            SimpleRpcEndpoint simpleRpcEndpoint2 = new SimpleRpcEndpoint(akkaRpcService, "not_unique");
            Throwable th2 = null;
            try {
                try {
                    simpleRpcEndpoint2.start();
                    Assertions.assertThat(simpleRpcEndpoint2.getAddress()).isEqualTo(simpleRpcEndpoint.getAddress());
                    if (simpleRpcEndpoint2 != null) {
                        if (0 != 0) {
                            try {
                                simpleRpcEndpoint2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            simpleRpcEndpoint2.close();
                        }
                    }
                    if (simpleRpcEndpoint != null) {
                        if (0 == 0) {
                            simpleRpcEndpoint.close();
                            return;
                        }
                        try {
                            simpleRpcEndpoint.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (simpleRpcEndpoint2 != null) {
                    if (th2 != null) {
                        try {
                            simpleRpcEndpoint2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        simpleRpcEndpoint2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (simpleRpcEndpoint != null) {
                if (0 != 0) {
                    try {
                        simpleRpcEndpoint.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    simpleRpcEndpoint.close();
                }
            }
            throw th8;
        }
    }

    @Test
    void terminationFutureDoesNotBlockRpcEndpointCreation() throws Exception {
        SimpleRpcEndpoint simpleRpcEndpoint = new SimpleRpcEndpoint(akkaRpcService, "foobar");
        Throwable th = null;
        try {
            CompletableFuture thenApply = simpleRpcEndpoint.getTerminationFuture().thenApply(r5 -> {
                return new SimpleRpcEndpoint(akkaRpcService, "foobar2");
            });
            simpleRpcEndpoint.closeAsync();
            ((SimpleRpcEndpoint) thenApply.join()).close();
            if (simpleRpcEndpoint != null) {
                if (0 == 0) {
                    simpleRpcEndpoint.close();
                    return;
                }
                try {
                    simpleRpcEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (simpleRpcEndpoint != null) {
                if (0 != 0) {
                    try {
                        simpleRpcEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    simpleRpcEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void resolvesRunningAkkaRpcActor() throws Exception {
        RpcEndpoint createRpcEndpointWithRandomNameSuffix = createRpcEndpointWithRandomNameSuffix("foobar");
        Throwable th = null;
        try {
            RpcEndpoint createRpcEndpointWithRandomNameSuffix2 = createRpcEndpointWithRandomNameSuffix("foobar");
            Throwable th2 = null;
            try {
                createRpcEndpointWithRandomNameSuffix.closeAsync().join();
                Assertions.assertThat(((RpcGateway) akkaRpcService.connect(AkkaRpcServiceUtils.getLocalRpcUrl(RpcServiceUtils.createWildcardName("foobar")), RpcGateway.class).join()).getAddress()).isEqualTo(createRpcEndpointWithRandomNameSuffix2.getAddress());
                if (createRpcEndpointWithRandomNameSuffix2 != null) {
                    if (0 != 0) {
                        try {
                            createRpcEndpointWithRandomNameSuffix2.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createRpcEndpointWithRandomNameSuffix2.close();
                    }
                }
                if (createRpcEndpointWithRandomNameSuffix != null) {
                    if (0 == 0) {
                        createRpcEndpointWithRandomNameSuffix.close();
                        return;
                    }
                    try {
                        createRpcEndpointWithRandomNameSuffix.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (createRpcEndpointWithRandomNameSuffix2 != null) {
                    if (0 != 0) {
                        try {
                            createRpcEndpointWithRandomNameSuffix2.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        createRpcEndpointWithRandomNameSuffix2.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (createRpcEndpointWithRandomNameSuffix != null) {
                if (0 != 0) {
                    try {
                        createRpcEndpointWithRandomNameSuffix.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    createRpcEndpointWithRandomNameSuffix.close();
                }
            }
            throw th7;
        }
    }

    private RpcEndpoint createRpcEndpointWithRandomNameSuffix(String str) {
        return new SimpleRpcEndpoint(akkaRpcService, RpcServiceUtils.createRandomName(str));
    }

    @Test
    void canRespondWithNullValueLocally() throws Exception {
        NullRespondingEndpoint nullRespondingEndpoint = new NullRespondingEndpoint(akkaRpcService);
        Throwable th = null;
        try {
            nullRespondingEndpoint.start();
            Assertions.assertThat(((NullRespondingGateway) nullRespondingEndpoint.getSelfGateway(NullRespondingGateway.class)).foobar().join()).isNull();
            if (nullRespondingEndpoint != null) {
                if (0 == 0) {
                    nullRespondingEndpoint.close();
                    return;
                }
                try {
                    nullRespondingEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (nullRespondingEndpoint != null) {
                if (0 != 0) {
                    try {
                        nullRespondingEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    nullRespondingEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void canRespondWithSynchronousNullValueLocally() throws Exception {
        NullRespondingEndpoint nullRespondingEndpoint = new NullRespondingEndpoint(akkaRpcService);
        Throwable th = null;
        try {
            try {
                nullRespondingEndpoint.start();
                Assertions.assertThat(((NullRespondingGateway) nullRespondingEndpoint.getSelfGateway(NullRespondingGateway.class)).synchronousFoobar()).isNull();
                if (nullRespondingEndpoint != null) {
                    if (0 == 0) {
                        nullRespondingEndpoint.close();
                        return;
                    }
                    try {
                        nullRespondingEndpoint.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (nullRespondingEndpoint != null) {
                if (th != null) {
                    try {
                        nullRespondingEndpoint.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    nullRespondingEndpoint.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void canRespondWithSerializedValueLocally() throws Exception {
        SerializedValueRespondingEndpoint serializedValueRespondingEndpoint = new SerializedValueRespondingEndpoint(akkaRpcService);
        Throwable th = null;
        try {
            serializedValueRespondingEndpoint.start();
            SerializedValueRespondingGateway serializedValueRespondingGateway = (SerializedValueRespondingGateway) serializedValueRespondingEndpoint.getSelfGateway(SerializedValueRespondingGateway.class);
            Assertions.assertThat(serializedValueRespondingGateway.getSerializedValueSynchronously()).isEqualTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
            Assertions.assertThat(serializedValueRespondingGateway.getSerializedValue().get()).isEqualTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
            if (serializedValueRespondingEndpoint != null) {
                if (0 == 0) {
                    serializedValueRespondingEndpoint.close();
                    return;
                }
                try {
                    serializedValueRespondingEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (serializedValueRespondingEndpoint != null) {
                if (0 != 0) {
                    try {
                        serializedValueRespondingEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    serializedValueRespondingEndpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testScheduling() throws ExecutionException, InterruptedException {
        SchedulingRpcEndpoint schedulingRpcEndpoint = new SchedulingRpcEndpoint(akkaRpcService);
        schedulingRpcEndpoint.start();
        SchedulingRpcEndpointGateway schedulingRpcEndpointGateway = (SchedulingRpcEndpointGateway) schedulingRpcEndpoint.getSelfGateway(SchedulingRpcEndpointGateway.class);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        CompletableFuture<Void> completableFuture3 = new CompletableFuture<>();
        long nanoTime = System.nanoTime();
        schedulingRpcEndpointGateway.schedule(completableFuture, completableFuture2, completableFuture3);
        Assertions.assertThat((Long) completableFuture.thenApply(r3 -> {
            return Long.valueOf(System.nanoTime());
        }).get()).isGreaterThanOrEqualTo(nanoTime + Duration.ofMillis(20L).toNanos());
        Assertions.assertThat((Long) completableFuture2.thenApply(r32 -> {
            return Long.valueOf(System.nanoTime());
        }).get()).isGreaterThanOrEqualTo(nanoTime + Duration.ofMillis(20L).toNanos());
        completableFuture3.get();
    }
}
