package org.apache.flink.yarn;

import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.util.Records;

/* loaded from: input_file:org/apache/flink/yarn/ApplicationMaster.class */
public class ApplicationMaster {
    private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

    /* JADX INFO: Access modifiers changed from: private */
    public void run() throws Exception {
        Configuration initializeYarnConfiguration = Utils.initializeYarnConfiguration();
        FileSystem fileSystem = FileSystem.get(initializeYarnConfiguration);
        Map<String, String> map = System.getenv();
        String str = map.get(ApplicationConstants.Environment.PWD.key());
        String str2 = map.get(ApplicationConstants.Environment.LOG_DIRS.key());
        String str3 = map.get(ApplicationConstants.Environment.NM_HOST.key());
        String str4 = map.get(Client.ENV_APP_ID);
        String str5 = map.get(Client.ENV_CLIENT_HOME_DIR);
        String str6 = map.get(ApplicationConstants.Environment.NM_HOST.key());
        String str7 = map.get(Client.FLINK_JAR_PATH);
        String str8 = map.get(Client.ENV_CLIENT_SHIP_FILES);
        String str9 = map.get(Client.ENV_CLIENT_USERNAME);
        int intValue = Integer.valueOf(map.get(Client.ENV_TM_COUNT)).intValue();
        int intValue2 = Integer.valueOf(map.get(Client.ENV_TM_MEMORY)).intValue();
        int intValue3 = Integer.valueOf(map.get(Client.ENV_TM_CORES)).intValue();
        int calculateHeapSize = Utils.calculateHeapSize(intValue2);
        if (str == null) {
            throw new RuntimeException("Current directory unknown");
        }
        if (str3 == null) {
            throw new RuntimeException("Own hostname (" + ApplicationConstants.Environment.NM_HOST + ") not set.");
        }
        LOG.info("Working directory " + str);
        Utils.getFlinkConfiguration(str);
        String str10 = str + "/resources/web-docs-infoserver";
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(str + "/flink-conf.yaml")));
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(str + "/flink-conf-modified.yaml"));
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                break;
            }
            if (readLine.contains("jobmanager.rpc.address")) {
                bufferedWriter.append((CharSequence) ("jobmanager.rpc.address: " + str3 + "\n"));
            } else if (readLine.contains("jobmanager.web.rootpath")) {
                bufferedWriter.append((CharSequence) "jobmanager.web.rootpath: \n");
            } else {
                bufferedWriter.append((CharSequence) (readLine + "\n"));
            }
        }
        bufferedWriter.append((CharSequence) ("jobmanager.rpc.address: " + str3 + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.web.rootpath: " + str10 + "\n"));
        bufferedWriter.append((CharSequence) ("jobmanager.web.logpath: " + str2 + "\n"));
        bufferedWriter.close();
        bufferedReader.close();
        if (!new File(str + "/flink-conf-modified.yaml").exists()) {
            LOG.warn("modified yaml does not exist!");
        }
        Utils.copyJarContents("resources/web-docs-infoserver", ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
        JobManager initialize = JobManager.initialize(new String[]{"-executionMode", "cluster", "-configDir", str + "/flink-conf-modified.yaml"});
        initialize.startInfoServer();
        AMRMClient createAMRMClient = AMRMClient.createAMRMClient();
        createAMRMClient.init(initializeYarnConfiguration);
        createAMRMClient.start();
        NMClient createNMClient = NMClient.createNMClient();
        createNMClient.init(initializeYarnConfiguration);
        createNMClient.start();
        LOG.info("registering ApplicationMaster");
        createAMRMClient.registerApplicationMaster(str6, 0, "http://" + str6 + ":" + GlobalConfiguration.getString("jobmanager.web.port", "undefined"));
        Priority priority = (Priority) Records.newRecord(Priority.class);
        priority.setPriority(0);
        Resource resource = (Resource) Records.newRecord(Resource.class);
        resource.setMemory(intValue2);
        resource.setVirtualCores(intValue3);
        for (int i = 0; i < intValue; i++) {
            AMRMClient.ContainerRequest containerRequest = new AMRMClient.ContainerRequest(resource, (String[]) null, (String[]) null, priority);
            LOG.info("Requesting TaskManager container " + i);
            createAMRMClient.addContainerRequest(containerRequest);
        }
        LocalResource localResource = (LocalResource) Records.newRecord(LocalResource.class);
        LocalResource localResource2 = (LocalResource) Records.newRecord(LocalResource.class);
        Utils.registerLocalResource(fileSystem, new Path(str7), localResource);
        Utils.setupLocalResource(initializeYarnConfiguration, fileSystem, str4, new Path("file://" + str + "/flink-conf-modified.yaml"), localResource2, new Path(str5));
        LOG.info("Prepared localresource for modified yaml: " + localResource2);
        boolean exists = new File(str + "/log4j.properties").exists();
        LocalResource[] localResourceArr = null;
        String[] split = str8.split(",");
        if (!str8.isEmpty()) {
            localResourceArr = new LocalResource[split.length];
            int i2 = 0;
            for (String str11 : split) {
                if (str11 != null && !str11.isEmpty()) {
                    localResourceArr[i2] = (LocalResource) Records.newRecord(LocalResource.class);
                    Utils.registerLocalResource(fileSystem, new Path(str11), localResourceArr[i2]);
                    i2++;
                }
            }
        }
        String string = GlobalConfiguration.getString("env.java.opts", "");
        int i3 = 0;
        int i4 = 0;
        while (i3 < intValue) {
            AllocateResponse allocate = createAMRMClient.allocate(0.0f);
            for (Container container : allocate.getAllocatedContainers()) {
                LOG.info("Got new Container for TM " + container.getId() + " on host " + container.getNodeId().getHost());
                i3++;
                ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
                String str12 = "$JAVA_HOME/bin/java -Xmx" + calculateHeapSize + "m " + string;
                if (exists) {
                    str12 = str12 + " -Dlog.file=\"<LOG_DIR>/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
                }
                String str13 = str12 + " org.apache.flink.yarn.YarnTaskManagerRunner -configDir .  1><LOG_DIR>/taskmanager-stdout.log 2><LOG_DIR>/taskmanager-stderr.log";
                containerLaunchContext.setCommands(Collections.singletonList(str13));
                LOG.info("Starting TM with command=" + str13);
                HashMap hashMap = new HashMap(2);
                hashMap.put("flink.jar", localResource);
                hashMap.put("flink-conf.yaml", localResource2);
                if (!str8.isEmpty()) {
                    Preconditions.checkNotNull(localResourceArr);
                    for (int i5 = 0; i5 < split.length; i5++) {
                        hashMap.put(new Path(split[i5]).getName(), localResourceArr[i5]);
                    }
                }
                containerLaunchContext.setLocalResources(hashMap);
                HashMap hashMap2 = new HashMap();
                Utils.setupEnv(initializeYarnConfiguration, hashMap2);
                hashMap2.put(Client.ENV_CLIENT_USERNAME, str9);
                containerLaunchContext.setEnvironment(hashMap2);
                try {
                    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
                    DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
                    credentials.writeTokenStorageToStream(dataOutputBuffer);
                    containerLaunchContext.setTokens(ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
                } catch (IOException e) {
                    LOG.warn("Getting current user info failed when trying to launch the container" + e.getMessage());
                }
                LOG.info("Launching container " + i3);
                createNMClient.startContainer(container, containerLaunchContext);
            }
            for (ContainerStatus containerStatus : allocate.getCompletedContainersStatuses()) {
                i4++;
                LOG.info("Completed container (while allocating) " + containerStatus.getContainerId() + ". Total Completed:" + i4);
                LOG.info("Diagnostics " + containerStatus.getDiagnostics());
            }
            Thread.sleep(100L);
        }
        while (i4 < intValue) {
            for (ContainerStatus containerStatus2 : createAMRMClient.allocate(i4 / intValue).getCompletedContainersStatuses()) {
                i4++;
                LOG.info("Completed container " + containerStatus2.getContainerId() + ". Total Completed:" + i4);
                LOG.info("Diagnostics " + containerStatus2.getDiagnostics());
            }
            Thread.sleep(5000L);
        }
        LOG.info("Shutting down JobManager");
        initialize.shutdown();
        createAMRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    }

    public static void main(String[] strArr) throws Exception {
        String str = System.getenv(Client.ENV_CLIENT_USERNAME);
        LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() + "' setting user to execute Flink ApplicationMaster/JobManager to '" + str + "'");
        UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(str);
        Iterator it = UserGroupInformation.getCurrentUser().getTokens().iterator();
        while (it.hasNext()) {
            createRemoteUser.addToken((Token) it.next());
        }
        createRemoteUser.doAs(new PrivilegedAction<Object>() { // from class: org.apache.flink.yarn.ApplicationMaster.1
            @Override // java.security.PrivilegedAction
            public Object run() {
                try {
                    new ApplicationMaster().run();
                    return null;
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
    }
}
