package org.apache.twill.yarn;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.time.StopWatch;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.internal.AbstractTwillController;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.appmaster.TrackerService;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.data.Stat;
import org.mortbay.jetty.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/twill/yarn/YarnTwillController.class */
public final class YarnTwillController extends AbstractTwillController implements TwillController {
    private static final Logger LOG = LoggerFactory.getLogger(YarnTwillController.class);
    private final String appName;
    private final Callable<ProcessController<YarnApplicationReport>> startUp;
    private ProcessController<YarnApplicationReport> processController;
    private ResourceReportClient resourcesClient;
    private Thread statusPollingThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    public YarnTwillController(String str, RunId runId, ZKClient zKClient, Callable<ProcessController<YarnApplicationReport>> callable) {
        this(str, runId, zKClient, ImmutableList.of(), callable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public YarnTwillController(String str, RunId runId, ZKClient zKClient, Iterable<LogHandler> iterable, Callable<ProcessController<YarnApplicationReport>> callable) {
        super(runId, zKClient, iterable);
        this.appName = str;
        this.startUp = callable;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ListenableFuture<Void> secureStoreUpdated() {
        return sendMessage(SystemMessages.SECURE_STORE_UPDATED, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.twill.internal.AbstractTwillController, org.apache.twill.internal.AbstractZKServiceController
    public void doStartUp() {
        super.doStartUp();
        try {
            this.processController = this.startUp.call();
            YarnApplicationReport report = this.processController.getReport();
            ApplicationId applicationId = report.getApplicationId();
            LOG.debug("Application {} with id {} submitted", this.appName, applicationId);
            YarnApplicationState yarnApplicationState = report.getYarnApplicationState();
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            stopWatch.split();
            long convert = TimeUnit.MILLISECONDS.convert(60L, TimeUnit.SECONDS);
            LOG.debug("Checking yarn application status for {} {}", this.appName, applicationId);
            while (!hasRun(yarnApplicationState) && stopWatch.getSplitTime() < convert) {
                report = this.processController.getReport();
                yarnApplicationState = report.getYarnApplicationState();
                LOG.debug("Yarn application status for {} {}: {}", new Object[]{this.appName, applicationId, yarnApplicationState});
                TimeUnit.SECONDS.sleep(1L);
                stopWatch.split();
            }
            LOG.info("Yarn application {} {} is in state {}", new Object[]{this.appName, applicationId, yarnApplicationState});
            if (yarnApplicationState != YarnApplicationState.RUNNING) {
                LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", new Object[]{this.appName, applicationId, 60});
                forceShutDown();
            } else {
                try {
                    this.resourcesClient = new ResourceReportClient(URI.create(String.format("http://%s:%d", report.getHost(), Integer.valueOf(report.getRpcPort()))).resolve(TrackerService.PATH).toURL());
                } catch (IOException e) {
                    this.resourcesClient = null;
                }
            }
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.twill.internal.AbstractTwillController, org.apache.twill.internal.AbstractZKServiceController
    public synchronized void doShutDown() {
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        stopPollStatus();
        try {
            Uninterruptibles.getUninterruptibly(getStopMessageFuture(), 60L, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOG.error("Failed to wait for stop message being processed.", e);
            kill();
        }
        try {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            stopWatch.split();
            long convert = TimeUnit.MILLISECONDS.convert(60L, TimeUnit.SECONDS);
            YarnApplicationReport report = this.processController.getReport();
            FinalApplicationStatus finalApplicationStatus = report.getFinalApplicationStatus();
            ApplicationId applicationId = report.getApplicationId();
            while (finalApplicationStatus == FinalApplicationStatus.UNDEFINED && stopWatch.getSplitTime() < convert) {
                LOG.debug("Yarn application final status for {} {}: {}", new Object[]{this.appName, applicationId, finalApplicationStatus});
                TimeUnit.SECONDS.sleep(1L);
                stopWatch.split();
                finalApplicationStatus = this.processController.getReport().getFinalApplicationStatus();
            }
            LOG.debug("Yarn application {} {} completed with status {}", new Object[]{this.appName, applicationId, finalApplicationStatus});
            if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
                kill();
            }
        } catch (Exception e2) {
            LOG.warn("Exception while waiting for application report: {}", e2.getMessage(), e2);
            kill();
        }
        super.doShutDown();
    }

    @Override // org.apache.twill.api.ServiceController
    public void kill() {
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        LOG.info("Killing application {} {}", this.appName, this.processController.getReport().getApplicationId());
        this.processController.cancel();
    }

    @Override // org.apache.twill.internal.AbstractZKServiceController
    protected void instanceNodeUpdated(NodeData nodeData) {
    }

    @Override // org.apache.twill.internal.AbstractZKServiceController
    protected void instanceNodeFailed(Throwable th) {
        if (this.processController == null) {
            LOG.warn("No process controller for application that is not submitted.");
            return;
        }
        YarnApplicationReport report = this.processController.getReport();
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = this.appName;
        objArr[1] = report.getApplicationId();
        objArr[2] = th == null ? HttpStatus.Unknown : th.getMessage();
        logger.info("Failed to access application {} {} live node in ZK, resort to polling. Failure reason: {}", objArr);
        startPollStatus(report.getApplicationId());
    }

    private synchronized void startPollStatus(ApplicationId applicationId) {
        if (this.statusPollingThread == null) {
            this.statusPollingThread = new Thread(createStatusPollingRunnable(), String.format("%s-%s-yarn-poller", this.appName, applicationId));
            this.statusPollingThread.setDaemon(true);
            this.statusPollingThread.start();
        }
    }

    private synchronized void stopPollStatus() {
        if (this.statusPollingThread != null) {
            this.statusPollingThread.interrupt();
            this.statusPollingThread = null;
        }
    }

    private Runnable createStatusPollingRunnable() {
        return new Runnable() { // from class: org.apache.twill.yarn.YarnTwillController.1
            @Override // java.lang.Runnable
            public void run() {
                YarnApplicationReport yarnApplicationReport = (YarnApplicationReport) YarnTwillController.this.processController.getReport();
                ApplicationId applicationId = yarnApplicationReport.getApplicationId();
                boolean z = false;
                boolean z2 = false;
                try {
                    YarnTwillController.LOG.debug("Polling status from Yarn for {} {}.", YarnTwillController.this.appName, applicationId);
                    while (true) {
                        if (Thread.currentThread().isInterrupted()) {
                            break;
                        }
                        if (yarnApplicationReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                            z = true;
                            break;
                        }
                        try {
                            try {
                            } catch (ExecutionException e) {
                                YarnTwillController.LOG.debug("Failed in exists call on ZK path {}.", YarnTwillController.this.getInstancePath(), e);
                            }
                        } catch (TimeoutException e2) {
                            YarnTwillController.LOG.debug("Timeout in exists call on ZK path {}.", YarnTwillController.this.getInstancePath(), e2);
                        }
                        if (((Stat) YarnTwillController.this.zkClient.exists(YarnTwillController.this.getInstancePath()).get(5L, TimeUnit.SECONDS)) != null) {
                            z2 = true;
                            break;
                        } else {
                            TimeUnit.SECONDS.sleep(1L);
                            yarnApplicationReport = (YarnApplicationReport) YarnTwillController.this.processController.getReport();
                        }
                    }
                } catch (InterruptedException e3) {
                    YarnTwillController.LOG.debug("Status polling thread interrupted for application {} {}", YarnTwillController.this.appName, applicationId);
                }
                YarnTwillController.LOG.debug("Stop polling status from Yarn for {} {}.", YarnTwillController.this.appName, applicationId);
                if (z) {
                    YarnTwillController.LOG.info("Yarn application {} {} completed. Shutting down controller.", YarnTwillController.this.appName, applicationId);
                    YarnTwillController.this.forceShutDown();
                } else if (z2) {
                    YarnTwillController.LOG.info("Rewatch instance node for {} {} at {}", new Object[]{YarnTwillController.this.appName, applicationId, YarnTwillController.this.getInstancePath()});
                    synchronized (YarnTwillController.this) {
                        YarnTwillController.this.statusPollingThread = null;
                        YarnTwillController.this.watchInstanceNode();
                    }
                }
            }
        };
    }

    private boolean hasRun(YarnApplicationState yarnApplicationState) {
        switch (yarnApplicationState) {
            case RUNNING:
            case FINISHED:
            case FAILED:
            case KILLED:
                return true;
            default:
                return false;
        }
    }

    @Override // org.apache.twill.api.TwillController
    public ResourceReport getResourceReport() {
        if (this.resourcesClient == null) {
            return null;
        }
        return this.resourcesClient.get();
    }
}
