package org.apache.flink.yarn;

import java.io.File;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/YarnTaskExecutorRunner.class */
public class YarnTaskExecutorRunner {
    protected static final Logger LOG = LoggerFactory.getLogger(YarnTaskExecutorRunner.class);
    private static final Map<String, String> ENV = System.getenv();
    private static final int INIT_ERROR_EXIT_CODE = 31;
    private MetricRegistry metricRegistry;
    private HighAvailabilityServices haServices;
    private RpcService taskExecutorRpcService;
    private TaskManagerRunner taskManagerRunner;

    public static void main(String[] strArr) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", strArr);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        System.exit(new YarnTaskExecutorRunner().run(strArr));
    }

    protected int run(String[] strArr) {
        try {
            LOG.debug("All environment variables: {}", ENV);
            String str = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
            String str2 = ENV.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
            LOG.info("Current working/local Directory: {}", str2);
            String str3 = ENV.get(ApplicationConstants.Environment.PWD.key());
            LOG.info("Current working Directory: {}", str3);
            String str4 = ENV.get(YarnConfigKeys.KEYTAB_PATH);
            LOG.info("TM: remote keytab path obtained {}", str4);
            String str5 = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
            LOG.info("TM: remote keytab principal obtained {}", str5);
            final Configuration loadConfiguration = GlobalConfiguration.loadConfiguration(str3);
            FileSystem.setDefaultScheme(loadConfiguration);
            String string = loadConfiguration.getString("taskmanager.tmp.dirs", (String) null);
            if (string == null) {
                LOG.info("Setting directories for temporary file " + str2);
                loadConfiguration.setString("taskmanager.tmp.dirs", str2);
            } else {
                LOG.info("Overriding YARN's temporary file directories with those specified in the Flink config: " + string);
            }
            loadConfiguration.setBoolean("akka.jvm-exit-on-fatal-error", true);
            String str6 = null;
            if (str4 != null) {
                str6 = new File(str3, Utils.KEYTAB_FILE_NAME).getAbsolutePath();
                LOG.info("keytab path: {}", str6);
            }
            LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", UserGroupInformation.getCurrentUser().getShortUserName(), str);
            org.apache.hadoop.conf.Configuration configuration = null;
            File file = new File(str3, Utils.KRB5_FILE_NAME);
            if (file.exists() && file.canRead()) {
                LOG.info("KRB5 Conf: {}", file.getAbsolutePath());
                configuration = new org.apache.hadoop.conf.Configuration();
                configuration.set("hadoop.security.authentication", "kerberos");
                configuration.set("hadoop.security.authorization", "true");
            }
            SecurityUtils.SecurityConfiguration securityConfiguration = configuration != null ? new SecurityUtils.SecurityConfiguration(loadConfiguration, configuration) : new SecurityUtils.SecurityConfiguration(loadConfiguration);
            if (str6 != null && str5 != null) {
                loadConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, str6);
                loadConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, str5);
            }
            SecurityUtils.install(securityConfiguration);
            return ((Integer) SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { // from class: org.apache.flink.yarn.YarnTaskExecutorRunner.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Integer call() throws Exception {
                    return Integer.valueOf(YarnTaskExecutorRunner.this.runTaskExecutor(loadConfiguration));
                }
            })).intValue();
        } catch (Throwable th) {
            LOG.error("YARN Application Master initialization failed", th);
            return 31;
        }
    }

    protected int runTaskExecutor(Configuration configuration) {
        try {
            String str = ENV.get("_FLINK_CONTAINER_ID");
            Preconditions.checkArgument(str != null, "ContainerId variable %s not set", new Object[]{"_FLINK_CONTAINER_ID"});
            String str2 = ENV.get("_FLINK_NODE_ID");
            if (str2 != null) {
                configuration.setString("taskmanager.hostname", str2);
            }
            ResourceID resourceID = new ResourceID(str);
            LOG.info("YARN assigned resource id {} for the task executor.", resourceID.toString());
            this.taskExecutorRpcService = TaskManagerRunner.createRpcService(configuration, this.haServices);
            this.haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, this.taskExecutorRpcService.getExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
            HeartbeatServices fromConfiguration = HeartbeatServices.fromConfiguration(configuration);
            this.metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
            this.taskManagerRunner = new TaskManagerRunner(configuration, resourceID, this.taskExecutorRpcService, this.haServices, fromConfiguration, this.metricRegistry);
            this.taskManagerRunner.start();
            LOG.debug("YARN task executor started");
            this.taskManagerRunner.getTerminationFuture().get();
            LOG.info("YARN task manager runner finished");
            shutdown();
            return 0;
        } catch (Throwable th) {
            LOG.error("YARN task executor initialization failed", th);
            shutdown();
            return 31;
        }
    }

    protected void shutdown() {
        if (this.taskExecutorRpcService != null) {
            try {
                this.taskExecutorRpcService.stopService();
            } catch (Throwable th) {
                LOG.error("Error shutting down job master rpc service", th);
            }
        }
        if (this.haServices != null) {
            try {
                this.haServices.close();
            } catch (Throwable th2) {
                LOG.warn("Failed to stop the HA service", th2);
            }
        }
        if (this.metricRegistry != null) {
            try {
                this.metricRegistry.shutdown();
            } catch (Throwable th3) {
                LOG.warn("Failed to stop the metrics registry", th3);
            }
        }
    }
}
