package co.cask.cdap.app.runtime.spark.distributed;

import co.cask.cdap.app.runtime.spark.SparkCredentialsUpdater;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.SparkRuntimeEnv;
import co.cask.cdap.common.BadRequestException;
import co.cask.cdap.internal.app.runtime.workflow.BasicWorkflowToken;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramInfo;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.twill.filesystem.FileContextLocationFactory;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/distributed/SparkDriverService.class */
public class SparkDriverService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(SparkDriverService.class);
    private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
    private static final int MAX_HEARTBEAT_FAILURES = 60;
    private final SparkExecutionClient client;

    @Nullable
    private final SparkCredentialsUpdater credentialsUpdater;

    @Nullable
    private final BasicWorkflowToken workflowToken;
    private Thread runThread;

    public SparkDriverService(URI uri, SparkRuntimeContext sparkRuntimeContext) {
        this.client = new SparkExecutionClient(uri, sparkRuntimeContext.getProgramRunId());
        this.credentialsUpdater = createCredentialsUpdater(sparkRuntimeContext.getConfiguration(), this.client);
        WorkflowProgramInfo workflowInfo = sparkRuntimeContext.getWorkflowInfo();
        this.workflowToken = workflowInfo == null ? null : workflowInfo.getWorkflowToken();
    }

    protected void startUp() throws Exception {
        this.runThread = Thread.currentThread();
        heartbeat(this.client, this.workflowToken);
        if (this.credentialsUpdater != null) {
            this.credentialsUpdater.startAndWait();
        }
        LOG.info("SparkDriverService started.");
    }

    protected void run() throws Exception {
        int i = 0;
        while (isRunning()) {
            try {
                long currentTimeMillis = System.currentTimeMillis() + HEARTBEAT_INTERVAL_MILLIS;
                heartbeat(this.client, this.workflowToken);
                i = 0;
                long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                if (isRunning() && currentTimeMillis2 > 0) {
                    TimeUnit.MILLISECONDS.sleep(currentTimeMillis2);
                }
            } catch (BadRequestException e) {
                LOG.error("Invalid spark program heartbeat. Terminating the execution.", e);
                throw e;
            } catch (InterruptedException e2) {
            } catch (Throwable th) {
                int i2 = i;
                i++;
                if (i2 >= MAX_HEARTBEAT_FAILURES) {
                    LOG.error("Failed to make heartbeat for {} times. Terminating the execution", Integer.valueOf(i), th);
                    throw th;
                }
                LOG.warn("Failed to make heartbeat for {} times", Integer.valueOf(i), th);
            }
        }
    }

    protected void shutDown() throws Exception {
        Thread.interrupted();
        try {
            if (this.credentialsUpdater != null) {
                this.credentialsUpdater.stopAndWait();
            }
            LOG.info("SparkDriverService stopped.");
        } finally {
            this.client.completed(this.workflowToken);
        }
    }

    protected void triggerShutdown() {
        if (this.runThread == null || this.runThread == Thread.currentThread()) {
            return;
        }
        this.runThread.interrupt();
    }

    protected Executor executor() {
        return new Executor() { // from class: co.cask.cdap.app.runtime.spark.distributed.SparkDriverService.1
            @Override // java.util.concurrent.Executor
            public void execute(Runnable runnable) {
                Thread thread = new Thread(runnable, "SparkDriverService");
                thread.setDaemon(true);
                thread.start();
            }
        };
    }

    @Nullable
    private SparkCredentialsUpdater createCredentialsUpdater(Configuration configuration, SparkExecutionClient sparkExecutionClient) {
        try {
            SparkConf sparkConf = new SparkConf();
            long j = sparkConf.getLong("spark.yarn.token.renewal.interval", -1L);
            if (j <= 0) {
                return null;
            }
            URI create = URI.create(System.getenv("SPARK_YARN_STAGING_DIR"));
            FileContextLocationFactory fileContextLocationFactory = new FileContextLocationFactory(configuration);
            Location create2 = create.isAbsolute() ? fileContextLocationFactory.create(create.getPath()) : fileContextLocationFactory.getHomeLocation().append(create.getPath());
            LOG.info("Credentials DIR: {}", create2);
            int i = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5);
            int i2 = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5);
            Location append = create2.append("credentials-" + UUID.randomUUID());
            SparkRuntimeEnv.setProperty("spark.yarn.credentials.file", append.toURI().toString());
            return new SparkCredentialsUpdater(createCredentialsSupplier(sparkExecutionClient, create2), create2, append.getName(), j, TimeUnit.DAYS.toMillis(i), i2);
        } catch (IOException e) {
            LOG.warn("Failed to create credentials updater. Credentials update disabled", e);
            return null;
        }
    }

    private Supplier<Credentials> createCredentialsSupplier(final SparkExecutionClient sparkExecutionClient, final Location location) {
        return new Supplier<Credentials>() { // from class: co.cask.cdap.app.runtime.spark.distributed.SparkDriverService.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Credentials m74get() {
                try {
                    Location append = location.append("fetch-credentials-" + UUID.randomUUID() + ".tmp");
                    try {
                        sparkExecutionClient.writeCredentials(append);
                        Credentials credentials = new Credentials();
                        DataInputStream dataInputStream = new DataInputStream(append.getInputStream());
                        Throwable th = null;
                        try {
                            credentials.readTokenStorageStream(dataInputStream);
                            UserGroupInformation.getCurrentUser().addCredentials(credentials);
                            SparkDriverService.LOG.debug("Credentials updated: {}", credentials.getAllTokens());
                            if (dataInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        dataInputStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    dataInputStream.close();
                                }
                            }
                            return credentials;
                        } catch (Throwable th3) {
                            if (dataInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        dataInputStream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    dataInputStream.close();
                                }
                            }
                            throw th3;
                        }
                    } finally {
                        if (!append.delete()) {
                            SparkDriverService.LOG.warn("Failed to delete temporary location {}", append);
                        }
                    }
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        };
    }

    private void heartbeat(SparkExecutionClient sparkExecutionClient, @Nullable BasicWorkflowToken basicWorkflowToken) throws Exception {
        LOG.trace("Sending heartbeat with workflow token {}", basicWorkflowToken == null ? null : basicWorkflowToken.getAllFromCurrentNode());
        SparkCommand heartbeat = sparkExecutionClient.heartbeat(basicWorkflowToken);
        if (heartbeat == null) {
            return;
        }
        if (!SparkCommand.STOP.equals(heartbeat)) {
            LOG.warn("Ignoring unsupported command {}", heartbeat);
        } else {
            LOG.info("Stop command received from client. Stopping spark program.");
            stop();
        }
    }
}
