package org.apache.flink.yarn.appMaster;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.Client;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/appMaster/ApplicationMaster.class */
public class ApplicationMaster implements YARNClientMasterProtocol {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationMaster.class);
    private final String currDir;
    private final String logDirs;
    private final String ownHostname;
    private final String appId;
    private final int appNumber;
    private final String clientHomeDir;
    private final String applicationMasterHost;
    private final String remoteFlinkJarPath;
    private final String shipListString;
    private final String yarnClientUsername;
    private final String rpcPort;
    private final int taskManagerCount;
    private final int memoryPerTaskManager;
    private final int coresPerTaskManager;
    private final int slots;
    private final String localWebInterfaceDir;
    private final Configuration conf;
    private FileSystem fs;
    private JobManager jobManager;
    private final RPC.Server amRpcServer;
    private AMRMClient<AMRMClient.ContainerRequest> rmClient;
    private NMClient nmClient;
    private boolean hasLogback;
    private boolean hasLog4j;
    private int heapLimit;
    Map<String, LocalResource> taskManagerLocalResources;
    private String dynamicPropertiesEncodedString;
    private ApplicationMasterStatus amStatus;
    private final int jobManagerPort;
    private final int jobManagerWebPort;
    private List<YARNClientMasterProtocol.Message> messages = new SerializableArrayList();
    private int completedContainers = 0;
    private Boolean isFailed = false;
    private boolean isClosed = false;

    public ApplicationMaster(Configuration configuration) throws IOException {
        this.fs = FileSystem.get(configuration);
        Map<String, String> map = System.getenv();
        this.currDir = map.get(ApplicationConstants.Environment.PWD.key());
        this.logDirs = map.get(ApplicationConstants.Environment.LOG_DIRS.key());
        this.ownHostname = map.get(ApplicationConstants.Environment.NM_HOST.key());
        this.appId = map.get(Client.ENV_APP_ID);
        this.appNumber = Integer.valueOf(map.get(Client.ENV_APP_NUMBER)).intValue();
        this.clientHomeDir = map.get(Client.ENV_CLIENT_HOME_DIR);
        this.applicationMasterHost = map.get(ApplicationConstants.Environment.NM_HOST.key());
        this.remoteFlinkJarPath = map.get(Client.FLINK_JAR_PATH);
        this.shipListString = map.get(Client.ENV_CLIENT_SHIP_FILES);
        this.yarnClientUsername = map.get(Client.ENV_CLIENT_USERNAME);
        this.rpcPort = map.get(Client.ENV_AM_PRC_PORT);
        this.taskManagerCount = Integer.valueOf(map.get(Client.ENV_TM_COUNT)).intValue();
        this.memoryPerTaskManager = Integer.valueOf(map.get(Client.ENV_TM_MEMORY)).intValue();
        this.coresPerTaskManager = Integer.valueOf(map.get(Client.ENV_TM_CORES)).intValue();
        this.slots = Integer.valueOf(map.get(Client.ENV_SLOTS)).intValue();
        this.dynamicPropertiesEncodedString = map.get(Client.ENV_DYNAMIC_PROPERTIES);
        this.localWebInterfaceDir = this.currDir + "/resources/web-docs-infoserver";
        this.conf = configuration;
        if (this.currDir == null) {
            throw new RuntimeException("Current directory unknown");
        }
        if (this.ownHostname == null) {
            throw new RuntimeException("Own hostname (" + ApplicationConstants.Environment.NM_HOST + ") not set.");
        }
        LOG.debug("Working directory " + this.currDir);
        Utils.getFlinkConfiguration(this.currDir);
        this.amRpcServer = RPC.getServer(this, this.ownHostname, Integer.valueOf(this.rpcPort).intValue(), 2);
        this.amRpcServer.start();
        int integer = GlobalConfiguration.getInteger("jobmanager.rpc.port", -1);
        if (integer != -1) {
            integer = Utils.offsetPort(integer, this.appNumber);
        } else {
            LOG.warn("JobManager port is unknown");
        }
        this.jobManagerPort = integer;
        this.jobManagerWebPort = Utils.offsetPort(GlobalConfiguration.getInteger("jobmanager.web.port", 8081), this.appNumber);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setFailed(boolean z) {
        this.isFailed = Boolean.valueOf(z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void generateConfigurationFile() throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(this.currDir + "/flink-conf.yaml")));
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(this.currDir + "/flink-conf-modified.yaml"));
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                break;
            }
            if (readLine.contains("jobmanager.rpc.address")) {
                bufferedWriter.append((CharSequence) ("jobmanager.rpc.address: " + this.ownHostname + "\n"));
            } else if (readLine.contains("jobmanager.web.rootpath")) {
                bufferedWriter.append((CharSequence) "jobmanager.web.rootpath: \n");
            } else {
                bufferedWriter.append((CharSequence) (readLine + "\n"));
            }
        }
        bufferedWriter.append((CharSequence) ("jobmanager.rpc.address: " + this.ownHostname + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.rpc.port: " + this.jobManagerPort + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.web.rootpath: " + this.localWebInterfaceDir + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.web.logpath: " + this.logDirs + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.web.port: " + this.jobManagerWebPort + "\n"));
        if (this.slots != -1) {
            bufferedWriter.append((CharSequence) ("taskmanager.numberOfTaskSlots: " + this.slots + "\n"));
            bufferedWriter.append((CharSequence) ("parallelization.degree.default: " + (this.slots * this.taskManagerCount) + "\n"));
        }
        for (Tuple2 tuple2 : CliFrontend.getDynamicProperties(this.dynamicPropertiesEncodedString)) {
            String str = ((String) tuple2.f0) + ": " + ((String) tuple2.f1);
            bufferedWriter.append((CharSequence) (str + "\n"));
            LOG.debug("Adding user-supplied configuration value to generated configuration file: " + str);
        }
        bufferedWriter.close();
        bufferedReader.close();
        if (new File(this.currDir + "/flink-conf-modified.yaml").exists()) {
            return;
        }
        LOG.warn("modified yaml does not exist!");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startJobManager() throws Exception {
        Utils.copyJarContents("resources/web-docs-infoserver", ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        this.jobManager = JobManager.initialize(new String[]{"-executionMode", "cluster", "-configDir", this.currDir + "/flink-conf-modified.yaml"});
        this.jobManager.startInfoServer();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setRMClient(AMRMClient<AMRMClient.ContainerRequest> aMRMClient) {
        this.rmClient = aMRMClient;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void run() throws Exception {
        this.heapLimit = Utils.calculateHeapSize(this.memoryPerTaskManager);
        this.nmClient = NMClient.createNMClient();
        this.nmClient.init(this.conf);
        this.nmClient.start();
        this.nmClient.cleanupRunningContainersOnStop(true);
        String str = "http://" + this.applicationMasterHost + ":" + this.jobManagerWebPort;
        LOG.info("Registering ApplicationMaster with tracking url " + str);
        this.rmClient.registerApplicationMaster(this.applicationMasterHost, 0, str);
        Priority priority = (Priority) Records.newRecord(Priority.class);
        priority.setPriority(0);
        Resource resource = (Resource) Records.newRecord(Resource.class);
        resource.setMemory(this.memoryPerTaskManager);
        resource.setVirtualCores(this.coresPerTaskManager);
        for (int i = 0; i < this.taskManagerCount; i++) {
            AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(resource, (String[]) null, (String[]) null, priority);
            LOG.info("Requesting TaskManager container " + i);
            this.rmClient.addContainerRequest(containerRequest);
        }
        LocalResource localResource = (LocalResource) Records.newRecord(LocalResource.class);
        LocalResource localResource2 = (LocalResource) Records.newRecord(LocalResource.class);
        Utils.registerLocalResource(this.fs, new Path(this.remoteFlinkJarPath), localResource);
        Utils.setupLocalResource(this.conf, this.fs, this.appId, new Path("file://" + this.currDir + "/flink-conf-modified.yaml"), localResource2, new Path(this.clientHomeDir));
        LOG.info("Prepared local resource for modified yaml: " + localResource2);
        this.hasLogback = new File(this.currDir + "/logback.xml").exists();
        this.hasLog4j = new File(this.currDir + "/log4j.properties").exists();
        LocalResource[] localResourceArr = null;
        String[] split = this.shipListString.split(",");
        if (!this.shipListString.isEmpty()) {
            localResourceArr = new LocalResource[split.length];
            int i2 = 0;
            for (String str2 : split) {
                if (str2 != null && !str2.isEmpty()) {
                    localResourceArr[i2] = (LocalResource) Records.newRecord(LocalResource.class);
                    Utils.registerLocalResource(this.fs, new Path(str2), localResourceArr[i2]);
                    i2++;
                }
            }
        }
        this.taskManagerLocalResources = new HashMap(2);
        this.taskManagerLocalResources.put("flink.jar", localResource);
        this.taskManagerLocalResources.put("flink-conf.yaml", localResource2);
        if (!this.shipListString.isEmpty()) {
            Preconditions.checkNotNull(localResourceArr);
            for (int i3 = 0; i3 < split.length; i3++) {
                this.taskManagerLocalResources.put(new Path(split[i3]).getName(), localResourceArr[i3]);
            }
        }
        this.completedContainers = 0;
        StringBuffer stringBuffer = new StringBuffer();
        allocateOutstandingContainer(stringBuffer);
        LOG.info("Allocated all initial containers");
        while (this.completedContainers < this.taskManagerCount) {
            for (ContainerStatus containerStatus : this.rmClient.allocate(this.completedContainers / this.taskManagerCount).getCompletedContainersStatuses()) {
                this.completedContainers++;
                LOG.info("Completed container " + containerStatus.getContainerId() + ". Total Completed:" + this.completedContainers);
                LOG.info("Diagnostics " + containerStatus.getDiagnostics());
                logDeadContainer(containerStatus, stringBuffer);
            }
            Thread.sleep(5000L);
        }
        if (this.isClosed) {
            return;
        }
        String str3 = "Application Master shut down after all containers finished\n" + stringBuffer.toString();
        LOG.info("Diagnostics message: " + str3);
        this.rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, str3, "");
        close();
        this.amRpcServer.stop();
        LOG.info("Application Master shutdown completed.");
    }

    private void allocateOutstandingContainer(StringBuffer stringBuffer) throws Exception {
        String string = GlobalConfiguration.getString("env.java.opts", "");
        int i = 0;
        while (i < this.taskManagerCount) {
            AllocateResponse allocate = this.rmClient.allocate(0.0f);
            for (Container container : allocate.getAllocatedContainers()) {
                LOG.info("Got new Container for TM " + container.getId() + " on host " + container.getNodeId().getHost());
                i++;
                ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
                String str = "$JAVA_HOME/bin/java -Xmx" + this.heapLimit + "m " + string;
                if (this.hasLogback || this.hasLog4j) {
                    str = str + " -Dlog.file=\"<LOG_DIR>/taskmanager.log\"";
                }
                if (this.hasLogback) {
                    str = str + " -Dlogback.configurationFile=file:logback.xml";
                }
                if (this.hasLog4j) {
                    str = str + " -Dlog4j.configuration=file:log4j.properties";
                }
                String str2 = str + " " + YarnTaskManagerRunner.class.getName() + " -configDir .  1><LOG_DIR>/taskmanager-stdout.log 2><LOG_DIR>/taskmanager-stderr.log";
                containerLaunchContext.setCommands(Collections.singletonList(str2));
                LOG.info("Starting TM with command=" + str2);
                containerLaunchContext.setLocalResources(this.taskManagerLocalResources);
                HashMap hashMap = new HashMap();
                Utils.setupEnv(this.conf, hashMap);
                hashMap.put(Client.ENV_CLIENT_USERNAME, this.yarnClientUsername);
                containerLaunchContext.setEnvironment(hashMap);
                try {
                    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
                    DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
                    credentials.writeTokenStorageToStream(dataOutputBuffer);
                    containerLaunchContext.setTokens(ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
                } catch (IOException e) {
                    LOG.warn("Getting current user info failed when trying to launch the container", e);
                }
                LOG.info("Launching container " + i);
                this.nmClient.startContainer(container, containerLaunchContext);
            }
            for (ContainerStatus containerStatus : allocate.getCompletedContainersStatuses()) {
                this.completedContainers++;
                LOG.info("Completed container (while allocating) " + containerStatus.getContainerId() + ". Total Completed:" + this.completedContainers);
                LOG.info("Diagnostics " + containerStatus.getDiagnostics());
                logDeadContainer(containerStatus, stringBuffer);
            }
            Thread.sleep(100L);
        }
    }

    private void logDeadContainer(ContainerStatus containerStatus, StringBuffer stringBuffer) {
        String str = "Diagnostics for containerId=" + containerStatus.getContainerId() + " in state=" + containerStatus.getState() + "\n" + containerStatus.getDiagnostics();
        this.messages.add(new YARNClientMasterProtocol.Message(str));
        stringBuffer.append("\n\n");
        stringBuffer.append(str);
    }

    @Override // org.apache.flink.yarn.rpc.YARNClientMasterProtocol
    public ApplicationMasterStatus getAppplicationMasterStatus() {
        if (this.amStatus == null) {
            this.amStatus = new ApplicationMasterStatus();
        }
        if (this.jobManager == null) {
            this.amStatus.setNumTaskManagers(0);
            this.amStatus.setNumSlots(0);
        } else {
            this.amStatus.setNumTaskManagers(this.jobManager.getNumberOfTaskManagers());
            this.amStatus.setNumSlots(this.jobManager.getTotalNumberOfRegisteredSlots());
        }
        this.amStatus.setMessageCount(this.messages.size());
        this.amStatus.setFailed(this.isFailed);
        return this.amStatus;
    }

    @Override // org.apache.flink.yarn.rpc.YARNClientMasterProtocol
    public void shutdownAM() throws Exception {
        LOG.info("Client requested shutdown of AM");
        FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.SUCCEEDED;
        String str = "";
        if (this.isFailed.booleanValue()) {
            finalApplicationStatus = FinalApplicationStatus.FAILED;
            str = "Application Master failed";
            this.isFailed = false;
            this.isFailed.notifyAll();
        }
        this.rmClient.unregisterApplicationMaster(finalApplicationStatus, str, "");
        close();
    }

    private void close() throws Exception {
        if (this.isClosed) {
            LOG.warn("The AM has already been closed before");
        } else {
            this.jobManager.shutdown();
            this.nmClient.close();
            this.rmClient.close();
            if (this.isFailed.booleanValue()) {
                LOG.warn("Can not close AM RPC connection since the AM is in failed state");
            } else {
                this.amRpcServer.stop();
            }
        }
        this.isClosed = true;
        System.exit(0);
    }

    @Override // org.apache.flink.yarn.rpc.YARNClientMasterProtocol
    public List<YARNClientMasterProtocol.Message> getMessages() {
        return this.messages;
    }

    public void addMessage(YARNClientMasterProtocol.Message message) {
        this.messages.add(message);
    }

    @Override // org.apache.flink.yarn.rpc.YARNClientMasterProtocol
    public void addTaskManagers(int i) {
        throw new RuntimeException("Implement me");
    }

    protected void keepRPCAlive() {
        synchronized (this.isFailed) {
            while (this.isFailed.booleanValue()) {
                try {
                    this.isFailed.wait(100L);
                } catch (InterruptedException e) {
                    LOG.warn("Error while waiting until end of failed mode of AM", e);
                }
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        String str = System.getenv(Client.ENV_CLIENT_USERNAME);
        LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + "' setting user to execute Flink ApplicationMaster/JobManager to '" + str + "'");
        UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(str);
        Iterator it = UserGroupInformation.getCurrentUser().getTokens().iterator();
        while (it.hasNext()) {
            createRemoteUser.addToken((Token) it.next());
        }
        createRemoteUser.doAs(new PrivilegedAction<Object>() { // from class: org.apache.flink.yarn.appMaster.ApplicationMaster.1
            @Override // java.security.PrivilegedAction
            public Object run() {
                AMRMClient aMRMClient = null;
                ApplicationMaster applicationMaster = null;
                try {
                    Configuration initializeYarnConfiguration = Utils.initializeYarnConfiguration();
                    aMRMClient = AMRMClient.createAMRMClient();
                    aMRMClient.init(initializeYarnConfiguration);
                    aMRMClient.start();
                    applicationMaster = new ApplicationMaster(initializeYarnConfiguration);
                    applicationMaster.generateConfigurationFile();
                    applicationMaster.startJobManager();
                    applicationMaster.setRMClient(aMRMClient);
                    applicationMaster.run();
                    return null;
                } catch (Throwable th) {
                    ApplicationMaster.LOG.error("Error while running the application master", th);
                    if (aMRMClient != null && applicationMaster == null) {
                        try {
                            aMRMClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "Flink YARN Application master stopped unexpectedly with an exception.\n" + StringUtils.stringifyException(th), "");
                        } catch (Exception e) {
                            ApplicationMaster.LOG.error("Unable to fail the application master", e);
                        }
                        ApplicationMaster.LOG.info("AM unregistered from RM");
                        return null;
                    }
                    if (aMRMClient == null) {
                        ApplicationMaster.LOG.error("Unable to unregister AM since the RM client is not available");
                    }
                    if (applicationMaster == null) {
                        return null;
                    }
                    ApplicationMaster.LOG.info("Writing error into internal message system");
                    applicationMaster.setFailed(true);
                    applicationMaster.addMessage(new YARNClientMasterProtocol.Message("The application master failed with an exception:\n" + StringUtils.stringifyException(th)));
                    applicationMaster.keepRPCAlive();
                    return null;
                }
            }
        });
    }
}
