/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class JobManagerRetriever
implements LeaderRetrievalListener {
    private static final Logger LOG = LoggerFactory.getLogger(JobManagerRetriever.class);
    private final Object waitLock = new Object();
    private final WebMonitor webMonitor;
    private final ActorSystem actorSystem;
    private final FiniteDuration lookupTimeout;
    private final FiniteDuration timeout;
    private volatile Future<Tuple2<ActorGateway, Integer>> leaderGatewayPortFuture;

    public JobManagerRetriever(WebMonitor webMonitor, ActorSystem actorSystem, FiniteDuration lookupTimeout, FiniteDuration timeout) {
        this.webMonitor = Preconditions.checkNotNull(webMonitor);
        this.actorSystem = Preconditions.checkNotNull(actorSystem);
        this.lookupTimeout = Preconditions.checkNotNull(lookupTimeout);
        this.timeout = Preconditions.checkNotNull(timeout);
    }

    public Option<Tuple2<ActorGateway, Integer>> getJobManagerGatewayAndWebPort() throws Exception {
        if (this.leaderGatewayPortFuture != null) {
            Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = this.leaderGatewayPortFuture;
            if (gatewayPortFuture.isCompleted()) {
                Tuple2 gatewayPort = (Tuple2)Await.result(gatewayPortFuture, (Duration)this.timeout);
                return Option.apply((Object)gatewayPort);
            }
            return Option.empty();
        }
        return Option.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Tuple2<ActorGateway, Integer> awaitJobManagerGatewayAndWebPort() throws Exception {
        Future<Tuple2<ActorGateway, Integer>> gatewayPortFuture = null;
        Deadline deadline = this.timeout.fromNow();
        while (!deadline.isOverdue()) {
            Object object = this.waitLock;
            synchronized (object) {
                gatewayPortFuture = this.leaderGatewayPortFuture;
                if (gatewayPortFuture != null) {
                    break;
                }
                this.waitLock.wait(deadline.timeLeft().toMillis());
            }
        }
        if (gatewayPortFuture == null) {
            throw new TimeoutException("There is no JobManager available.");
        }
        return (Tuple2)Await.result(gatewayPortFuture, (Duration)deadline.timeLeft());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
        if (leaderAddress != null && !leaderAddress.equals("")) {
            try {
                Promise.DefaultPromise leaderGatewayPortPromise = new Promise.DefaultPromise();
                Object object = this.waitLock;
                synchronized (object) {
                    this.leaderGatewayPortFuture = leaderGatewayPortPromise.future();
                    this.waitLock.notifyAll();
                }
                LOG.info("New leader reachable under {}:{}.", (Object)leaderAddress, (Object)leaderSessionID);
                AkkaUtils.getActorRefFuture((String)leaderAddress, (ActorSystem)this.actorSystem, (FiniteDuration)this.lookupTimeout).flatMap((Function1)new Mapper<ActorRef, Future<Tuple2<ActorGateway, Object>>>(){

                    public Future<Tuple2<ActorGateway, Object>> apply(ActorRef jobManagerRef) {
                        AkkaActorGateway leaderGateway = new AkkaActorGateway(jobManagerRef, leaderSessionID);
                        Future webMonitorPort = leaderGateway.ask(JobManagerMessages.getRequestWebMonitorPort(), JobManagerRetriever.this.timeout);
                        return Futures.successful((Object)leaderGateway).zip(webMonitorPort);
                    }
                }, (ExecutionContext)this.actorSystem.dispatcher()).onComplete((Function1)new OnComplete<Tuple2<ActorGateway, Object>>((Promise)leaderGatewayPortPromise){
                    final /* synthetic */ Promise val$leaderGatewayPortPromise;
                    {
                        this.val$leaderGatewayPortPromise = promise;
                    }

                    public void onComplete(Throwable failure, Tuple2<ActorGateway, Object> success) throws Throwable {
                        if (failure == null) {
                            if (success._2() instanceof JobManagerMessages.ResponseWebMonitorPort) {
                                int webMonitorPort = ((JobManagerMessages.ResponseWebMonitorPort)success._2()).port();
                                this.val$leaderGatewayPortPromise.success((Object)new Tuple2(success._1(), (Object)webMonitorPort));
                            } else {
                                this.val$leaderGatewayPortPromise.failure((Throwable)new Exception("Received the message " + success._2() + " as response to " + JobManagerMessages.getRequestWebMonitorPort() + ". But a message of type " + JobManagerMessages.ResponseWebMonitorPort.class + " was expected."));
                            }
                        } else {
                            LOG.warn("Failed to retrieve leader gateway and port.", failure);
                            this.val$leaderGatewayPortPromise.failure(failure);
                        }
                    }
                }, (ExecutionContext)this.actorSystem.dispatcher());
            }
            catch (Exception e) {
                this.handleError(e);
            }
        }
    }

    public void handleError(Exception exception) {
        LOG.error("Received error from LeaderRetrievalService.", (Throwable)exception);
        try {
            this.webMonitor.stop();
        }
        catch (Exception e) {
            LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", (Throwable)e);
        }
    }
}

