package org.apache.flink.mesos.runtime.clusterframework;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.util.MesosArtifactServer;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.ContainerOverlay;
import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.class */
public class MesosApplicationMasterRunner {
    private static final int INIT_ERROR_EXIT_CODE = 31;
    private static final int ACTOR_DIED_EXIT_CODE = 32;
    protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
    private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    private static final Map<String, String> ENV = System.getenv();
    private static final Options ALL_OPTIONS = new Options().addOption(BootstrapTools.newDynamicPropertiesOption());

    public static void main(String[] strArr) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", strArr);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        System.exit(new MesosApplicationMasterRunner().run(strArr));
    }

    protected int run(String[] strArr) {
        try {
            LOG.debug("All environment variables: {}", ENV);
            final Configuration parseDynamicProperties = BootstrapTools.parseDynamicProperties(new PosixParser().parse(ALL_OPTIONS, strArr));
            GlobalConfiguration.setDynamicProperties(parseDynamicProperties);
            final Configuration loadConfiguration = GlobalConfiguration.loadConfiguration();
            try {
                FileSystem.setDefaultScheme(loadConfiguration);
                SecurityUtils.install(new SecurityUtils.SecurityConfiguration(loadConfiguration));
                return ((Integer) SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { // from class: org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Integer call() throws Exception {
                        return Integer.valueOf(MesosApplicationMasterRunner.this.runPrivileged(loadConfiguration, parseDynamicProperties));
                    }
                })).intValue();
            } catch (IOException e) {
                throw new IOException("Error while setting the default filesystem scheme from configuration.", e);
            }
        } catch (Throwable th) {
            LOG.error("Mesos AppMaster initialization failed", th);
            return 31;
        }
    }

    protected int runPrivileged(Configuration configuration, Configuration configuration2) {
        ActorSystem actorSystem = null;
        WebMonitor webMonitor = null;
        MesosArtifactServer mesosArtifactServer = null;
        ExecutorService executorService = null;
        ExecutorService executorService2 = null;
        MesosServices mesosServices = null;
        try {
            String hostName = InetAddress.getLocalHost().getHostName();
            MesosConfiguration createMesosConfig = createMesosConfig(configuration, hostName);
            int numberCPUCores = Hardware.getNumberCPUCores();
            executorService = Executors.newFixedThreadPool(numberCPUCores, new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
            executorService2 = Executors.newFixedThreadPool(numberCPUCores, new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
            mesosServices = MesosServicesUtils.createMesosServices(configuration);
            MesosTaskManagerParameters create = MesosTaskManagerParameters.create(configuration);
            LOG.info("TaskManagers will be created with {} task slots", Integer.valueOf(create.containeredParameters().numSlots()));
            LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, JVM direct memory limit {} MB, {} cpus", new Object[]{Long.valueOf(create.containeredParameters().taskManagerTotalMemoryMB()), Long.valueOf(create.containeredParameters().taskManagerHeapSizeMB()), Long.valueOf(create.containeredParameters().taskManagerDirectMemoryLimitMB()), Double.valueOf(create.cpus())});
            int integer = configuration.getInteger("jobmanager.rpc.port", 6123);
            Preconditions.checkState(integer >= 0 && integer <= 65536, "Config parameter \"jobmanager.rpc.port\" is invalid, it must be between 0 and 65536");
            actorSystem = BootstrapTools.startActorSystem(configuration, hostName, integer, LOG);
            Address address = AkkaUtils.getAddress(actorSystem);
            String str = (String) address.host().get();
            int intValue = ((Integer) address.port().get()).intValue();
            LOG.info("Actor system bound to hostname {}.", str);
            LOG.debug("Starting Artifact Server");
            mesosArtifactServer = new MesosArtifactServer(UUID.randomUUID().toString(), str, configuration.getInteger("mesos.resourcemanager.artifactserver.port", 0), configuration);
            ContainerSpecification containerSpecification = new ContainerSpecification();
            containerSpecification.getDynamicConfiguration().addAll(configuration2);
            containerSpecification.getDynamicConfiguration().addAll(BootstrapTools.generateTaskManagerConfiguration(new Configuration(), str, intValue, create.containeredParameters().numSlots(), TASKMANAGER_REGISTRATION_TIMEOUT));
            applyOverlays(configuration, containerSpecification);
            configureArtifactServer(mesosArtifactServer, containerSpecification);
            LOG.debug("Starting JobManager actor");
            ActorRef actorRef = (ActorRef) JobManager.startJobManagerActors(configuration, actorSystem, executorService, executorService2, new Some(JobManager.JOB_MANAGER_NAME()), Option.empty(), getJobManagerClass(), getArchivistClass())._1();
            LOG.debug("Starting Web Frontend");
            webMonitor = BootstrapTools.startWebMonitorIfConfigured(configuration, actorSystem, actorRef, LOG);
            if (webMonitor != null) {
                createMesosConfig.frameworkInfo().setWebuiUrl(new URL("http", hostName, webMonitor.getServerPort(), "/").toExternalForm());
            }
            LOG.debug("Starting Mesos Flink Resource Manager");
            ActorRef actorOf = actorSystem.actorOf(MesosFlinkResourceManager.createActorProps(getResourceManagerClass(), configuration, createMesosConfig, mesosServices.createMesosWorkerStore(configuration, executorService2), LeaderRetrievalUtils.createLeaderRetrievalService(configuration, actorRef), create, containerSpecification, mesosArtifactServer, LOG), "Mesos_Resource_Master");
            LOG.debug("Starting process reapers for JobManager");
            actorSystem.actorOf(Props.create(ProcessReaper.class, new Object[]{actorOf, LOG, 32}), "Mesos_Resource_Master_Process_Reaper");
            actorSystem.actorOf(Props.create(ProcessReaper.class, new Object[]{actorRef, LOG, 32}), "JobManager_Process_Reaper");
            LOG.info("Mesos JobManager started");
            actorSystem.awaitTermination();
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable th) {
                    LOG.error("Failed to stop the web frontend", th);
                }
            }
            try {
                mesosArtifactServer.stop();
            } catch (Throwable th2) {
                LOG.error("Failed to stop the artifact server", th2);
            }
            org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS, new ExecutorService[]{executorService, executorService2});
            try {
                mesosServices.close(true);
                return 0;
            } catch (Throwable th3) {
                LOG.error("Failed to clean up and close MesosServices.", th3);
                return 0;
            }
        } catch (Throwable th4) {
            LOG.error("Mesos JobManager initialization failed", th4);
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                } catch (Throwable th5) {
                    LOG.warn("Failed to stop the web frontend", th5);
                }
            }
            if (mesosArtifactServer != null) {
                try {
                    mesosArtifactServer.stop();
                } catch (Throwable th6) {
                    LOG.error("Failed to stop the artifact server", th6);
                }
            }
            if (actorSystem != null) {
                try {
                    actorSystem.shutdown();
                } catch (Throwable th7) {
                    LOG.error("Error shutting down actor system", th7);
                }
            }
            if (executorService != null) {
                try {
                    executorService.shutdownNow();
                } catch (Throwable th8) {
                    LOG.error("Error shutting down future executor", th8);
                }
            }
            if (executorService2 != null) {
                try {
                    executorService2.shutdownNow();
                } catch (Throwable th9) {
                    LOG.error("Error shutting down io executor", th9);
                }
            }
            if (mesosServices == null) {
                return 31;
            }
            try {
                mesosServices.close(false);
                return 31;
            } catch (Throwable th10) {
                LOG.error("Error closing the mesos services.", th10);
                return 31;
            }
        }
    }

    protected Class<? extends MesosFlinkResourceManager> getResourceManagerClass() {
        return MesosFlinkResourceManager.class;
    }

    protected Class<? extends JobManager> getJobManagerClass() {
        return MesosJobManager.class;
    }

    protected Class<? extends MemoryArchivist> getArchivistClass() {
        return MemoryArchivist.class;
    }

    public static MesosConfiguration createMesosConfig(Configuration configuration, String str) {
        Protos.FrameworkInfo.Builder hostname = Protos.FrameworkInfo.newBuilder().setHostname(str);
        Protos.Credential.Builder builder = null;
        if (!configuration.containsKey("mesos.master")) {
            throw new IllegalConfigurationException("mesos.master must be configured.");
        }
        String string = configuration.getString("mesos.master", (String) null);
        hostname.setFailoverTimeout(FiniteDuration.apply(configuration.getInteger("mesos.failover-timeout", 600), TimeUnit.SECONDS).toSeconds());
        hostname.setName(configuration.getString("mesos.resourcemanager.framework.name", "Flink"));
        hostname.setRole(configuration.getString("mesos.resourcemanager.framework.role", "*"));
        hostname.setUser(configuration.getString("mesos.resourcemanager.framework.user", ""));
        if (configuration.containsKey("mesos.resourcemanager.framework.principal")) {
            hostname.setPrincipal(configuration.getString("mesos.resourcemanager.framework.principal", (String) null));
            builder = Protos.Credential.newBuilder();
            builder.setPrincipal(hostname.getPrincipal());
            if (configuration.containsKey("mesos.resourcemanager.framework.secret")) {
                builder.setSecret(configuration.getString("mesos.resourcemanager.framework.secret", (String) null));
            }
        }
        return new MesosConfiguration(string, hostname, Option.apply(builder));
    }

    private static void applyOverlays(Configuration configuration, ContainerSpecification containerSpecification) throws IOException {
        new CompositeContainerOverlay(new ContainerOverlay[]{FlinkDistributionOverlay.newBuilder().fromEnvironment(configuration).build(), HadoopConfOverlay.newBuilder().fromEnvironment(configuration).build(), HadoopUserOverlay.newBuilder().fromEnvironment(configuration).build(), KeytabOverlay.newBuilder().fromEnvironment(configuration).build(), Krb5ConfOverlay.newBuilder().fromEnvironment(configuration).build(), SSLStoreOverlay.newBuilder().fromEnvironment(configuration).build()}).configure(containerSpecification);
    }

    private static void configureArtifactServer(MesosArtifactServer mesosArtifactServer, ContainerSpecification containerSpecification) throws IOException {
        for (ContainerSpecification.Artifact artifact : containerSpecification.getArtifacts()) {
            mesosArtifactServer.addPath(artifact.source, artifact.dest);
        }
    }
}
