package org.apache.flink.runtime.testutils;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/testutils/JobManagerProcess.class */
public class JobManagerProcess extends TestJvmProcess {
    private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);
    private static final Pattern PORT_PATTERN = Pattern.compile(".*Starting JobManager at akka\\.tcp://flink@.*:(\\d+).*");
    private final int id;
    private final Configuration config;
    private final String[] jvmArgs;
    private int jobManagerPort;
    private ActorRef jobManagerRef;

    /* loaded from: input_file:org/apache/flink/runtime/testutils/JobManagerProcess$JobManagerProcessEntryPoint.class */
    public static class JobManagerProcessEntryPoint {
        private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class);

        public static void main(String[] strArr) {
            try {
                Configuration configuration = ParameterTool.fromArgs(strArr).getConfiguration();
                LOG.info("Configuration: {}.", configuration);
                JobManager.runJobManager(configuration, JobManagerMode.CLUSTER, "localhost", 0);
                new CountDownLatch(1).await();
            } catch (Throwable th) {
                LOG.error("Failed to start JobManager process", th);
                System.exit(1);
            }
        }
    }

    public JobManagerProcess(int i, Configuration configuration) throws Exception {
        Preconditions.checkArgument(i >= 0, "Negative ID");
        this.id = i;
        this.config = (Configuration) Preconditions.checkNotNull(configuration, "Configuration");
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : configuration.toMap().entrySet()) {
            arrayList.add("--" + ((String) entry.getKey()));
            arrayList.add(entry.getValue());
        }
        this.jvmArgs = new String[arrayList.size()];
        arrayList.toArray(this.jvmArgs);
    }

    @Override // org.apache.flink.runtime.testutils.TestJvmProcess
    public String getName() {
        return "JobManager " + this.id;
    }

    @Override // org.apache.flink.runtime.testutils.TestJvmProcess
    public String[] getJvmArgs() {
        return this.jvmArgs;
    }

    @Override // org.apache.flink.runtime.testutils.TestJvmProcess
    public String getEntryPointClassName() {
        return JobManagerProcessEntryPoint.class.getName();
    }

    public Configuration getConfig() {
        return this.config;
    }

    public int getJobManagerPort(FiniteDuration finiteDuration) throws InterruptedException, NumberFormatException {
        if (this.jobManagerPort > 0) {
            return this.jobManagerPort;
        }
        Deadline fromNow = finiteDuration.fromNow();
        while (fromNow.hasTimeLeft()) {
            Matcher matcher = PORT_PATTERN.matcher(getProcessOutput());
            if (matcher.find()) {
                this.jobManagerPort = Integer.parseInt(matcher.group(1));
                return this.jobManagerPort;
            }
            Thread.sleep(100L);
        }
        throw new RuntimeException("Could not parse port from logs");
    }

    public String getJobManagerAkkaURL(FiniteDuration finiteDuration) throws InterruptedException, UnknownHostException {
        return AkkaRpcServiceUtils.getRpcUrl("localhost", getJobManagerPort(finiteDuration), "jobmanager", HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION, this.config);
    }

    public String toString() {
        return String.format("JobManagerProcess(id=%d, port=%d)", Integer.valueOf(this.id), Integer.valueOf(this.jobManagerPort));
    }

    public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration finiteDuration) throws Exception {
        if (this.jobManagerRef != null) {
            return this.jobManagerRef;
        }
        Preconditions.checkNotNull(actorSystem, "Actor system");
        Deadline fromNow = finiteDuration.fromNow();
        while (fromNow.hasTimeLeft()) {
            try {
                this.jobManagerRef = AkkaUtils.getActorRef(getJobManagerAkkaURL(fromNow.timeLeft()), actorSystem, fromNow.timeLeft());
                return this.jobManagerRef;
            } catch (Throwable th) {
                Thread.sleep(Math.min(100L, fromNow.timeLeft().toMillis()));
            }
        }
        throw new IllegalStateException("JobManager did not start up within " + finiteDuration + ".");
    }
}
