package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.Programs;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.ProgramClassLoader;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.stream.StreamAdminModules;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.data.view.ViewAdminModules;
import co.cask.cdap.data2.audit.AuditModule;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.writer.ProgramContextAware;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.explore.guice.ExploreClientModule;
import co.cask.cdap.internal.app.program.ForwardingProgram;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.internal.app.runtime.workflow.NameMappedDatasetFramework;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramInfo;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.logging.guice.LoggingModules;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.notifications.feeds.guice.NotificationFeedServiceRuntimeModule;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.Services;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeContextProvider.class */
public final class SparkRuntimeContextProvider {
    static final String CCONF_FILE_NAME = "cConf.xml";
    static final String HCONF_FILE_NAME = "hConf.xml";
    static final String PROGRAM_JAR_EXPANDED_NAME = "program.expanded.jar";
    static final String PROGRAM_JAR_NAME = "program.jar";
    private static volatile SparkRuntimeContext sparkRuntimeContext;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeContextProvider$LogAppenderService.class */
    public static final class LogAppenderService extends AbstractService {
        private final LogAppenderInitializer initializer;

        private LogAppenderService(LogAppenderInitializer logAppenderInitializer) {
            this.initializer = logAppenderInitializer;
        }

        protected void doStart() {
            try {
                this.initializer.initialize();
                notifyStarted();
            } catch (Throwable th) {
                notifyFailed(th);
            }
        }

        protected void doStop() {
            try {
                this.initializer.close();
                notifyStopped();
            } catch (Throwable th) {
                notifyFailed(th);
            }
        }
    }

    public static SparkRuntimeContext get() {
        if (sparkRuntimeContext != null) {
            return sparkRuntimeContext;
        }
        SparkClassLoader sparkClassLoader = (SparkClassLoader) ClassLoaders.find(Thread.currentThread().getContextClassLoader(), SparkClassLoader.class);
        return sparkClassLoader != null ? sparkClassLoader.getRuntimeContext() : createIfNotExists();
    }

    private static synchronized SparkRuntimeContext createIfNotExists() {
        if (sparkRuntimeContext != null) {
            return sparkRuntimeContext;
        }
        try {
            CConfiguration createCConf = createCConf();
            Configuration createHConf = createHConf();
            SparkRuntimeContextConfig sparkRuntimeContextConfig = new SparkRuntimeContextConfig(createHConf);
            Preconditions.checkState(!sparkRuntimeContextConfig.isLocal() && Boolean.parseBoolean(System.getenv("SPARK_YARN_MODE")), "SparkContextProvider.getSparkContext should only be called in Spark executor process.");
            Program createProgram = createProgram(createCConf, sparkRuntimeContextConfig);
            Injector createInjector = createInjector(createCConf, createHConf);
            final LogAppenderService logAppenderService = new LogAppenderService((LogAppenderInitializer) createInjector.getInstance(LogAppenderInitializer.class));
            final Service service = (ZKClientService) createInjector.getInstance(ZKClientService.class);
            final Service service2 = (KafkaClientService) createInjector.getInstance(KafkaClientService.class);
            final Service service3 = (MetricsCollectionService) createInjector.getInstance(MetricsCollectionService.class);
            final Service service4 = (StreamCoordinatorClient) createInjector.getInstance(StreamCoordinatorClient.class);
            Services.chainStart(logAppenderService, new Service[]{service, service2, service3, service4});
            Runtime.getRuntime().addShutdownHook(new Thread() { // from class: co.cask.cdap.app.runtime.spark.SparkRuntimeContextProvider.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    System.out.println("Shutting SparkClassLoader services");
                    try {
                        System.out.println("SparkClassLoader services shutdown completed: " + ((List) Services.chainStop(logAppenderService, new Service[]{service4, service3, service2, service}).get(5L, TimeUnit.SECONDS)));
                    } catch (Exception e) {
                        System.err.println("Exception when shutting down services");
                        e.printStackTrace(System.err);
                    }
                }
            });
            NameMappedDatasetFramework nameMappedDatasetFramework = (DatasetFramework) createInjector.getInstance(DatasetFramework.class);
            WorkflowProgramInfo workflowProgramInfo = sparkRuntimeContextConfig.getWorkflowProgramInfo();
            NameMappedDatasetFramework createFromWorkflowProgramInfo = workflowProgramInfo == null ? nameMappedDatasetFramework : NameMappedDatasetFramework.createFromWorkflowProgramInfo(nameMappedDatasetFramework, workflowProgramInfo, sparkRuntimeContextConfig.getApplicationSpecification());
            if (createFromWorkflowProgramInfo instanceof ProgramContextAware) {
                ((ProgramContextAware) createFromWorkflowProgramInfo).initContext(new Id.Run(sparkRuntimeContextConfig.getProgramId().toId(), sparkRuntimeContextConfig.getRunId().getId()));
            }
            sparkRuntimeContext = new SparkRuntimeContext(sparkRuntimeContextConfig.getConfiguration(), createProgram, sparkRuntimeContextConfig.getRunId(), sparkRuntimeContextConfig.getArguments(), (TransactionSystemClient) createInjector.getInstance(TransactionSystemClient.class), createFromWorkflowProgramInfo, (DiscoveryServiceClient) createInjector.getInstance(DiscoveryServiceClient.class), service3, (StreamAdmin) createInjector.getInstance(StreamAdmin.class), sparkRuntimeContextConfig.getWorkflowProgramInfo(), createPluginInstantiator(createCConf, createHConf, createProgram.getClassLoader()));
            return sparkRuntimeContext;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    private static CConfiguration createCConf() throws MalformedURLException {
        return CConfiguration.create(new File(CCONF_FILE_NAME), new File[0]);
    }

    private static Configuration createHConf() throws MalformedURLException {
        Configuration configuration = new Configuration();
        configuration.clear();
        configuration.addResource(new File(HCONF_FILE_NAME).toURI().toURL());
        return configuration;
    }

    private static Program createProgram(CConfiguration cConfiguration, SparkRuntimeContextConfig sparkRuntimeContextConfig) throws IOException {
        File file = new File(PROGRAM_JAR_NAME);
        ProgramClassLoader createProgramClassLoader = SparkRuntimeUtils.createProgramClassLoader(cConfiguration, new File(PROGRAM_JAR_EXPANDED_NAME), SparkRuntimeContextProvider.class.getClassLoader());
        final Id.Program id = sparkRuntimeContextConfig.getProgramId().toId();
        return new ForwardingProgram(Programs.create(Locations.toLocation(file), createProgramClassLoader)) { // from class: co.cask.cdap.app.runtime.spark.SparkRuntimeContextProvider.2
            public Id.Program getId() {
                return id;
            }

            public String getName() {
                return getId().getId();
            }

            public ProgramType getType() {
                return ProgramType.SPARK;
            }
        };
    }

    @Nullable
    private static PluginInstantiator createPluginInstantiator(CConfiguration cConfiguration, Configuration configuration, ClassLoader classLoader) {
        String str = configuration.get("cdap.program.plugin.archive");
        if (str == null) {
            return null;
        }
        return new PluginInstantiator(cConfiguration, classLoader, new File(str));
    }

    private static Injector createInjector(CConfiguration cConfiguration, Configuration configuration) {
        return Guice.createInjector(new Module[]{new ConfigModule(cConfiguration, configuration), new IOModule(), new ZKClientModule(), new KafkaClientModule(), new LocationRuntimeModule().getDistributedModules(), new DiscoveryRuntimeModule().getDistributedModules(), new DataFabricModules().getDistributedModules(), new DataSetsModules().getDistributedModules(), new MetricsClientRuntimeModule().getDistributedModules(), new LoggingModules().getDistributedModules(), new ExploreClientModule(), new ViewAdminModules().getDistributedModules(), new StreamAdminModules().getDistributedModules(), new NotificationFeedServiceRuntimeModule().getDistributedModules(), new AuditModule().getDistributedModules()});
    }

    private SparkRuntimeContextProvider() {
    }
}
