package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages$RequestBlobManagerPort$;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClientActorTest.class */
public class JobClientActorTest extends TestLogger {
    /** Actor system shared by all tests; created in {@code setup()} and shut down in {@code teardown()}. */
    private static ActorSystem system;
    /** Job graph named "Test Job"; tests either submit it or attach to its job ID. */
    private static JobGraph testJobGraph = new JobGraph("Test Job");
    /** Configuration created in {@code setup()}; used for the actor system and for the job submission clients. */
    private static Configuration clientConfig;

    /* loaded from: input_file:org/apache/flink/runtime/client/JobClientActorTest$JobAcceptingActor.class */
    /**
     * Test actor that answers every job submission and every job client registration with the
     * matching success message.
     *
     * <p>A test can send a {@link RegisterTest} message to be notified with an {@link Acknowledge}
     * as soon as (or if already) a job has been accepted.
     */
    public static class JobAcceptingActor extends FlinkUntypedActor {
        private final UUID leaderSessionID;
        /** Becomes {@code true} after the first submission or registration has been answered. */
        private boolean jobAccepted = false;
        /** Sender of the most recent {@link RegisterTest}; {@code ActorRef.noSender()} until one arrives. */
        private ActorRef testFuture = ActorRef.noSender();

        public JobAcceptingActor(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
        }

        /**
         * Dispatches on the message type; message types not listed here are ignored.
         *
         * @param message the received message
         */
        protected void handleMessage(Object message) throws Exception {
            if (message instanceof JobManagerMessages.SubmitJob) {
                JobManagerMessages.SubmitJob submission = (JobManagerMessages.SubmitJob) message;
                getSender().tell(new JobManagerMessages.JobSubmitSuccess(submission.jobGraph().getJobID()), getSelf());
                markJobAccepted();
            } else if (message instanceof JobManagerMessages.RegisterJobClient) {
                JobManagerMessages.RegisterJobClient registration = (JobManagerMessages.RegisterJobClient) message;
                getSender().tell(new JobManagerMessages.RegisterJobClientSuccess(registration.jobID()), getSelf());
                markJobAccepted();
            } else if (message instanceof JobManagerMessages$RequestBlobManagerPort$) {
                // Reply to the blob manager port request with a fixed value.
                getSender().tell(1337, getSelf());
            } else if (message instanceof RegisterTest) {
                testFuture = getSender();
                // The job may have been accepted before the test registered; acknowledge right away then.
                if (jobAccepted) {
                    testFuture.tell(Acknowledge.get(), getSelf());
                }
            }
        }

        /** Records that a job was accepted and acknowledges the registered test, if there is one. */
        private void markJobAccepted() {
            jobAccepted = true;
            if (testFuture != ActorRef.noSender()) {
                testFuture.tell(Acknowledge.get(), getSelf());
            }
        }

        protected UUID getLeaderSessionID() {
            return leaderSessionID;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/client/JobClientActorTest$PlainActor.class */
    /**
     * Test actor that only answers blob manager port requests and ignores every other message,
     * so job submissions and registrations sent to it never receive a reply.
     */
    public static class PlainActor extends FlinkUntypedActor {
        private final UUID leaderSessionID;

        public PlainActor(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
        }

        /**
         * Replies to a blob manager port request with a fixed value; drops anything else.
         *
         * @param message the received message
         */
        protected void handleMessage(Object message) throws Exception {
            if (!(message instanceof JobManagerMessages$RequestBlobManagerPort$)) {
                return;
            }
            getSender().tell(1337, getSelf());
        }

        protected UUID getLeaderSessionID() {
            return leaderSessionID;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/client/JobClientActorTest$RegisterTest.class */
    /**
     * Marker message without payload. A test sends it to a {@link JobAcceptingActor}, which
     * remembers the sender and replies with an {@code Acknowledge} once a job has been accepted.
     */
    public static class RegisterTest {
    }

    /**
     * Creates the client configuration and the local actor system shared by all tests of this class.
     */
    @BeforeClass
    public static void setup() {
        Configuration configuration = new Configuration();
        clientConfig = configuration;
        system = AkkaUtils.createLocalActorSystem(configuration);
    }

    /**
     * Shuts down the shared actor system and clears the static reference.
     *
     * <p>Does nothing when the actor system was never created (for example because
     * {@code setup()} failed), so that the original failure is not followed by a second one
     * raised from here. The reference is cleared even if the shutdown itself throws.
     */
    @AfterClass
    public static void teardown() {
        if (system == null) {
            return;
        }
        try {
            JavaTestKit.shutdownActorSystem(system);
        } finally {
            system = null;
        }
    }

    /**
     * Submits a job through a submission client whose leader is a {@link PlainActor}. That actor
     * never answers the submission, so the ask is expected to fail with a
     * {@link JobClientActorSubmissionTimeoutException}.
     */
    @Test(expected = JobClientActorSubmissionTimeoutException.class)
    public void testSubmissionTimeout() throws Exception {
        FiniteDuration clientTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        // Wait twice as long as the client's own timeout.
        FiniteDuration askTimeout = clientTimeout.$times(2L);
        UUID leaderSessionId = UUID.randomUUID();

        ActorRef jobManager = system.actorOf(Props.create(PlainActor.class, leaderSessionId));
        SettableLeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService(jobManager.path().toString(), leaderSessionId);
        Props clientProps =
                JobSubmissionClientActor.createActorProps(leaderRetrieval, clientTimeout, false, clientConfig);
        ActorRef jobClient = system.actorOf(clientProps);

        Future<Object> submission =
                Patterns.ask(jobClient, new JobClientMessages.SubmitJobAndWait(testJobGraph), new Timeout(askTimeout));
        Await.result(submission, askTimeout);
    }

    /**
     * Attaches to a job through an attachment client whose leader is a {@link PlainActor}. That
     * actor never answers the registration, so the ask is expected to fail with a
     * {@link JobClientActorRegistrationTimeoutException}.
     */
    @Test(expected = JobClientActorRegistrationTimeoutException.class)
    public void testRegistrationTimeout() throws Exception {
        FiniteDuration clientTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        // Wait twice as long as the client's own timeout.
        FiniteDuration askTimeout = clientTimeout.$times(2L);
        UUID leaderSessionId = UUID.randomUUID();

        ActorRef jobManager = system.actorOf(Props.create(PlainActor.class, leaderSessionId));
        SettableLeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService(jobManager.path().toString(), leaderSessionId);
        Props clientProps = JobAttachmentClientActor.createActorProps(leaderRetrieval, clientTimeout, false);
        ActorRef jobClient = system.actorOf(clientProps);

        Future<Object> attachment =
                Patterns.ask(jobClient, new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), new Timeout(askTimeout));
        Await.result(attachment, askTimeout);
    }

    /**
     * Submits a job while the leader retrieval service points at the address "localhost", where
     * this test starts no job manager actor, and expects a
     * {@link JobClientActorConnectionTimeoutException}.
     */
    @Test(expected = JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception {
        FiniteDuration clientTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        // Wait twice as long as the client's own timeout.
        FiniteDuration askTimeout = clientTimeout.$times(2L);

        SettableLeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        Props clientProps =
                JobSubmissionClientActor.createActorProps(leaderRetrieval, clientTimeout, false, clientConfig);
        ActorRef jobClient = system.actorOf(clientProps);

        Future<Object> submission =
                Patterns.ask(jobClient, new JobClientMessages.SubmitJobAndWait(testJobGraph), new Timeout(askTimeout));
        Await.result(submission, askTimeout);
    }

    /**
     * Attaches to a job while the leader retrieval service points at the address "localhost",
     * where this test starts no job manager actor, and expects a
     * {@link JobClientActorConnectionTimeoutException}.
     */
    @Test(expected = JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception {
        FiniteDuration clientTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        // Wait twice as long as the client's own timeout.
        FiniteDuration askTimeout = clientTimeout.$times(2L);

        SettableLeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService("localhost", HighAvailabilityServices.DEFAULT_LEADER_ID);
        Props clientProps = JobAttachmentClientActor.createActorProps(leaderRetrieval, clientTimeout, false);
        ActorRef jobClient = system.actorOf(clientProps);

        Future<Object> attachment =
                Patterns.ask(jobClient, new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), new Timeout(askTimeout));
        Await.result(attachment, askTimeout);
    }

    /**
     * Submits a job to a {@link JobAcceptingActor}, waits until that actor reports the job as
     * accepted, then stops the actor with a {@code PoisonPill}. The pending submission ask is
     * expected to fail with a {@link JobClientActorConnectionTimeoutException}.
     */
    @Test(expected = JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutAfterJobSubmission() throws Exception {
        FiniteDuration clientTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        // Wait twice as long as the client's own timeout.
        FiniteDuration askTimeout = clientTimeout.$times(2L);
        UUID leaderSessionId = UUID.randomUUID();

        ActorRef jobManager = system.actorOf(Props.create(JobAcceptingActor.class, leaderSessionId));
        SettableLeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService(jobManager.path().toString(), leaderSessionId);
        Props clientProps =
                JobSubmissionClientActor.createActorProps(leaderRetrieval, clientTimeout, false, clientConfig);
        ActorRef jobClient = system.actorOf(clientProps);

        Future<Object> jobResult =
                Patterns.ask(jobClient, new JobClientMessages.SubmitJobAndWait(testJobGraph), new Timeout(askTimeout));

        // Block until the job manager stand-in acknowledges that it accepted the job.
        Future<Object> jobAccepted = Patterns.ask(jobManager, new RegisterTest(), new Timeout(askTimeout));
        Await.result(jobAccepted, askTimeout);

        // Stop the job manager stand-in while the client is still waiting for the job result.
        jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());

        Await.result(jobResult, askTimeout);
    }

    /**
     * Attaches to a job at a {@link JobAcceptingActor}, waits until that actor reports the
     * registration as accepted, then stops the actor with a {@code PoisonPill}. The pending
     * attachment ask is expected to fail with a {@link JobClientActorConnectionTimeoutException}.
     */
    @Test(expected = JobClientActorConnectionTimeoutException.class)
    public void testConnectionTimeoutAfterJobRegistration() throws Exception {
        FiniteDuration clientTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
        // Wait twice as long as the client's own timeout.
        FiniteDuration askTimeout = clientTimeout.$times(2L);
        UUID leaderSessionId = UUID.randomUUID();

        ActorRef jobManager = system.actorOf(Props.create(JobAcceptingActor.class, leaderSessionId));
        SettableLeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService(jobManager.path().toString(), leaderSessionId);
        Props clientProps = JobAttachmentClientActor.createActorProps(leaderRetrieval, clientTimeout, false);
        ActorRef jobClient = system.actorOf(clientProps);

        Future<Object> jobResult =
                Patterns.ask(jobClient, new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), new Timeout(askTimeout));

        // Block until the job manager stand-in acknowledges that it accepted the registration.
        Future<Object> jobAccepted = Patterns.ask(jobManager, new RegisterTest(), new Timeout(askTimeout));
        Await.result(jobAccepted, askTimeout);

        // Stop the job manager stand-in while the client is still waiting for the job result.
        jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());

        Await.result(jobResult, askTimeout);
    }

    /**
     * Submits a job via {@code JobClient.submitJob}, waits until the {@link JobAcceptingActor}
     * has accepted it, then stops the job client actor with a {@code PoisonPill}. Waiting for the
     * job result must then fail with a {@link JobExecutionException} rather than succeed.
     *
     * <p>The high availability services are closed and cleaned up in a {@code finally} block, so
     * the cleanup also runs when the submission or the wait for acceptance fails.
     */
    @Test
    public void testGuaranteedAnswerIfJobClientDies() throws Exception {
        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
        UUID leaderSessionId = UUID.randomUUID();

        ActorRef jobManager = system.actorOf(Props.create(JobAcceptingActor.class, leaderSessionId));
        LeaderRetrievalService leaderRetrieval =
                new SettableLeaderRetrievalService(jobManager.path().toString(), leaderSessionId);

        TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, leaderRetrieval);

        try {
            JobListeningContext listeningContext = JobClient.submitJob(
                    system, clientConfig, haServices, testJobGraph, timeout, false, getClass().getClassLoader());

            // Block until the job manager stand-in acknowledges that it accepted the job.
            Future<Object> jobAccepted = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout));
            Await.result(jobAccepted, timeout);

            // Stop the job client actor while the job result is still outstanding.
            listeningContext.getJobClientActor().tell(PoisonPill.getInstance(), ActorRef.noSender());

            try {
                JobClient.awaitJobResult(listeningContext);
                Assert.fail("Expected a JobExecutionException after the job client actor was stopped.");
            } catch (JobExecutionException expected) {
                // This is the outcome the test asserts: the waiting caller is answered with a failure.
            }
        } finally {
            haServices.closeAndCleanupAllData();
        }
    }
}
