package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeoutException;
import org.apache.camel.management.DefaultManagementAgent;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClient.class */
/**
 * Static helper methods used on the client side to talk to a JobManager actor:
 * starting a local actor system, resolving the JobManager address from a
 * {@link Configuration}, submitting a {@link JobGraph} (blocking or detached),
 * and uploading the job's user-code jar files.
 */
public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    /**
     * Starts an actor system for the job client and logs the address it was started at.
     *
     * @param configuration configuration handed to {@link AkkaUtils#createActorSystem}
     * @return the newly created actor system
     * @throws IOException declared for callers; nothing in this method body throws it directly
     */
    public static ActorSystem startJobClientActorSystem(Configuration configuration) throws IOException {
        LOG.info("Starting JobClient actor system");
        // An empty host name and port 0 are passed as the listening address.
        // NOTE(review): presumably port 0 lets the system pick a free port - confirm in AkkaUtils.
        ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration, new Some(new Tuple2("", 0)));
        Address address = actorSystem.provider().getDefaultAddress();
        String host = address.host().isDefined() ? address.host().get() : NetUtils.UNKNOWN_HOST;
        int port = address.port().isDefined() ? ((Integer) address.port().get()).intValue() : -1;
        LOG.info("Started JobClient actor system at {}:{}", host, port);
        return actorSystem;
    }

    /**
     * Reads the JobManager host and port from the configuration and resolves them
     * into a socket address.
     *
     * @param configuration configuration holding {@link ConfigConstants#JOB_MANAGER_IPC_ADDRESS_KEY}
     *                      and, optionally, {@link ConfigConstants#JOB_MANAGER_IPC_PORT_KEY}
     *                      (falls back to {@link ConfigConstants#DEFAULT_JOB_MANAGER_IPC_PORT})
     * @return the resolved JobManager address
     * @throws RuntimeException if no JobManager address is configured
     * @throws IOException if the configured host name cannot be resolved
     */
    public static InetSocketAddress getJobManagerAddress(Configuration configuration) throws IOException {
        String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
        int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
        if (hostname == null) {
            throw new RuntimeException("JobManager address has not been specified in the configuration.");
        }
        try {
            return new InetSocketAddress(InetAddress.getByName(hostname), port);
        } catch (UnknownHostException e) {
            throw new IOException("Cannot resolve JobManager hostname " + hostname, e);
        }
    }

    /**
     * Submits the job through a temporary {@code JobClientActor} and blocks until an
     * answer arrives. The temporary actor is always stopped (via {@link PoisonPill})
     * before this method returns or throws.
     *
     * <p>Note: the wait uses {@code AkkaUtils.INF_TIMEOUT()}; {@code finiteDuration} is
     * only null-checked here and is not used to bound the wait.
     *
     * @param actorSystem    actor system in which the temporary client actor is created
     * @param actorRef       actor reference passed to the client actor
     *                       (presumably the JobManager - confirm against JobClientActor)
     * @param jobGraph       the job to submit
     * @param finiteDuration must be non-null; otherwise unused by this method
     * @param z              flag forwarded to the client actor's constructor
     *                       (presumably toggles printing of status updates - confirm against JobClientActor)
     * @return the serialized result of the successful job execution, never {@code null}
     * @throws NullPointerException  if any reference argument is {@code null}
     * @throws JobExecutionException if the answer is a failure, is of an unknown type,
     *                               carries a {@code null} result, or the wait times out
     *                               (reported as {@code JobTimeoutException})
     */
    public static SerializedJobExecutionResult submitJobAndWait(ActorSystem actorSystem, ActorRef actorRef, JobGraph jobGraph, FiniteDuration finiteDuration, boolean z) throws JobExecutionException {
        checkNotNull(actorSystem, "actorSystem");
        checkNotNull(actorRef, "actorRef");
        checkNotNull(jobGraph, "jobGraph");
        checkNotNull(finiteDuration, "finiteDuration");

        ActorRef jobClientActor = actorSystem.actorOf(Props.create((Class<?>) JobClientActor.class, actorRef, LOG, Boolean.valueOf(z)));
        try {
            Object answer = Await.result(Patterns.ask(jobClientActor, new JobClientMessages.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT())), AkkaUtils.INF_TIMEOUT());

            if (answer instanceof JobManagerMessages.JobResultSuccess) {
                LOG.info("Job execution complete");
                SerializedJobExecutionResult executionResult = ((JobManagerMessages.JobResultSuccess) answer).result();
                if (executionResult == null) {
                    throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
                }
                return executionResult;
            }
            if (answer instanceof Status.Failure) {
                // Re-throw the remote failure; the catch clauses below decide how it is reported.
                throw ((Status.Failure) answer).cause();
            }
            throw new Exception("Unknown answer after submitting the job: " + answer);
        } catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
        } catch (JobExecutionException e) {
            // Already the type callers expect - pass it through unwrapped.
            throw e;
        } catch (Throwable t) {
            throw new JobExecutionException(jobGraph.getJobID(), "Communication with JobManager failed: " + t.getMessage(), t);
        } finally {
            // Stop the temporary client actor regardless of the outcome.
            jobClientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Sends the job directly to the given actor and waits only for the submission
     * acknowledgement, which must be the {@link JobID} of the submitted job.
     *
     * @param actorRef       actor that receives the {@code SubmitJob} message
     * @param jobGraph       the job to submit
     * @param finiteDuration maximum time to wait for the acknowledgement
     * @throws NullPointerException  if any argument is {@code null}
     * @throws JobExecutionException if the response is not the expected job ID, the
     *                               submission fails, or no answer arrives in time
     *                               (reported as {@code JobTimeoutException})
     */
    public static void submitJobDetached(ActorRef actorRef, JobGraph jobGraph, FiniteDuration finiteDuration) throws JobExecutionException {
        checkNotNull(actorRef, "actorRef");
        checkNotNull(jobGraph, "jobGraph");
        checkNotNull(finiteDuration, "finiteDuration");

        try {
            Object answer = Await.result(Patterns.ask(actorRef, new JobManagerMessages.SubmitJob(jobGraph, false), new Timeout(finiteDuration)), finiteDuration);
            if (!(answer instanceof JobID)) {
                throw new Exception("Unexpected response: " + answer);
            }
            JobID respondedId = (JobID) answer;
            if (!respondedId.equals(jobGraph.getJobID())) {
                throw new Exception("JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() + ", response: " + respondedId);
            }
        } catch (TimeoutException e) {
            throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + finiteDuration.toString(), e);
        } catch (JobExecutionException e) {
            throw e;
        } catch (Throwable t) {
            // Chain the caught throwable itself (not t.getCause(), which is frequently null)
            // so the original failure and its stack trace are preserved.
            throw new JobExecutionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t);
        }
    }

    /**
     * Uploads the job's user-code jar files, if it has any. The blob port is requested
     * from the given actor; the host is taken from the actor's path address, falling
     * back to {@code DefaultManagementAgent.DEFAULT_HOST} when the address has no host.
     *
     * @param jobGraph       the job whose jar files are uploaded
     * @param actorRef       actor that is asked for the blob manager port
     * @param finiteDuration maximum time to wait for the port answer
     * @throws IOException if the blob port cannot be retrieved or the upload fails
     */
    public static void uploadJarFiles(JobGraph jobGraph, ActorRef actorRef, FiniteDuration finiteDuration) throws IOException {
        if (!jobGraph.hasUsercodeJarFiles()) {
            return;
        }

        int blobPort;
        try {
            Object answer = Await.result(Patterns.ask(actorRef, JobManagerMessages.getRequestBlobManagerPort(), new Timeout(finiteDuration)), finiteDuration);
            if (!(answer instanceof Integer)) {
                throw new Exception("Expected port number (int) as answer, received " + answer);
            }
            blobPort = ((Integer) answer).intValue();
        } catch (Exception e) {
            throw new IOException("Could not retrieve the JobManager's blob port.", e);
        }

        // Kept separate from the port lookup so an upload failure is not misreported
        // as a failure to retrieve the blob port.
        try {
            Option<String> host = actorRef.path().address().host();
            String blobHost = host.isDefined() ? host.get() : DefaultManagementAgent.DEFAULT_HOST;
            jobGraph.uploadRequiredJarFiles(new InetSocketAddress(blobHost, blobPort));
        } catch (Exception e) {
            throw new IOException("Could not upload the user code jar files to the JobManager's blob server.", e);
        }
    }

    /** Throws a {@link NullPointerException} naming the argument when {@code reference} is null. */
    private static void checkNotNull(Object reference, String name) {
        if (reference == null) {
            throw new NullPointerException(name + " must not be null");
        }
    }
}
