package org.apache.flink.runtime.rpc.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest.class */
public class AkkaActorSystemTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest$Fail.class */
    private static final class Fail {
        private final Throwable errorCause;

        private Fail(Throwable th) {
            this.errorCause = th;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Throwable getErrorCause() {
            return this.errorCause;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static Fail exceptionally(Throwable th) {
            return new Fail(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaActorSystemTest$SimpleActor.class */
    private static final class SimpleActor extends AbstractActor {
        private SimpleActor() {
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(Fail.class, this::handleFail).build();
        }

        private void handleFail(Fail fail) {
            throw new RuntimeException(fail.getErrorCause());
        }
    }

    @Test
    public void shutsDownOnActorFailure() {
        ActorSystem createLocalActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
        try {
            CompletableFuture completableFuture = createLocalActorSystem.getWhenTerminated().toCompletableFuture();
            createLocalActorSystem.actorOf(Props.create(SimpleActor.class, new Object[0])).tell(Fail.exceptionally(new FlinkException("Flink test exception")), ActorRef.noSender());
            completableFuture.join();
            AkkaUtils.terminateActorSystem(createLocalActorSystem).join();
        } catch (Throwable th) {
            AkkaUtils.terminateActorSystem(createLocalActorSystem).join();
            throw th;
        }
    }

    @Test
    public void askTerminatedActorFailsWithRecipientTerminatedException() {
        ActorSystem createLocalActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
        Duration ofSeconds = Duration.ofSeconds(10L);
        try {
            ActorRef actorOf = createLocalActorSystem.actorOf(Props.create(SimpleActor.class, new Object[0]));
            Patterns.gracefulStop(actorOf, ofSeconds).toCompletableFuture().join();
            try {
                Patterns.ask(actorOf, new Object(), ofSeconds).toCompletableFuture().get();
                Assert.fail("Expected a recipient terminated exception.");
            } catch (Exception e) {
                Assert.assertTrue(AkkaRpcServiceUtils.isRecipientTerminatedException(ExceptionUtils.stripExecutionException(e)));
            }
        } finally {
            AkkaUtils.terminateActorSystem(createLocalActorSystem).join();
        }
    }
}
