package com.datatorrent.stram;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;

/* loaded from: input_file:com/datatorrent/stram/InlineAM.class */
public abstract class InlineAM {
    private static final Log LOG = LogFactory.getLog(InlineAM.class);
    private final YarnClientImpl rmClient;
    private String appName;
    private int amPriority;
    private String amQueue;

    public InlineAM(Configuration configuration) throws Exception {
        this.appName = "";
        this.amPriority = 0;
        this.amQueue = "";
        this.appName = "UnmanagedAM";
        this.amPriority = 0;
        this.amQueue = "default";
        YarnConfiguration yarnConfiguration = new YarnConfiguration(configuration);
        this.rmClient = new YarnClientImpl();
        this.rmClient.init(yarnConfiguration);
    }

    public abstract void runAM(ApplicationAttemptId applicationAttemptId) throws Exception;

    public boolean run() throws Exception {
        boolean z;
        LOG.info("Starting Client");
        this.rmClient.start();
        try {
            ApplicationId applicationId = this.rmClient.createApplication().getNewApplicationResponse().getApplicationId();
            LOG.info("Setting up application submission context for ASM");
            ApplicationSubmissionContext applicationSubmissionContext = (ApplicationSubmissionContext) Records.newRecord(ApplicationSubmissionContext.class);
            applicationSubmissionContext.setApplicationId(applicationId);
            applicationSubmissionContext.setApplicationName(this.appName);
            Priority priority = (Priority) Records.newRecord(Priority.class);
            priority.setPriority(this.amPriority);
            applicationSubmissionContext.setPriority(priority);
            applicationSubmissionContext.setQueue(this.amQueue);
            applicationSubmissionContext.setAMContainerSpec((ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class));
            applicationSubmissionContext.setUnmanagedAM(true);
            LOG.info("Setting unmanaged AM");
            LOG.info("Submitting application to ASM");
            this.rmClient.submitApplication(applicationSubmissionContext);
            ApplicationAttemptId currentApplicationAttemptId = monitorApplication(applicationId, EnumSet.of(YarnApplicationState.ACCEPTED)).getCurrentApplicationAttemptId();
            LOG.info("Launching application with id: " + currentApplicationAttemptId);
            runAM(currentApplicationAttemptId);
            ApplicationReport monitorApplication = monitorApplication(applicationId, EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED));
            YarnApplicationState yarnApplicationState = monitorApplication.getYarnApplicationState();
            FinalApplicationStatus finalApplicationStatus = monitorApplication.getFinalApplicationStatus();
            LOG.info("App ended with state: " + monitorApplication.getYarnApplicationState() + " and status: " + finalApplicationStatus);
            if (YarnApplicationState.FINISHED == yarnApplicationState && FinalApplicationStatus.SUCCEEDED == finalApplicationStatus) {
                LOG.info("Application has completed successfully.");
                z = true;
            } else {
                LOG.info("Application did finished unsuccessfully. YarnState=" + yarnApplicationState.toString() + ", FinalStatus=" + finalApplicationStatus.toString());
                z = false;
            }
            return z;
        } finally {
            this.rmClient.stop();
        }
    }

    private ApplicationReport monitorApplication(ApplicationId applicationId, Set<YarnApplicationState> set) throws YarnException, IOException {
        ApplicationReport applicationReport;
        do {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                LOG.debug("Thread sleep in monitoring loop interrupted");
            }
            applicationReport = this.rmClient.getApplicationReport(applicationId);
            LOG.info("Got application report from ASM for, appId=" + applicationId.getId() + ", appAttemptId=" + applicationReport.getCurrentApplicationAttemptId() + ", clientToken=" + applicationReport.getClientToAMToken() + ", appDiagnostics=" + applicationReport.getDiagnostics() + ", appMasterHost=" + applicationReport.getHost() + ", appQueue=" + applicationReport.getQueue() + ", appMasterRpcPort=" + applicationReport.getRpcPort() + ", appStartTime=" + applicationReport.getStartTime() + ", yarnAppState=" + applicationReport.getYarnApplicationState().toString() + ", distributedFinalState=" + applicationReport.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + applicationReport.getTrackingUrl() + ", appUser=" + applicationReport.getUser());
        } while (!set.contains(applicationReport.getYarnApplicationState()));
        return applicationReport;
    }
}
