package org.apache.flink.client.program;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.UntypedActor;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import scala.Some;
import scala.Tuple2;

@PrepareForTest({Client.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/flink/client/program/ClientTest.class */
public class ClientTest {
    private PackagedProgram program;
    private Optimizer compilerMock;
    private JobGraphGenerator generatorMock;
    private Configuration config;
    private ActorSystem jobManagerSystem;
    private JobGraph jobGraph = new JobGraph("test graph");

    /* loaded from: input_file:org/apache/flink/client/program/ClientTest$FailureReturningActor.class */
    public static class FailureReturningActor extends UntypedActor {
        public void onReceive(Object obj) throws Exception {
            getSender().tell(new Status.Failure(new Exception("test")), getSelf());
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/ClientTest$SuccessReturningActor.class */
    public static class SuccessReturningActor extends UntypedActor {
        public void onReceive(Object obj) throws Exception {
            if (!(obj instanceof JobManagerMessages.SubmitJob)) {
                getSender().tell(new Status.Failure(new Exception("Unknown message " + obj)), getSelf());
            } else {
                getSender().tell(new Status.Success(((JobManagerMessages.SubmitJob) obj).jobGraph().getJobID()), getSelf());
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        int availablePort = NetUtils.getAvailablePort();
        this.config = new Configuration();
        this.config.setString("jobmanager.rpc.address", "localhost");
        this.config.setInteger("jobmanager.rpc.port", availablePort);
        this.config.setString("akka.ask.timeout", ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
        this.program = (PackagedProgram) Mockito.mock(PackagedProgram.class);
        this.compilerMock = (Optimizer) Mockito.mock(Optimizer.class);
        this.generatorMock = (JobGraphGenerator) Mockito.mock(JobGraphGenerator.class);
        JobWithJars jobWithJars = (JobWithJars) Mockito.mock(JobWithJars.class);
        Plan plan = (Plan) Mockito.mock(Plan.class);
        OptimizedPlan optimizedPlan = (OptimizedPlan) Mockito.mock(OptimizedPlan.class);
        Mockito.when(plan.getJobName()).thenReturn("MockPlan");
        Mockito.when(this.program.getPlanWithJars()).thenReturn(jobWithJars);
        Mockito.when(jobWithJars.getPlan()).thenReturn(plan);
        PowerMockito.whenNew(Optimizer.class).withArguments(Matchers.any(DataStatistics.class), new Object[]{Matchers.any(CostEstimator.class), Matchers.any(Configuration.class)}).thenReturn(this.compilerMock);
        Mockito.when(this.compilerMock.compile(plan)).thenReturn(optimizedPlan);
        PowerMockito.whenNew(JobGraphGenerator.class).withNoArguments().thenReturn(this.generatorMock);
        Mockito.when(this.generatorMock.compileJobGraph(optimizedPlan)).thenReturn(this.jobGraph);
        try {
            this.jobManagerSystem = AkkaUtils.createActorSystem(this.config, new Some(new Tuple2("localhost", Integer.valueOf(availablePort))));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Setup of test actor system failed.");
        }
    }

    @After
    public void shutDownActorSystem() {
        if (this.jobManagerSystem != null) {
            try {
                this.jobManagerSystem.shutdown();
                this.jobManagerSystem.awaitTermination();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
            }
        }
    }

    @Test
    public void shouldSubmitToJobClient() {
        try {
            this.jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class, new Object[0]), JobManager.JOB_MANAGER_NAME());
            Assert.assertNotNull(new Client(this.config, getClass().getClassLoader()).run(this.program.getPlanWithJars(), -1, false));
            this.program.deleteExtractedLibraries();
            ((Optimizer) Mockito.verify(this.compilerMock, Mockito.times(1))).compile((Plan) Matchers.any(Plan.class));
            ((JobGraphGenerator) Mockito.verify(this.generatorMock, Mockito.times(1))).compileJobGraph((OptimizedPlan) Matchers.any(OptimizedPlan.class));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void shouldSubmitToJobClientFails() {
        try {
            this.jobManagerSystem.actorOf(Props.create(FailureReturningActor.class, new Object[0]), JobManager.JOB_MANAGER_NAME());
            try {
                new Client(this.config, getClass().getClassLoader()).run(this.program.getPlanWithJars(), -1, false);
                Assert.fail("This should fail with an exception");
            } catch (ProgramInvocationException e) {
            } catch (Exception e2) {
                Assert.fail("wrong exception " + e2);
            }
            ((Optimizer) Mockito.verify(this.compilerMock, Mockito.times(1))).compile((Plan) Matchers.any(Plan.class));
            ((JobGraphGenerator) Mockito.verify(this.generatorMock, Mockito.times(1))).compileJobGraph((OptimizedPlan) Matchers.any(OptimizedPlan.class));
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void tryLocalExecution() {
        try {
            PackagedProgram packagedProgram = (PackagedProgram) Mockito.mock(PackagedProgram.class);
            Mockito.when(Boolean.valueOf(packagedProgram.isUsingInteractiveMode())).thenReturn(true);
            ((PackagedProgram) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.client.program.ClientTest.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m0answer(InvocationOnMock invocationOnMock) throws Throwable {
                    ExecutionEnvironment.createLocalEnvironment();
                    return null;
                }
            }).when(packagedProgram)).invokeInteractiveModeForExecution();
            try {
                new Client(this.config, getClass().getClassLoader()).run(packagedProgram, 1, true);
                Assert.fail("Creating the local execution environment should not be possible");
            } catch (InvalidProgramException e) {
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }
}
