package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.CombineClassLoader;
import co.cask.cdap.common.lang.jar.BundleJarUtil;
import co.cask.cdap.common.twill.AbortOnTimeoutEventHandler;
import co.cask.cdap.common.twill.HadoopClassExcluder;
import co.cask.cdap.common.utils.DirUtils;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.codec.ArgumentsCodec;
import co.cask.cdap.internal.app.runtime.codec.ProgramOptionsCodec;
import co.cask.cdap.security.TokenSecureStoreUpdater;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractDistributedProgramRunner.class */
/**
 * Base {@link ProgramRunner} that launches a program as a Twill application.
 *
 * <p>{@link #run(Program, ProgramOptions)} stages the files that need to be localized
 * (serialized Hadoop and CDAP configurations, an optional plugin artifact jar and an optional
 * logback configuration) in a temporary directory, and then delegates to the subclass through
 * {@link #launch(Program, ProgramOptions, Map, File, ApplicationLauncher)}. The subclass builds
 * the {@link TwillApplication} and hands it back to the supplied {@link ApplicationLauncher},
 * which configures the {@link TwillPreparer} and starts the application.
 */
public abstract class AbstractDistributedProgramRunner implements ProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDistributedProgramRunner.class);

    /** Gson instance used to serialize the {@link ProgramOptions} passed to the launched application. */
    private static final Gson GSON = new GsonBuilder()
        .registerTypeAdapter(Arguments.class, new ArgumentsCodec())
        .registerTypeAdapter(ProgramOptions.class, new ProgramOptionsCodec())
        .create();

    private final TwillRunner twillRunner;
    protected final YarnConfiguration hConf;
    protected final CConfiguration cConf;
    protected final EventHandler eventHandler;
    private final TokenSecureStoreUpdater secureStoreUpdater;

    /**
     * Callback given to subclasses for starting the {@link TwillApplication} they have built.
     */
    protected abstract class ApplicationLauncher {
        protected ApplicationLauncher() {
        }

        /**
         * Launches the application with no extra class paths and no extra dependency classes.
         *
         * @param twillApplication the application to start
         * @return the controller of the started application
         */
        public TwillController launch(TwillApplication twillApplication) {
            // Explicit type witnesses: a bare emptyList() argument does not carry the element type.
            return launch(twillApplication, Collections.<String>emptyList(), Collections.<Class<?>>emptyList());
        }

        /**
         * Launches the application with the given extra class paths and no extra dependency classes.
         *
         * @param twillApplication the application to start
         * @param strArr extra class path entries
         * @return the controller of the started application
         */
        public TwillController launch(TwillApplication twillApplication, String... strArr) {
            return launch(twillApplication, Arrays.asList(strArr), Collections.<Class<?>>emptyList());
        }

        /**
         * Launches the application.
         *
         * @param twillApplication the application to start
         * @param extraClassPaths extra class path entries
         * @param extraDependencies extra classes passed to the preparer as dependencies
         * @return the controller of the started application
         */
        public abstract TwillController launch(TwillApplication twillApplication,
                                               Iterable<String> extraClassPaths,
                                               Iterable<? extends Class<?>> extraDependencies);
    }

    /**
     * {@link LogHandler} that forwards an entry to a delegate only when the entry's level is
     * positioned at or before the configured level in the {@link LogEntry.Level} declaration order.
     */
    private static final class ApplicationLogHandler implements LogHandler {
        private final LogHandler delegate;
        private final LogEntry.Level logLevel;

        private ApplicationLogHandler(LogHandler logHandler, LogEntry.Level level) {
            this.delegate = logHandler;
            this.logLevel = level;
        }

        @Override
        public void onLog(LogEntry logEntry) {
            // Enum.compareTo orders by declaration position, i.e. the same comparison as on ordinals.
            if (logEntry.getLogLevel().compareTo(this.logLevel) <= 0) {
                this.delegate.onLog(logEntry);
            }
        }
    }

    /**
     * Creates the runner.
     *
     * <p>Note that {@link #createEventHandler(CConfiguration)} is invoked from this constructor, so an
     * override of it runs before the subclass constructor body has executed.
     *
     * @param twillRunner runner used to prepare and start Twill applications
     * @param yarnConfiguration Hadoop configuration; serialized for the launched application
     * @param cConfiguration CDAP configuration; serialized for the launched application
     * @param tokenSecureStoreUpdater source of the secure store added when security is enabled
     */
    public AbstractDistributedProgramRunner(TwillRunner twillRunner, YarnConfiguration yarnConfiguration,
                                            CConfiguration cConfiguration,
                                            TokenSecureStoreUpdater tokenSecureStoreUpdater) {
        this.twillRunner = twillRunner;
        this.hConf = yarnConfiguration;
        this.cConf = cConfiguration;
        this.eventHandler = createEventHandler(cConfiguration);
        this.secureStoreUpdater = tokenSecureStoreUpdater;
    }

    /**
     * Creates the {@link EventHandler} exposed through {@link #eventHandler}.
     *
     * @param cConfiguration configuration from which {@code twill.no.container.timeout} is read
     *                       (defaults to {@link Long#MAX_VALUE} when unset)
     * @return the event handler
     */
    protected EventHandler createEventHandler(CConfiguration cConfiguration) {
        return new AbortOnTimeoutEventHandler(cConfiguration.getLong("twill.no.container.timeout", Long.MAX_VALUE));
    }

    /**
     * Stages the files to localize in a fresh temporary directory and delegates to
     * {@link #launch(Program, ProgramOptions, Map, File, ApplicationLauncher)}.
     *
     * <p>If anything fails while staging or launching, the contents of the temporary directory are
     * deleted and the failure is propagated (checked exceptions are wrapped by
     * {@link Throwables#propagate(Throwable)}).
     *
     * @param program the program to run
     * @param programOptions options of the run; the {@code apps.scheduler.queue} argument, when
     *                       non-empty, selects the scheduler queue
     * @return the controller returned by the subclass
     */
    @Override // co.cask.cdap.app.runtime.ProgramRunner
    public final ProgramController run(final Program program, ProgramOptions programOptions) {
        final String schedulerQueue = programOptions.getArguments().getOption("apps.scheduler.queue");
        final File tempDir = DirUtils.createTempDir(
            new File(this.cConf.get("local.data.dir"), this.cConf.get("app.temp.dir")).getAbsoluteFile());
        try {
            if (schedulerQueue != null && !schedulerQueue.isEmpty()) {
                // This mutates the configuration instance held by this runner.
                this.hConf.set("mapreduce.job.queuename", schedulerQueue);
                LOG.info("Setting scheduler queue to {}", schedulerQueue);
            }

            Map<String, LocalizeResource> localizeResources = new HashMap<>();
            final ProgramOptions options =
                addArtifactPluginFiles(programOptions, localizeResources, DirUtils.createTempDir(tempDir));

            // The configuration is written after the queue name has been applied above.
            localizeResources.put("hConf.xml",
                new LocalizeResource(saveHConf(this.hConf, File.createTempFile("hConf", ".xml", tempDir))));
            localizeResources.put("cConf.xml",
                new LocalizeResource(saveCConf(this.cConf, File.createTempFile("cConf", ".xml", tempDir))));

            final URI logbackURI = getLogBackURI(program, tempDir);
            final String serializedOptions = GSON.toJson(options);

            return launch(program, options, localizeResources, tempDir, new ApplicationLauncher() {
                @Override
                public TwillController launch(TwillApplication twillApplication,
                                              Iterable<String> extraClassPaths,
                                              Iterable<? extends Class<?>> extraDependencies) {
                    TwillPreparer preparer = twillRunner.prepare(twillApplication);
                    preparer.withEnv(Collections.singletonMap("SPARK_YARN_MODE", "true"));

                    if (options.isDebug()) {
                        LOG.info("Starting {} with debugging enabled, programOptions: {}, and logback: {}",
                                 new Object[]{program.getId(), serializedOptions, logbackURI});
                        preparer.enableDebugging();
                    }
                    if (schedulerQueue != null && !schedulerQueue.isEmpty()) {
                        LOG.info("Setting scheduler queue for app {} as {}", program.getId(), schedulerQueue);
                        preparer.setSchedulerQueue(schedulerQueue);
                    }
                    if (logbackURI != null) {
                        preparer.withResources(logbackURI);
                    }

                    // Locale.ROOT keeps the level parsing independent of the JVM default locale.
                    String logLevelConf = cConf.get("master.collect.app.containers.log.level")
                        .toUpperCase(java.util.Locale.ROOT);
                    if ("OFF".equals(logLevelConf)) {
                        preparer.addJVMOptions("-Dtwill.disable.kafka=true");
                    } else {
                        LogEntry.Level logLevel = LogEntry.Level.ERROR;
                        if ("ALL".equals(logLevelConf)) {
                            logLevel = LogEntry.Level.TRACE;
                        } else {
                            try {
                                logLevel = LogEntry.Level.valueOf(logLevelConf);
                            } catch (IllegalArgumentException ignored) {
                                // Unknown level name: keep the ERROR default, but tell the operator.
                                LOG.warn("Invalid application container log level {}. Defaulting to ERROR.",
                                         logLevelConf);
                            }
                        }
                        preparer.addLogHandler(
                            new ApplicationLogHandler(new PrinterLogHandler(new PrintWriter(System.out)), logLevel));
                    }

                    String yarnAppClassPath = hConf.get(
                        "yarn.application.classpath",
                        Joiner.on(",").join(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH));

                    if (User.isHBaseSecurityEnabled(hConf) || UserGroupInformation.isSecurityEnabled()) {
                        preparer.addSecureStore(secureStoreUpdater.update(null, null));
                    }

                    Iterable<Class<?>> dependencies = Iterables.<Class<?>>concat(
                        Collections.<Class<?>>singletonList(HBaseTableUtilFactory.getHBaseTableUtilClass()),
                        extraDependencies);

                    preparer
                        .withDependencies(dependencies)
                        .withClassPaths(Iterables.concat(
                            extraClassPaths,
                            Splitter.on(',').trimResults().split(hConf.get("yarn.application.classpath", ""))))
                        .withApplicationClassPaths(Splitter.on(",").trimResults().split(yarnAppClassPath))
                        .withBundlerClassAcceptor(new HadoopClassExcluder() {
                            @Override
                            public boolean accept(String className, URL classUrl, URL classPathUrl) {
                                // Additionally reject classes under org.apache.spark.
                                return super.accept(className, classUrl, classPathUrl)
                                    && !className.startsWith("org.apache.spark.");
                            }
                        })
                        .withApplicationArguments(
                            String.format("--%s", "jar"), program.getJarLocation().getName(),
                            String.format("--%s", "popts"), serializedOptions);

                    // Start with a context class loader that combines this runner's class loader with
                    // the class loaders of all dependency classes; always restore the previous one.
                    ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(
                        new CombineClassLoader(
                            AbstractDistributedProgramRunner.this.getClass().getClassLoader(),
                            Iterables.transform(dependencies, new Function<Class<?>, ClassLoader>() {
                                @Override
                                public ClassLoader apply(Class<?> cls) {
                                    return cls.getClassLoader();
                                }
                            })));
                    TwillController controller;
                    try {
                        controller = preparer.start();
                    } finally {
                        ClassLoaders.setContextClassLoader(previousClassLoader);
                    }
                    return addCleanupListener(controller, program, tempDir);
                }
            });
        } catch (Exception e) {
            deleteDirectory(tempDir);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Packages the plugin directory named by {@link ProgramOptionConstants#PLUGIN_DIR}, if that
     * option is present, into {@code artifacts.jar} and registers it for localization twice: under
     * {@code artifacts} (second {@link LocalizeResource} argument {@code true}) and under
     * {@code artifacts_archive.jar} (second argument {@code false}).
     *
     * @param programOptions the original options
     * @param map resources to localize; the two jar entries are added to it
     * @param file directory in which {@code artifacts.jar} is created
     * @return the original options when no plugin directory is set; otherwise a copy whose
     *         {@code PLUGIN_DIR} and {@code PLUGIN_ARCHIVE} arguments hold the localized names
     * @throws IOException if the jar cannot be created
     */
    private ProgramOptions addArtifactPluginFiles(ProgramOptions programOptions, Map<String, LocalizeResource> map,
                                                  File file) throws IOException {
        Arguments systemArgs = programOptions.getArguments();
        if (!systemArgs.hasOption(ProgramOptionConstants.PLUGIN_DIR)) {
            return programOptions;
        }

        File pluginDir = new File(systemArgs.getOption(ProgramOptionConstants.PLUGIN_DIR));
        File artifactsJar = new File(file, "artifacts.jar");
        BundleJarUtil.createJar(pluginDir, artifactsJar);

        map.put("artifacts", new LocalizeResource(artifactsJar, true));
        map.put("artifacts_archive.jar", new LocalizeResource(artifactsJar, false));

        Map<String, String> updatedArgs = Maps.newHashMap(systemArgs.asMap());
        updatedArgs.put(ProgramOptionConstants.PLUGIN_DIR, "artifacts");
        updatedArgs.put(ProgramOptionConstants.PLUGIN_ARCHIVE, "artifacts_archive.jar");
        return new SimpleProgramOptions(programOptions.getName(), new BasicArguments(updatedArgs),
                                        programOptions.getUserArguments(), programOptions.isDebug());
    }

    /**
     * Finds the logback configuration to ship with the program.
     *
     * <p>A {@code logback.xml} visible to the program's class loader wins. Otherwise, a
     * {@code logback-container.xml} visible to this class's class loader is copied to
     * {@code logback.xml} inside {@code file}.
     *
     * @param program the program whose class loader is searched first
     * @param file directory receiving the copied fallback configuration
     * @return the URI of the configuration, or {@code null} when neither resource exists
     * @throws IOException if copying the fallback configuration fails
     * @throws URISyntaxException if the program's resource URL cannot be converted to a URI
     */
    @Nullable
    private URI getLogBackURI(Program program, File file) throws IOException, URISyntaxException {
        URL programLogback = program.getClassLoader().getResource("logback.xml");
        if (programLogback != null) {
            return programLogback.toURI();
        }

        URL containerLogback = getClass().getClassLoader().getResource("logback-container.xml");
        if (containerLogback == null) {
            return null;
        }

        File logbackFile = new File(file, "logback.xml");
        Files.copy(Resources.newInputStreamSupplier(containerLogback), logbackFile);
        return logbackFile.toURI();
    }

    /**
     * Builds the Twill application for the program and starts it through {@code applicationLauncher}.
     *
     * @param program the program to launch
     * @param programOptions options of the run, as adjusted by {@link #run(Program, ProgramOptions)}
     * @param map files to localize, keyed by localized name
     * @param file temporary directory holding the staged files
     * @param applicationLauncher callback that starts the Twill application
     * @return the controller of the launched program
     */
    protected abstract ProgramController launch(Program program, ProgramOptions programOptions,
                                                Map<String, LocalizeResource> map, File file,
                                                ApplicationLauncher applicationLauncher);

    /**
     * Writes the given Hadoop configuration as XML (UTF-8) to {@code file}.
     *
     * @param configuration the configuration to serialize
     * @param file the destination file
     * @return {@code file}
     * @throws IOException if writing or closing the file fails
     */
    private File saveHConf(Configuration configuration, File file) throws IOException {
        try (BufferedWriter writer = Files.newWriter(file, Charsets.UTF_8)) {
            configuration.writeXml(writer);
        }
        return file;
    }

    /**
     * Writes a copy of the given CDAP configuration, with
     * {@code app.program.runtime.extensions.dir} unset, as XML (UTF-8) to {@code file}.
     *
     * @param cConfiguration the configuration to serialize; it is not modified
     * @param file the destination file
     * @return {@code file}
     * @throws IOException if writing or closing the file fails
     */
    private File saveCConf(CConfiguration cConfiguration, File file) throws IOException {
        CConfiguration copy = CConfiguration.copy(cConfiguration);
        copy.unset("app.program.runtime.extensions.dir");
        try (BufferedWriter writer = Files.newWriter(file, Charsets.UTF_8)) {
            copy.writeXml(writer);
        }
        return file;
    }

    /**
     * Best-effort cleanup through {@link DirUtils#deleteDirectoryContents(File)}; an
     * {@link IOException} is logged as a warning and not rethrown.
     *
     * @param file the directory to clean up
     */
    public void deleteDirectory(File file) {
        try {
            DirUtils.deleteDirectoryContents(file);
        } catch (IOException e) {
            LOG.warn("Failed to delete directory {}", file, e);
        }
    }

    /**
     * Registers a one-shot cleanup of {@code file} on the controller's running and terminated
     * callbacks, whichever is invoked first.
     *
     * @param twillController controller of the started application
     * @param program the program being run; used for logging only
     * @param file temporary directory to clean up
     * @return {@code twillController}
     */
    public TwillController addCleanupListener(TwillController twillController, final Program program,
                                              final File file) {
        // Both callbacks share the same task; the flag makes sure the deletion happens only once.
        final AtomicBoolean deleted = new AtomicBoolean(false);
        Runnable cleanup = new Runnable() {
            @Override
            public void run() {
                if (!deleted.compareAndSet(false, true)) {
                    return;
                }
                LOG.debug("Cleanup tmp files for {}: {}", program.getId(), file);
                deleteDirectory(file);
            }
        };
        twillController.onRunning(cleanup, Threads.SAME_THREAD_EXECUTOR);
        twillController.onTerminated(cleanup, Threads.SAME_THREAD_EXECUTOR);
        return twillController;
    }
}
