package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClientActor.class */
public class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener {
    private final LeaderRetrievalService leaderRetrievalService;
    private final FiniteDuration timeout;
    private final boolean sysoutUpdates;
    private boolean jobSuccessfullySubmitted = false;
    private boolean terminated = false;
    private ActorRef jobManager;
    private UUID leaderSessionID;
    private ActorRef submitter;
    private JobGraph jobGraph;

    public JobClientActor(LeaderRetrievalService leaderRetrievalService, FiniteDuration finiteDuration, boolean z) {
        this.leaderRetrievalService = (LeaderRetrievalService) Preconditions.checkNotNull(leaderRetrievalService);
        this.timeout = (FiniteDuration) Preconditions.checkNotNull(finiteDuration);
        this.sysoutUpdates = z;
    }

    public void preStart() {
        try {
            this.leaderRetrievalService.start(this);
        } catch (Exception e) {
            this.LOG.error("Could not start the leader retrieval service.");
            throw new RuntimeException("Could not start the leader retrieval service.", e);
        }
    }

    public void postStop() {
        try {
            this.leaderRetrievalService.stop();
        } catch (Exception e) {
            this.LOG.warn("Could not properly stop the leader retrieval service.");
        }
    }

    @Override // org.apache.flink.runtime.akka.FlinkUntypedActor
    protected void handleMessage(Object obj) {
        if (obj instanceof ExecutionGraphMessages.ExecutionStateChanged) {
            logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) obj);
            return;
        }
        if (obj instanceof ExecutionGraphMessages.JobStatusChanged) {
            logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) obj);
            return;
        }
        if (obj instanceof JobClientMessages.JobManagerLeaderAddress) {
            JobClientMessages.JobManagerLeaderAddress jobManagerLeaderAddress = (JobClientMessages.JobManagerLeaderAddress) obj;
            disconnectFromJobManager();
            this.leaderSessionID = jobManagerLeaderAddress.leaderSessionID();
            if (jobManagerLeaderAddress.address() != null) {
                AkkaUtils.getActorRefFuture(jobManagerLeaderAddress.address(), getContext().system(), this.timeout).onSuccess(new OnSuccess<ActorRef>() { // from class: org.apache.flink.runtime.client.JobClientActor.1
                    public void onSuccess(ActorRef actorRef) throws Throwable {
                        JobClientActor.this.getSelf().tell(JobClientActor.this.decorateMessage(new JobClientMessages.JobManagerActorRef(actorRef)), ActorRef.noSender());
                    }
                }, getContext().dispatcher());
                return;
            }
            return;
        }
        if (obj instanceof JobClientMessages.JobManagerActorRef) {
            connectToJobManager(((JobClientMessages.JobManagerActorRef) obj).jobManager());
            if (this.jobGraph == null || this.jobSuccessfullySubmitted) {
                return;
            }
            tryToSubmitJob(this.jobGraph);
            return;
        }
        if (obj instanceof JobClientMessages.SubmitJobAndWait) {
            if (this.terminated) {
                String str = getClass().getName() + " is about to be terminated. Therefore, the job submission cannot be executed.";
                this.LOG.error(str);
                getSender().tell(decorateMessage(new Status.Failure(new Exception(str))), ActorRef.noSender());
                return;
            } else {
                if (this.submitter != null) {
                    this.LOG.error("Received repeated 'SubmitJobAndWait'");
                    getSender().tell(decorateMessage(new Status.Failure(new Exception("Received repeated 'SubmitJobAndWait'"))), ActorRef.noSender());
                    terminate();
                    return;
                }
                this.jobGraph = ((JobClientMessages.SubmitJobAndWait) obj).jobGraph();
                if (this.jobGraph == null) {
                    this.LOG.error("Received null JobGraph");
                    sender().tell(decorateMessage(new Status.Failure(new Exception("JobGraph is null"))), getSelf());
                    return;
                } else {
                    this.LOG.info("Received job {} ({}).", this.jobGraph.getName(), this.jobGraph.getJobID());
                    this.submitter = getSender();
                    tryToSubmitJob(this.jobGraph);
                    return;
                }
            }
        }
        if ((obj instanceof JobManagerMessages.JobResultSuccess) || (obj instanceof JobManagerMessages.JobResultFailure)) {
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Received {} message from JobManager", obj.getClass().getSimpleName());
            }
            if (hasJobBeenSubmitted()) {
                this.submitter.tell(decorateMessage(obj), getSelf());
            }
            terminate();
            return;
        }
        if (obj instanceof JobManagerMessages.JobSubmitSuccess) {
            this.LOG.info("Job was successfully submitted to the JobManager {}.", getSender().path());
            this.jobSuccessfullySubmitted = true;
            return;
        }
        if (obj instanceof Terminated) {
            ActorRef actor = ((Terminated) obj).getActor();
            if (!this.jobManager.equals(actor)) {
                this.LOG.warn("Received 'Terminated' for unknown actor " + actor);
                return;
            }
            this.LOG.info("Lost connection to JobManager {}. Triggering connection timeout.", this.jobManager.path());
            disconnectFromJobManager();
            if (hasJobBeenSubmitted()) {
                getContext().system().scheduler().scheduleOnce(this.timeout, getSelf(), decorateMessage(JobClientMessages.getConnectionTimeout()), getContext().dispatcher(), ActorRef.noSender());
                return;
            }
            return;
        }
        if (JobClientMessages.getConnectionTimeout().equals(obj)) {
            if (isConnected()) {
                return;
            }
            if (hasJobBeenSubmitted()) {
                this.submitter.tell(decorateMessage(new Status.Failure(new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."))), getSelf());
            }
            terminate();
            return;
        }
        if (!JobClientMessages.getSubmissionTimeout().equals(obj)) {
            this.LOG.error("JobClient received unknown message: " + obj);
        } else {
            if (this.jobSuccessfullySubmitted) {
                return;
            }
            if (hasJobBeenSubmitted()) {
                this.submitter.tell(decorateMessage(new Status.Failure(new JobClientActorSubmissionTimeoutException("Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission."))), getSelf());
            }
            terminate();
        }
    }

    @Override // org.apache.flink.runtime.akka.FlinkUntypedActor
    protected UUID getLeaderSessionID() {
        return this.leaderSessionID;
    }

    private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged executionStateChanged) {
        this.LOG.info(executionStateChanged.toString());
        if (this.sysoutUpdates) {
            System.out.println(executionStateChanged.toString());
        }
    }

    private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged jobStatusChanged) {
        if (jobStatusChanged.newJobStatus() != JobStatus.FAILING || jobStatusChanged.error() == null) {
            this.LOG.info(jobStatusChanged.toString());
            if (this.sysoutUpdates) {
                System.out.println(jobStatusChanged.toString());
                return;
            }
            return;
        }
        this.LOG.info(jobStatusChanged.toString(), jobStatusChanged.error());
        if (this.sysoutUpdates) {
            System.out.println(jobStatusChanged.toString());
            jobStatusChanged.error().printStackTrace(System.out);
        }
    }

    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
    public void notifyLeaderAddress(String str, UUID uuid) {
        getSelf().tell(decorateMessage(new JobClientMessages.JobManagerLeaderAddress(str, uuid)), getSelf());
    }

    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
    public void handleError(Exception exc) {
        this.LOG.error("Error occurred in the LeaderRetrievalService.", exc);
        getSelf().tell(decorateMessage(PoisonPill.getInstance()), getSelf());
    }

    private void disconnectFromJobManager() {
        this.LOG.info("Disconnect from JobManager {}.", this.jobManager);
        if (this.jobManager != ActorRef.noSender()) {
            getContext().unwatch(this.jobManager);
            this.jobManager = ActorRef.noSender();
        }
    }

    private void connectToJobManager(ActorRef actorRef) {
        this.LOG.info("Connect to JobManager {}.", actorRef);
        if (actorRef != ActorRef.noSender()) {
            getContext().unwatch(actorRef);
        }
        this.LOG.info("Connected to new JobManager {}.", actorRef.path());
        this.jobManager = actorRef;
        getContext().watch(actorRef);
    }

    private void tryToSubmitJob(final JobGraph jobGraph) {
        this.jobGraph = jobGraph;
        if (isConnected()) {
            this.LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", new Object[]{this.jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID()});
            Futures.future(new Callable<Object>() { // from class: org.apache.flink.runtime.client.JobClientActor.2
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    AkkaActorGateway akkaActorGateway = new AkkaActorGateway(JobClientActor.this.jobManager, JobClientActor.this.leaderSessionID);
                    JobClientActor.this.LOG.info("Upload jar files to job manager {}.", JobClientActor.this.jobManager.path());
                    try {
                        jobGraph.uploadUserJars(akkaActorGateway, JobClientActor.this.timeout);
                    } catch (IOException e) {
                        JobClientActor.this.getSelf().tell(JobClientActor.this.decorateMessage(new JobManagerMessages.JobResultFailure(new SerializedThrowable(new JobSubmissionException(jobGraph.getJobID(), "Could not upload the jar files to the job manager.", e)))), ActorRef.noSender());
                    }
                    JobClientActor.this.LOG.info("Submit job to the job manager {}.", JobClientActor.this.jobManager.path());
                    JobClientActor.this.jobManager.tell(JobClientActor.this.decorateMessage(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), JobClientActor.this.getSelf());
                    JobClientActor.this.getContext().system().scheduler().scheduleOnce(JobClientActor.this.timeout, JobClientActor.this.getSelf(), JobClientActor.this.decorateMessage(JobClientMessages.getSubmissionTimeout()), JobClientActor.this.getContext().dispatcher(), ActorRef.noSender());
                    return null;
                }
            }, getContext().dispatcher());
        } else {
            this.LOG.info("Could not submit job {} ({}), because there is no connection to a JobManager.", jobGraph.getName(), jobGraph.getJobID());
            getContext().system().scheduler().scheduleOnce(this.timeout, getSelf(), decorateMessage(JobClientMessages.getConnectionTimeout()), getContext().dispatcher(), ActorRef.noSender());
        }
    }

    private void terminate() {
        this.LOG.info("Terminate JobClientActor.");
        this.terminated = true;
        disconnectFromJobManager();
        getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
    }

    private boolean isConnected() {
        return this.jobManager != ActorRef.noSender();
    }

    private boolean hasJobBeenSubmitted() {
        return this.submitter != ActorRef.noSender();
    }

    public static Props createJobClientActorProps(LeaderRetrievalService leaderRetrievalService, FiniteDuration finiteDuration, boolean z) {
        return Props.create(JobClientActor.class, new Object[]{leaderRetrievalService, finiteDuration, Boolean.valueOf(z)});
    }
}
