package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.ProgramClassLoader;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.stream.StreamAdminModules;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.logging.guice.LoggingModules;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.notifications.feeds.guice.NotificationFeedServiceRuntimeModule;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.File;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.Services;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkContextProvider.class */
/**
 * Static holder that hands out the process-wide {@link ExecutionSparkContext}.
 *
 * <p>The context is either taken from a {@code SparkClassLoader} reachable from the calling
 * thread's context class loader, or created lazily (at most once per JVM) from the
 * {@value #CCONF_FILE_NAME}, {@value #HCONF_FILE_NAME} and {@value #PROGRAM_JAR_NAME} files,
 * which are resolved relative to the current working directory.
 */
public final class SparkContextProvider {
    public static final String CCONF_FILE_NAME = "cConf.xml";
    public static final String HCONF_FILE_NAME = "hConf.xml";
    public static final String PROGRAM_JAR_NAME = "program.jar";

    /** Maximum time, in seconds, the shutdown hook waits for the service chain to stop. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /** Lazily created context; written only inside the synchronized {@link #createIfNotExists()}. */
    private static volatile ExecutionSparkContext sparkContext;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adapts a {@link LogAppenderInitializer} to the Guava {@code Service} life cycle so that it
     * can take part in a start/stop chain together with the other services.
     */
    public static final class LogAppenderService extends AbstractService {
        private final LogAppenderInitializer initializer;

        private LogAppenderService(LogAppenderInitializer logAppenderInitializer) {
            this.initializer = logAppenderInitializer;
        }

        /** Initializes the log appender and reports the outcome to the service state machine. */
        protected void doStart() {
            try {
                this.initializer.initialize();
                notifyStarted();
            } catch (Throwable th) {
                // Any failure is surfaced through the service state rather than thrown.
                notifyFailed(th);
            }
        }

        /** Closes the log appender and reports the outcome to the service state machine. */
        protected void doStop() {
            try {
                this.initializer.close();
                notifyStopped();
            } catch (Throwable th) {
                notifyFailed(th);
            }
        }
    }

    /**
     * Returns the {@link ExecutionSparkContext} for the current process.
     *
     * <p>Resolution order: the already cached context, then the context of a
     * {@code SparkClassLoader} located via the calling thread's context class loader, and
     * finally a freshly created context.
     *
     * @return the Spark execution context
     * @throws IllegalStateException if a context has to be created but the process is not
     *     configured as a non-local Spark process with {@code SPARK_YARN_MODE} set to true
     */
    public static ExecutionSparkContext getSparkContext() {
        // Single volatile read; the field is never reset once assigned.
        ExecutionSparkContext cached = sparkContext;
        if (cached != null) {
            return cached;
        }
        SparkClassLoader sparkClassLoader = (SparkClassLoader) ClassLoaders.find(
            Thread.currentThread().getContextClassLoader(), SparkClassLoader.class);
        if (sparkClassLoader != null) {
            return sparkClassLoader.getContext();
        }
        return createIfNotExists();
    }

    /**
     * Creates the shared context if no other thread has done so yet.
     *
     * <p>Besides building the context, this kicks off the supporting services (log appender,
     * ZooKeeper client, Kafka client, metrics collection, stream coordinator) and registers a
     * JVM shutdown hook that stops them again.
     *
     * <p>NOTE(review): the start chain is not awaited, and the log appender is the first entry
     * of both the start and the stop chain - confirm this ordering is intended.
     *
     * @return the newly created or previously cached context
     */
    private static synchronized ExecutionSparkContext createIfNotExists() {
        if (sparkContext != null) {
            return sparkContext;
        }
        try {
            CConfiguration cConf = createCConf();
            Configuration hConf = createHConf();
            SparkContextConfig contextConfig = new SparkContextConfig(hConf);

            Preconditions.checkState(
                !contextConfig.isLocal() && Boolean.parseBoolean(System.getenv("SPARK_YARN_MODE")),
                "SparkContextProvider.getSparkContext should only be called in Spark executor process.");

            ProgramClassLoader programClassLoader = ProgramClassLoader.create(
                new File(PROGRAM_JAR_NAME), SparkClassLoader.class.getClassLoader(), ProgramType.SPARK);

            Injector injector = createInjector(cConf, hConf);

            // Keep the concrete service types: the metrics service is handed to the
            // ExecutionSparkContext constructor below and must not be widened to Service.
            final LogAppenderService logAppenderService =
                new LogAppenderService(injector.getInstance(LogAppenderInitializer.class));
            final ZKClientService zkClientService = injector.getInstance(ZKClientService.class);
            final KafkaClientService kafkaClientService = injector.getInstance(KafkaClientService.class);
            final MetricsCollectionService metricsCollectionService =
                injector.getInstance(MetricsCollectionService.class);
            final StreamCoordinatorClient streamCoordinatorClient =
                injector.getInstance(StreamCoordinatorClient.class);

            Services.chainStart(logAppenderService, zkClientService, kafkaClientService,
                                metricsCollectionService, streamCoordinatorClient);

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // Output goes to System.out/err: the log appender is itself part of the
                    // chain being stopped here.
                    System.out.println("Shutting SparkClassLoader services");
                    try {
                        System.out.println("SparkClassLoader services shutdown completed: "
                            + Services.chainStop(logAppenderService, streamCoordinatorClient,
                                                 metricsCollectionService, kafkaClientService,
                                                 zkClientService)
                                      .get(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS));
                    } catch (Exception e) {
                        System.err.println("Exception when shutting down services");
                        e.printStackTrace(System.err);
                    }
                }
            });

            sparkContext = new ExecutionSparkContext(
                contextConfig.getSpecification(),
                contextConfig.getProgramId(),
                contextConfig.getRunId(),
                programClassLoader,
                contextConfig.getLogicalStartTime(),
                contextConfig.getArguments(),
                contextConfig.getTransaction(),
                injector.getInstance(DatasetFramework.class),
                injector.getInstance(DiscoveryServiceClient.class),
                metricsCollectionService,
                hConf,
                injector.getInstance(StreamAdmin.class),
                contextConfig.getWorkflowToken());
            return sparkContext;
        } catch (Exception e) {
            // Unchecked exceptions (e.g. the IllegalStateException above) are rethrown as-is.
            throw Throwables.propagate(e);
        }
    }

    /**
     * Builds a {@link CConfiguration} whose only resource is {@value #CCONF_FILE_NAME}.
     *
     * @throws MalformedURLException if the file location cannot be converted to a URL
     */
    private static CConfiguration createCConf() throws MalformedURLException {
        CConfiguration cConf = CConfiguration.create();
        // Clear the freshly created configuration before adding the file as its resource.
        cConf.clear();
        cConf.addResource(new File(CCONF_FILE_NAME).toURI().toURL());
        return cConf;
    }

    /**
     * Builds a Hadoop {@link Configuration} whose only resource is {@value #HCONF_FILE_NAME}.
     *
     * @throws MalformedURLException if the file location cannot be converted to a URL
     */
    private static Configuration createHConf() throws MalformedURLException {
        Configuration hConf = new Configuration();
        hConf.clear();
        hConf.addResource(new File(HCONF_FILE_NAME).toURI().toURL());
        return hConf;
    }

    /**
     * Creates the Guice injector, using the distributed variant of every runtime module.
     *
     * @param cConfiguration CDAP configuration bound through the {@code ConfigModule}
     * @param configuration Hadoop configuration bound through the {@code ConfigModule}
     */
    private static Injector createInjector(CConfiguration cConfiguration, Configuration configuration) {
        return Guice.createInjector(
            new ConfigModule(cConfiguration, configuration),
            new IOModule(),
            new ZKClientModule(),
            new KafkaClientModule(),
            new LocationRuntimeModule().getDistributedModules(),
            new DiscoveryRuntimeModule().getDistributedModules(),
            new DataFabricModules().getDistributedModules(),
            new DataSetsModules().getDistributedModules(),
            new MetricsClientRuntimeModule().getDistributedModules(),
            new LoggingModules().getDistributedModules(),
            new StreamAdminModules().getDistributedModules(),
            new NotificationFeedServiceRuntimeModule().getDistributedModules());
    }

    /** Not instantiable: this class only exposes static members. */
    private SparkContextProvider() {
    }
}
