/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Test helpers that talk to a JobManager through its {@link ActorGateway} in order to wait for
 * jobs to appear or disappear and to cancel a running job.
 *
 * <p>All methods block the calling thread. Polling methods sleep 50 ms between requests, and
 * every request/response round trip is bounded by a 30 second ask timeout.
 */
public class JobManagerCommunicationUtils {
    /** Timeout applied both to sending a request and to awaiting its response. */
    private static final FiniteDuration ASK_TIMEOUT = new FiniteDuration(30L, TimeUnit.SECONDS);

    /** Pause between two consecutive polls of the JobManager, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 50L;

    /** Maximum number of polls {@code cancelCurrentJob} performs while looking for a job. */
    private static final int MAX_CANCEL_LOOKUP_ATTEMPTS = 200;

    /**
     * Blocks until the JobManager reports an empty list of running jobs.
     *
     * <p>There is no upper bound on the number of polls; the method only returns once the list
     * is empty or a request fails.
     *
     * @param jobManager gateway to the JobManager to query
     * @throws Exception if a status request fails or times out (propagated unwrapped), or if the
     *     thread is interrupted while sleeping between polls
     */
    public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception {
        while (true) {
            Future<Object> listResponse =
                    jobManager.ask(JobManagerMessages.getRequestRunningJobsStatus(), ASK_TIMEOUT);
            // Deliberately not wrapped: failures of this request surface with their original type.
            Object result = Await.result(listResponse, ASK_TIMEOUT);
            List<JobStatusMessage> jobs =
                    ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
            if (jobs.isEmpty()) {
                return;
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Blocks until the JobManager lists a job whose name equals {@code name}.
     *
     * <p>There is no upper bound on the number of polls. Note that a {@code null} name never
     * equals any job name, so in that case the method only ends when a request fails.
     *
     * @param jobManager gateway to the JobManager to query
     * @param name exact job name to wait for
     * @throws Exception if retrieving the running jobs fails (the failure is attached as the
     *     cause), or if the thread is interrupted while sleeping between polls
     */
    public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception {
        while (true) {
            List<JobStatusMessage> jobs = requestRunningJobs(
                    jobManager,
                    "Could not wait for job to start - failed to retrieve running jobs from the JobManager.");
            for (JobStatusMessage job : jobs) {
                if (job.getJobName().equals(name)) {
                    return;
                }
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Cancels the single job reported by the JobManager.
     *
     * <p>Equivalent to {@link #cancelCurrentJob(ActorGateway, String)} with a {@code null} name.
     *
     * @param jobManager gateway to the JobManager to query and to send the cancel request to
     * @throws Exception under the same conditions as the two-argument overload
     */
    public static void cancelCurrentJob(ActorGateway jobManager) throws Exception {
        JobManagerCommunicationUtils.cancelCurrentJob(jobManager, null);
    }

    /**
     * Finds a job reported by the JobManager and sends it a cancel request.
     *
     * <p>The JobManager is polled up to 200 times, sleeping 50 ms after every poll that reports
     * no jobs. Polling stops as soon as a job has been selected, so the cancel request targets
     * the status from the most recent poll instead of a potentially stale earlier one. Selection
     * rules for a non-empty job list:
     * <ul>
     *   <li>exactly one job: that job is selected, regardless of {@code name};</li>
     *   <li>several jobs and {@code name} is non-null: the last job whose name equals
     *       {@code name} is selected;</li>
     *   <li>several jobs and {@code name} is {@code null}: an exception is thrown.</li>
     * </ul>
     *
     * @param jobManager gateway to the JobManager to query and to send the cancel request to
     * @param name job name used to disambiguate between several jobs; may be {@code null}
     * @throws Exception if retrieving the running jobs fails, if no job shows up within the
     *     polling budget, if the job cannot be determined unambiguously, if the selected job is
     *     already in a globally terminal state, or if the cancel request fails
     */
    public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception {
        JobStatusMessage status = null;
        // Stop as soon as a job was selected; only an empty job list warrants another poll.
        for (int attempt = 0; attempt < MAX_CANCEL_LOOKUP_ATTEMPTS && status == null; ++attempt) {
            List<JobStatusMessage> jobs = requestRunningJobs(
                    jobManager,
                    "Could not cancel job - failed to retrieve running jobs from the JobManager.");
            if (jobs.isEmpty()) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } else {
                status = selectJobToCancel(jobs, name);
            }
        }
        if (status == null) {
            throw new Exception("Could not cancel job - no running jobs");
        }
        if (status.getJobState().isGloballyTerminalState()) {
            throw new Exception("Could not cancel job - job is not running any more");
        }
        JobID jobId = status.getJobId();
        Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), ASK_TIMEOUT);
        try {
            Await.result(response, ASK_TIMEOUT);
        } catch (Exception e) {
            throw new Exception("Sending the 'cancel' message failed.", e);
        }
    }

    /**
     * Requests the list of running jobs from the JobManager and waits for the answer.
     *
     * @param jobManager gateway to the JobManager to query
     * @param failureMessage message of the exception thrown when awaiting the answer fails
     * @return the job status messages contained in the JobManager's response
     * @throws Exception carrying {@code failureMessage} and the original failure as its cause
     */
    private static List<JobStatusMessage> requestRunningJobs(ActorGateway jobManager, String failureMessage)
            throws Exception {
        // The request is sent outside the try block so that only failures while awaiting or
        // decoding the response are wrapped.
        Future<Object> listResponse =
                jobManager.ask(JobManagerMessages.getRequestRunningJobsStatus(), ASK_TIMEOUT);
        try {
            Object result = Await.result(listResponse, ASK_TIMEOUT);
            return ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
        } catch (Exception e) {
            throw new Exception(failureMessage, e);
        }
    }

    /**
     * Picks the job to cancel from a non-empty list of running jobs.
     *
     * @param jobs non-empty list of jobs reported by the JobManager
     * @param name job name used to disambiguate between several jobs; may be {@code null}
     * @return the selected job, never {@code null}
     * @throws Exception if several jobs are listed and none matches {@code name}, or if several
     *     jobs are listed and {@code name} is {@code null}
     */
    private static JobStatusMessage selectJobToCancel(List<JobStatusMessage> jobs, String name)
            throws Exception {
        if (jobs.size() == 1) {
            // A lone job is taken as-is; its name is not compared against the requested one.
            return jobs.get(0);
        }
        if (name == null) {
            StringBuilder jobNames = new StringBuilder();
            for (JobStatusMessage jsm : jobs) {
                jobNames.append(jsm.getJobName()).append(", ");
            }
            throw new Exception("Could not cancel job - more than one running job: " + jobNames);
        }
        JobStatusMessage match = null;
        for (JobStatusMessage msg : jobs) {
            if (msg.getJobName().equals(name)) {
                // No early exit: when several jobs share the name, the last one wins.
                match = msg;
            }
        }
        if (match == null) {
            throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs);
        }
        return match;
    }
}

