package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.app.guice.DistributedProgramRunnableModule;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.program.Programs;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.jar.BundleJarUtil;
import co.cask.cdap.data.stream.StreamCoordinatorClient;
import co.cask.cdap.internal.app.ApplicationSpecificationAdapter;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.internal.app.runtime.codec.ArgumentsCodec;
import co.cask.cdap.internal.app.runtime.codec.ProgramOptionsCodec;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.security.authorization.AuthorizationEnforcementService;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.security.Permission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.Command;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.Services;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractProgramTwillRunnable.class */
public abstract class AbstractProgramTwillRunnable<T extends ProgramRunner> implements TwillRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractProgramTwillRunnable.class);
    private static final Gson GSON = ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).registerTypeAdapter(Arguments.class, new ArgumentsCodec()).registerTypeAdapter(ProgramOptions.class, new ProgramOptionsCodec()).create();
    protected String name;
    private T programRunner;
    private Program program;
    private ProgramOptions programOpts;
    private ProgramController controller;
    private Configuration hConf;
    private CConfiguration cConf;
    private List<Service> coreServices;
    private LogAppenderInitializer logAppenderInitializer;
    private CountDownLatch runLatch;

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractProgramTwillRunnable$RunnableSecurityManager.class */
    private static final class RunnableSecurityManager extends SecurityManager {
        private final SecurityManager delegate;

        private RunnableSecurityManager(@Nullable SecurityManager securityManager) {
            this.delegate = securityManager;
        }

        @Override // java.lang.SecurityManager
        public void checkPermission(Permission permission) {
            if ("setSecurityManager".equals(permission.getName()) && isFromSpark()) {
                throw new SecurityException("Set SecurityManager not allowed from Spark class: " + Arrays.toString(getClassContext()));
            }
            if (this.delegate != null) {
                this.delegate.checkPermission(permission);
            }
        }

        @Override // java.lang.SecurityManager
        public void checkPermission(Permission permission, Object obj) {
            if ("setSecurityManager".equals(permission.getName()) && isFromSpark()) {
                throw new SecurityException("Set SecurityManager not allowed from Spark class: " + Arrays.toString(getClassContext()));
            }
            if (this.delegate != null) {
                this.delegate.checkPermission(permission, obj);
            }
        }

        @Override // java.lang.SecurityManager
        public void checkExit(int i) {
            if (isFromSpark()) {
                throw new SecurityException("Exit not allowed from Spark class: " + Arrays.toString(getClassContext()));
            }
            if (this.delegate != null) {
                this.delegate.checkExit(i);
            }
        }

        private boolean isFromSpark() {
            for (Class cls : getClassContext()) {
                if (cls.getName().startsWith("org.apache.spark.")) {
                    return true;
                }
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractProgramTwillRunnable(String str) {
        this.name = str;
    }

    protected Map<String, String> getConfigs() {
        return ImmutableMap.of();
    }

    public TwillRunnableSpecification configure() {
        return TwillRunnableSpecification.Builder.with().setName(this.name).withConfigs(getConfigs()).build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void initialize(TwillContext twillContext) {
        System.setSecurityManager(new RunnableSecurityManager(System.getSecurityManager()));
        this.runLatch = new CountDownLatch(1);
        this.coreServices = new ArrayList();
        this.name = twillContext.getSpecification().getName();
        LOG.info("Initializing runnable: " + this.name);
        try {
            CommandLine parseArgs = parseArgs(twillContext.getApplicationArguments());
            this.hConf = new Configuration();
            this.hConf.clear();
            this.hConf.addResource(new File(parseArgs.getOptionValue("hConf")).toURI().toURL());
            UserGroupInformation.setConfiguration(this.hConf);
            this.cConf = CConfiguration.create(new File(parseArgs.getOptionValue("cConf")), new File[0]);
            this.programOpts = createProgramOptions(parseArgs, twillContext, twillContext.getSpecification().getConfigs());
            String option = this.programOpts.getArguments().getOption(ProgramOptionConstants.PRINCIPAL);
            ProgramId programId = (ProgramId) GSON.fromJson(parseArgs.getOptionValue("pid"), ProgramId.class);
            Injector createInjector = Guice.createInjector(new Module[]{createModule(twillContext, programId, option)});
            this.coreServices.add(createInjector.getInstance(ZKClientService.class));
            this.coreServices.add(createInjector.getInstance(KafkaClientService.class));
            this.coreServices.add(createInjector.getInstance(BrokerService.class));
            this.coreServices.add(createInjector.getInstance(MetricsCollectionService.class));
            this.coreServices.add(createInjector.getInstance(StreamCoordinatorClient.class));
            this.coreServices.add(createInjector.getInstance(AuthorizationEnforcementService.class));
            this.logAppenderInitializer = (LogAppenderInitializer) createInjector.getInstance(LogAppenderInitializer.class);
            this.logAppenderInitializer.initialize();
            this.programRunner = createProgramRunner(createInjector);
            try {
                Location location = Locations.toLocation(new File(parseArgs.getOptionValue("jar")));
                this.program = Programs.create(this.cConf, this.programRunner, new ProgramDescriptor(programId, readAppSpec(new File(parseArgs.getOptionValue("appSpec")))), location, BundleJarUtil.unJar(location, Files.createTempDir()));
                this.coreServices.add(new ProgramRunnableResourceReporter(this.program.getId(), (MetricsCollectionService) createInjector.getInstance(MetricsCollectionService.class), twillContext));
                LOG.info("Runnable initialized: {}", this.name);
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
            throw Throwables.propagate(th);
        }
    }

    public void handleCommand(Command command) throws Exception {
        this.runLatch.await();
        if (ProgramCommands.SUSPEND.equals(command)) {
            this.controller.suspend().get();
            return;
        }
        if (ProgramCommands.RESUME.equals(command)) {
            this.controller.resume().get();
        } else if (!ProgramOptionConstants.INSTANCES.equals(command.getCommand())) {
            LOG.warn("Ignore unsupported command: " + command);
        } else {
            this.controller.command(ProgramOptionConstants.INSTANCES, Integer.valueOf(Integer.parseInt((String) command.getOptions().get("count")))).get();
        }
    }

    public void stop() {
        try {
            LOG.info("Stopping runnable: {}.", this.name);
            if (this.controller != null) {
                this.controller.stop().get();
            }
        } catch (Exception e) {
            LOG.error("Failed to stop runnable: {}.", this.name, e);
            throw Throwables.propagate(e);
        }
    }

    protected Arguments resolveScope(Arguments arguments) {
        return arguments;
    }

    protected boolean propagateServiceError() {
        return true;
    }

    protected T createProgramRunner(Injector injector) {
        Type type = TypeToken.of(getClass()).getSupertype(AbstractProgramTwillRunnable.class).getType();
        Preconditions.checkState(type instanceof ParameterizedType, "Invalid ProgramTwillRunnable class %s. Expected to be a ParameterizedType.", new Object[]{getClass()});
        Type type2 = ((ParameterizedType) type).getActualTypeArguments()[0];
        Preconditions.checkState(type2 instanceof Class, "ProgramRunner type is not a class: %s", new Object[]{type2});
        return (T) injector.getInstance((Class) type2);
    }

    public void run() {
        Futures.getUnchecked(Services.chainStart(this.coreServices.get(0), (Service[]) this.coreServices.subList(1, this.coreServices.size()).toArray(new Service[this.coreServices.size() - 1])));
        LOG.info("Starting runnable: {}", this.name);
        this.controller = this.programRunner.run(this.program, this.programOpts);
        final SettableFuture create = SettableFuture.create();
        this.controller.addListener(new AbstractListener() { // from class: co.cask.cdap.internal.app.runtime.distributed.AbstractProgramTwillRunnable.1
            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void alive() {
                AbstractProgramTwillRunnable.this.runLatch.countDown();
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void init(ProgramController.State state, @Nullable Throwable th) {
                if (state == ProgramController.State.ALIVE) {
                    alive();
                } else {
                    super.init(state, th);
                }
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void completed() {
                create.set(ProgramController.State.COMPLETED);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void killed() {
                create.set(ProgramController.State.KILLED);
            }

            @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
            public void error(Throwable th) {
                AbstractProgramTwillRunnable.LOG.error("Program runner error out.", th);
                create.setException(th);
            }
        }, MoreExecutors.sameThreadExecutor());
        try {
            try {
                create.get();
                LOG.info("Program {} stopped.", this.name);
                if (this.programRunner instanceof Closeable) {
                    Closeables.closeQuietly((Closeable) this.programRunner);
                }
                this.runLatch.countDown();
            } catch (InterruptedException e) {
                LOG.warn("Program {} interrupted.", this.name, e);
                if (this.programRunner instanceof Closeable) {
                    Closeables.closeQuietly((Closeable) this.programRunner);
                }
                this.runLatch.countDown();
            } catch (ExecutionException e2) {
                LOG.error("Program {} execution failed.", this.name, e2);
                if (propagateServiceError()) {
                    throw Throwables.propagate(Throwables.getRootCause(e2));
                }
                if (this.programRunner instanceof Closeable) {
                    Closeables.closeQuietly((Closeable) this.programRunner);
                }
                this.runLatch.countDown();
            }
        } catch (Throwable th) {
            if (this.programRunner instanceof Closeable) {
                Closeables.closeQuietly((Closeable) this.programRunner);
            }
            this.runLatch.countDown();
            throw th;
        }
    }

    public void destroy() {
        LOG.info("Releasing resources: {}", this.name);
        try {
            if (this.program != null) {
                Closeables.closeQuietly(this.program);
            }
            List reverse = Lists.reverse(this.coreServices);
            Futures.getUnchecked(Services.chainStop((Service) reverse.get(0), (Service[]) reverse.subList(1, reverse.size()).toArray(new Service[reverse.size() - 1])));
            LOG.info("Runnable stopped: {}", this.name);
            if (this.logAppenderInitializer != null) {
                this.logAppenderInitializer.close();
            }
        } catch (Throwable th) {
            if (this.logAppenderInitializer != null) {
                this.logAppenderInitializer.close();
            }
            throw th;
        }
    }

    private CommandLine parseArgs(String[] strArr) {
        try {
            return new PosixParser().parse(new Options().addOption(createOption("jar", "Program jar location")).addOption(createOption("hConf", "Hadoop config file")).addOption(createOption("cConf", "CDAP config file")).addOption(createOption("appSpec", "Application specification file")).addOption(createOption("popts", "Program options")).addOption(createOption("pid", "Program ID")), strArr);
        } catch (ParseException e) {
            throw Throwables.propagate(e);
        }
    }

    private Option createOption(String str, String str2) {
        Option option = new Option(str, true, str2);
        option.setRequired(true);
        return option;
    }

    private ProgramOptions createProgramOptions(CommandLine commandLine, TwillContext twillContext, Map<String, String> map) {
        ProgramOptions programOptions = (ProgramOptions) GSON.fromJson(commandLine.getOptionValue("popts"), ProgramOptions.class);
        HashMap newHashMap = Maps.newHashMap(programOptions.getArguments().asMap());
        newHashMap.put(ProgramOptionConstants.INSTANCE_ID, Integer.toString(twillContext.getInstanceId()));
        newHashMap.put(ProgramOptionConstants.INSTANCES, Integer.toString(twillContext.getInstanceCount()));
        newHashMap.put(ProgramOptionConstants.TWILL_RUN_ID, twillContext.getApplicationRunId().getId());
        newHashMap.put(ProgramOptionConstants.HOST, twillContext.getHost().getCanonicalHostName());
        newHashMap.putAll(map);
        return new SimpleProgramOptions(twillContext.getSpecification().getName(), new BasicArguments(newHashMap), resolveScope(programOptions.getUserArguments()), programOptions.isDebug());
    }

    private ApplicationSpecification readAppSpec(File file) throws IOException {
        BufferedReader newReader = Files.newReader(file, Charsets.UTF_8);
        Throwable th = null;
        try {
            try {
                ApplicationSpecification applicationSpecification = (ApplicationSpecification) GSON.fromJson(newReader, ApplicationSpecification.class);
                if (newReader != null) {
                    if (0 != 0) {
                        try {
                            newReader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        newReader.close();
                    }
                }
                return applicationSpecification;
            } finally {
            }
        } catch (Throwable th3) {
            if (newReader != null) {
                if (th != null) {
                    try {
                        newReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newReader.close();
                }
            }
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Module createModule(TwillContext twillContext, ProgramId programId, @Nullable String str) {
        return new DistributedProgramRunnableModule(this.cConf, this.hConf).createModule(twillContext, programId, str);
    }
}
