package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/JobManagerRetriever.class */
public class JobManagerRetriever implements LeaderRetrievalListener {
    private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
    private final Object waitLock = new Object();
    private final WebMonitor webMonitor;
    private final ActorSystem actorSystem;
    private final FiniteDuration lookupTimeout;
    private final FiniteDuration timeout;
    private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;

    public JobManagerRetriever(WebMonitor webMonitor, ActorSystem actorSystem, FiniteDuration finiteDuration, FiniteDuration finiteDuration2) {
        this.webMonitor = (WebMonitor) Preconditions.checkNotNull(webMonitor);
        this.actorSystem = (ActorSystem) Preconditions.checkNotNull(actorSystem);
        this.lookupTimeout = (FiniteDuration) Preconditions.checkNotNull(finiteDuration);
        this.timeout = (FiniteDuration) Preconditions.checkNotNull(finiteDuration2);
    }

    public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
        if (this.leaderGatewayPortFuture == null) {
            return Option.empty();
        }
        Future<Tuple2<ActorGateway, Integer>> future = this.leaderGatewayPortFuture;
        return future.isCompleted() ? Option.apply((Tuple2) Await.result(future, this.timeout)) : Option.empty();
    }

    public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
        Future<Tuple2<ActorGateway, Integer>> future = null;
        Deadline fromNow = this.timeout.fromNow();
        while (true) {
            if (!fromNow.isOverdue()) {
                synchronized (this.waitLock) {
                    future = this.leaderGatewayPortFuture;
                    if (future != null) {
                        break;
                    }
                    this.waitLock.wait(fromNow.timeLeft().toMillis());
                }
                break;
            }
            break;
        }
        if (future == null) {
            throw new TimeoutException("There is no JobManager available.");
        }
        return (Tuple2) Await.result(future, fromNow.timeLeft());
    }

    public void notifyLeaderAddress(String str, final UUID uuid) {
        if (str == null || str.equals("")) {
            return;
        }
        try {
            final Promise.DefaultPromise defaultPromise = new Promise.DefaultPromise();
            synchronized (this.waitLock) {
                this.leaderGatewayPortFuture = defaultPromise.future();
                this.waitLock.notifyAll();
            }
            LOG.info("New leader reachable under {}:{}.", str, uuid);
            AkkaUtils.getActorRefFuture(str, this.actorSystem, this.lookupTimeout).flatMap(new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>() { // from class: org.apache.flink.runtime.webmonitor.JobManagerRetriever.2
                public Future<Tuple2<ActorGateway, Object>> apply(ActorRef actorRef) {
                    AkkaActorGateway akkaActorGateway = new AkkaActorGateway(actorRef, uuid);
                    return Futures.successful(akkaActorGateway).zip(akkaActorGateway.ask(JobManagerMessages.getRequestWebMonitorPort(), JobManagerRetriever.this.timeout));
                }
            }, this.actorSystem.dispatcher()).onComplete(new OnComplete<Tuple2<ActorGateway, Object>>() { // from class: org.apache.flink.runtime.webmonitor.JobManagerRetriever.1
                public void onComplete(Throwable th, Tuple2<ActorGateway, Object> tuple2) throws Throwable {
                    if (th != null) {
                        JobManagerRetriever.LOG.warn("Failed to retrieve leader gateway and port.", th);
                        defaultPromise.failure(th);
                    } else if (!(tuple2._2() instanceof JobManagerMessages.ResponseWebMonitorPort)) {
                        defaultPromise.failure(new Exception("Received the message " + tuple2._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() + ". But a message of type " + JobManagerMessages.ResponseWebMonitorPort.class + " was expected."));
                    } else {
                        defaultPromise.success(new Tuple2(tuple2._1(), Integer.valueOf(((JobManagerMessages.ResponseWebMonitorPort) tuple2._2()).port().intValue())));
                    }
                }
            }, this.actorSystem.dispatcher());
        } catch (Exception e) {
            handleError(e);
        }
    }

    public void handleError(Exception exc) {
        LOG.error("Received error from LeaderRetrievalService.", exc);
        try {
            this.webMonitor.stop();
        } catch (Exception e) {
            LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
        }
    }
}
