package org.apache.flink.runtime.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import java.util.UUID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/akka/FlinkUntypedActorTest.class */
public class FlinkUntypedActorTest {
    private static ActorSystem actorSystem;

    /* loaded from: input_file:org/apache/flink/runtime/akka/FlinkUntypedActorTest$PlainFlinkUntypedActor.class */
    static class PlainFlinkUntypedActor extends FlinkUntypedActor {
        private UUID leaderSessionID;
        private int messageCounter = 0;

        public PlainFlinkUntypedActor(UUID uuid) {
            this.leaderSessionID = uuid;
        }

        protected void handleMessage(Object obj) throws Exception {
            this.messageCounter++;
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionID;
        }

        public int getMessageCounter() {
            return this.messageCounter;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/akka/FlinkUntypedActorTest$PlainRequiresLeaderSessionID.class */
    static class PlainRequiresLeaderSessionID implements RequiresLeaderSessionID {
        public String toString() {
            return "PlainRequiresLeaderSessionID";
        }
    }

    @BeforeClass
    public static void setup() {
        actorSystem = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(actorSystem);
    }

    @Test
    public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        TestActorRef testActorRef = null;
        try {
            testActorRef = TestActorRef.create(actorSystem, Props.create(PlainFlinkUntypedActor.class, new Object[]{randomUUID}));
            PlainFlinkUntypedActor underlyingActor = testActorRef.underlyingActor();
            testActorRef.tell(new JobManagerMessages.LeaderSessionMessage(randomUUID, 1), ActorRef.noSender());
            testActorRef.tell(new JobManagerMessages.LeaderSessionMessage(randomUUID2, 2), ActorRef.noSender());
            testActorRef.tell(new JobManagerMessages.LeaderSessionMessage(randomUUID, 2), ActorRef.noSender());
            testActorRef.tell(1, ActorRef.noSender());
            Assert.assertEquals(3L, underlyingActor.getMessageCounter());
            stopActor(testActorRef);
        } catch (Throwable th) {
            stopActor(testActorRef);
            throw th;
        }
    }

    @Test
    public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
        UUID randomUUID = UUID.randomUUID();
        TestActorRef testActorRef = null;
        try {
            testActorRef = TestActorRef.create(actorSystem, Props.create(PlainFlinkUntypedActor.class, new Object[]{randomUUID}));
            testActorRef.receive(new JobManagerMessages.LeaderSessionMessage(randomUUID, 1));
            try {
                testActorRef.receive(new PlainRequiresLeaderSessionID());
                Assert.fail("Expected an exception to be thrown, because a RequiresLeaderSessionIDmessage was sent without being wrapped in LeaderSessionMessage.");
            } catch (Exception e) {
                Assert.assertEquals("Received a message PlainRequiresLeaderSessionID without a leader session ID, even though the message requires a leader session ID.", e.getMessage());
            }
            stopActor(testActorRef);
        } catch (Throwable th) {
            stopActor(testActorRef);
            throw th;
        }
    }

    private static void stopActor(ActorRef actorRef) {
        if (actorRef != null) {
            actorRef.tell(Kill.getInstance(), ActorRef.noSender());
        }
    }
}
