package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.app.guice.DataFabricFacadeModule;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.Programs;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramResourceReporter;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.gateway.auth.AuthModule;
import co.cask.cdap.internal.app.queue.QueueReaderFactory;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.BasicArguments;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SimpleProgramOptions;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.logging.guice.LoggingModules;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.google.inject.Scopes;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.twill.api.Command;
import org.apache.twill.api.ServiceAnnouncer;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillRunnable;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Services;
import org.apache.twill.filesystem.LocalLocationFactory;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/AbstractProgramTwillRunnable.class */
/**
 * Base {@link TwillRunnable} that loads a {@link Program} and executes it with the
 * {@link ProgramRunner} implementation returned by {@link #getProgramClass()}.
 *
 * <p>Lifecycle as implemented here:
 * <ol>
 *   <li>{@link #configure()} publishes the runnable name and the names of the Hadoop/CDAP
 *       configuration files as runnable configs.</li>
 *   <li>{@link #initialize(TwillContext)} loads both configurations, builds the Guice injector,
 *       and creates the program and its options from the application arguments.</li>
 *   <li>{@link #run()} starts the supporting services, launches the program and blocks until the
 *       program reports that it stopped or failed.</li>
 *   <li>{@link #stop()} stops the program; {@link #destroy()} stops the supporting services.</li>
 * </ol>
 *
 * @param <T> type of the {@link ProgramRunner} used to run the program
 */
public abstract class AbstractProgramTwillRunnable<T extends ProgramRunner> implements TwillRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractProgramTwillRunnable.class);

    // Reassigned in initialize() from the runnable specification, hence not final.
    private String name;
    private final String hConfName;
    private final String cConfName;
    private Injector injector;
    private Program program;
    private ProgramOptions programOpts;
    // Written by run() and read by stop()/handleCommand(). The latch below shows that commands
    // arrive on a different thread than run(), so the field is volatile to make the write visible
    // to stop(), which does not go through the latch.
    private volatile ProgramController controller;
    private Configuration hConf;
    private CConfiguration cConf;
    private ZKClientService zkClientService;
    private KafkaClientService kafkaClientService;
    private MetricsCollectionService metricsCollectionService;
    private ProgramResourceReporter resourceReporter;
    private LogAppenderInitializer logAppenderInitializer;
    // Released by run() once program start-up has been attempted; handleCommand() waits on it.
    private CountDownLatch runlatch;

    /**
     * Creates {@link Program} instances from a location resolved through the location factory
     * bound under the name {@code "program.location.factory"}.
     */
    private static final class ProgramFactory {
        private final LocationFactory locationFactory;

        @Inject
        ProgramFactory(@Named("program.location.factory") LocationFactory locationFactory) {
            this.locationFactory = locationFactory;
        }

        /**
         * Creates a program from the given path, unpacking it into a newly created temporary directory.
         *
         * @param path path of the program jar, resolved through the injected location factory
         * @return the created program
         * @throws IOException if {@code Programs.createWithUnpack} fails
         */
        public Program create(String path) throws IOException {
            return Programs.createWithUnpack(locationFactory.create(path), Files.createTempDir());
        }
    }

    /**
     * Creates the runnable.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally {@code protected}.
     *
     * @param name name of the runnable
     * @param hConfName file name of the Hadoop configuration, published under the config key {@code "hConf"}
     * @param cConfName file name of the CDAP configuration, published under the config key {@code "cConf"}
     */
    public AbstractProgramTwillRunnable(String name, String hConfName, String cConfName) {
        this.name = name;
        this.hConfName = hConfName;
        this.cConfName = cConfName;
    }

    /**
     * Returns the {@link ProgramRunner} class that is obtained from the injector to run the program.
     */
    protected abstract Class<T> getProgramClass();

    /**
     * Returns extra configs to add to the runnable specification. The default is an empty map.
     * Every entry other than {@code "hConf"} and {@code "cConf"} is passed on to the program as an
     * argument (see {@link #initialize(TwillContext)}).
     */
    protected Map<String, String> getConfigs() {
        return ImmutableMap.of();
    }

    /**
     * Builds the runnable specification containing the runnable name, the two configuration file
     * names and whatever {@link #getConfigs()} returns.
     */
    @Override
    public TwillRunnableSpecification configure() {
        Map<String, String> configs = ImmutableMap.<String, String>builder()
            .put("hConf", hConfName)
            .put("cConf", cConfName)
            .putAll(getConfigs())
            .build();
        return TwillRunnableSpecification.Builder.with()
            .setName(name)
            .withConfigs(configs)
            .build();
    }

    /**
     * Loads the Hadoop and CDAP configurations named in the runnable configs, creates the injector
     * and the services obtained from it, initializes the log appender, and creates the program and
     * its options from the {@code jar} and {@code rargs} application arguments.
     *
     * @param twillContext context supplied by Twill for this runnable instance
     * @throws RuntimeException if any initialization step fails; the failure is logged first
     */
    @Override
    public void initialize(TwillContext twillContext) {
        runlatch = new CountDownLatch(1);
        name = twillContext.getSpecification().getName();
        Map<String, String> configs = twillContext.getSpecification().getConfigs();
        LOG.info("Initialize runnable: {}", name);
        try {
            CommandLine cmdLine = parseArgs(twillContext.getApplicationArguments());

            // Both configurations are cleared first so that only the shipped files contribute values.
            hConf = new Configuration();
            hConf.clear();
            hConf.addResource(new File(configs.get("hConf")).toURI().toURL());
            UserGroupInformation.setConfiguration(hConf);

            cConf = CConfiguration.create();
            cConf.clear();
            cConf.addResource(new File(configs.get("cConf")).toURI().toURL());

            injector = Guice.createInjector(createModule(twillContext));
            zkClientService = injector.getInstance(ZKClientService.class);
            kafkaClientService = injector.getInstance(KafkaClientService.class);
            metricsCollectionService = injector.getInstance(MetricsCollectionService.class);
            logAppenderInitializer = injector.getInstance(LogAppenderInitializer.class);
            logAppenderInitializer.initialize();

            program = injector.getInstance(ProgramFactory.class).create(cmdLine.getOptionValue("jar"));
            Arguments userArguments = new Gson().fromJson(cmdLine.getOptionValue("rargs"), BasicArguments.class);
            programOpts = new SimpleProgramOptions(name, createProgramArguments(twillContext, configs), userArguments);
            resourceReporter = new ProgramRunnableResourceReporter(program, metricsCollectionService, twillContext);
            LOG.info("Runnable initialized: {}", name);
        } catch (Throwable th) {
            // Single catch: checked exceptions (e.g. IOException) are wrapped by propagate() below.
            LOG.error(th.getMessage(), th);
            throw Throwables.propagate(th);
        }
    }

    /**
     * Dispatches a command to the running program. Blocks until {@link #run()} has attempted to
     * start the program.
     *
     * <p>Supported commands are {@code ProgramCommands.SUSPEND}, {@code ProgramCommands.RESUME} and
     * a command named {@code ProgramOptionConstants.INSTANCES} carrying an integer {@code "count"}
     * option. Any other command is logged and ignored.
     *
     * @param command the command to handle
     * @throws IllegalStateException if the program failed to start, so there is no controller
     * @throws Exception if waiting is interrupted or the controller operation fails
     */
    @Override
    public void handleCommand(Command command) throws Exception {
        runlatch.await();
        ProgramController running = controller;
        if (running == null) {
            // run() releases the latch even when start-up fails; fail fast instead of an NPE.
            throw new IllegalStateException("Program is not running, cannot handle command for runnable: " + name);
        }

        if (ProgramCommands.SUSPEND.equals(command)) {
            running.suspend().get();
        } else if (ProgramCommands.RESUME.equals(command)) {
            running.resume().get();
        } else if (ProgramOptionConstants.INSTANCES.equals(command.getCommand())) {
            int count = Integer.parseInt(command.getOptions().get("count"));
            running.command(ProgramOptionConstants.INSTANCES, count).get();
        } else {
            LOG.warn("Ignore unsupported command: {}", command);
        }
    }

    /**
     * Stops the program and then closes the log appender. The appender is closed even when
     * stopping the program fails; the first failure encountered is rethrown after both steps ran.
     *
     * <p>NOTE(review): if this is invoked before {@link #run()} has created the controller, the
     * stop step fails with a {@link NullPointerException} (logged and rethrown), as before.
     *
     * @throws RuntimeException if stopping the program or closing the log appender fails
     */
    @Override
    public void stop() {
        LOG.info("Stopping runnable: {}", name);
        Exception failure = null;
        try {
            controller.stop().get();
        } catch (Exception e) {
            // Logged before the appender is closed so the message still reaches it.
            LOG.error("Fail to stop: {}", e, e);
            failure = e;
        }
        try {
            logAppenderInitializer.close();
        } catch (Exception e) {
            LOG.error("Fail to stop: {}", e, e);
            if (failure == null) {
                failure = e;
            }
        }
        if (failure != null) {
            throw Throwables.propagate(failure);
        }
    }

    /**
     * Starts ZooKeeper, Kafka, metrics and resource-reporter services in that order, launches the
     * program and blocks until the program controller reports that it stopped or failed.
     *
     * @throws RuntimeException if start-up fails or the program terminates with an error
     */
    @Override
    public void run() {
        final SettableFuture<ProgramController.State> completion = SettableFuture.create();
        try {
            LOG.info("Starting metrics service");
            Futures.getUnchecked(
                Services.chainStart(zkClientService, kafkaClientService, metricsCollectionService, resourceReporter));

            LOG.info("Starting runnable: {}", name);
            controller = injector.getInstance(getProgramClass()).run(program, programOpts);
            controller.addListener(new AbstractListener() {
                @Override
                public void stopped() {
                    completion.set(ProgramController.State.STOPPED);
                }

                @Override
                public void error(Throwable cause) {
                    LOG.error("Program runner error out.", cause);
                    completion.setException(cause);
                }
            }, MoreExecutors.sameThreadExecutor());
        } finally {
            // Always release waiting handleCommand() callers, even if start-up threw;
            // otherwise they would block on the latch forever.
            runlatch.countDown();
        }

        try {
            completion.get();
            LOG.info("Program stopped.");
        } catch (Throwable th) {
            LOG.error("Program terminated due to error.", th);
            throw Throwables.propagate(th);
        }
    }

    /**
     * Stops the supporting services in the reverse of the order in which {@link #run()} started them.
     */
    @Override
    public void destroy() {
        LOG.info("Releasing resources: {}", name);
        Futures.getUnchecked(
            Services.chainStop(resourceReporter, metricsCollectionService, kafkaClientService, zkClientService));
        LOG.info("Runnable stopped: {}", name);
    }

    /**
     * Parses the application arguments, which must contain the {@code jar} and {@code rargs} options.
     *
     * @throws RuntimeException wrapping the {@link ParseException} if parsing fails
     */
    private CommandLine parseArgs(String[] args) {
        Options options = new Options()
            .addOption(createOption("jar", "Program jar location"))
            .addOption(createOption("rargs", "Runtime arguments"));
        try {
            return new PosixParser().parse(options, args);
        } catch (ParseException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates a required command line option that takes an argument.
     */
    private Option createOption(String optionName, String description) {
        Option option = new Option(optionName, true, description);
        option.setRequired(true);
        return option;
    }

    /**
     * Builds the program arguments: instance id, instance count and run id taken from the Twill
     * context, plus every runnable config except the {@code "hConf"} and {@code "cConf"} entries.
     */
    private Arguments createProgramArguments(TwillContext twillContext, Map<String, String> configs) {
        Map<String, String> args = ImmutableMap.<String, String>builder()
            .put(ProgramOptionConstants.INSTANCE_ID, Integer.toString(twillContext.getInstanceId()))
            .put(ProgramOptionConstants.INSTANCES, Integer.toString(twillContext.getInstanceCount()))
            .put(ProgramOptionConstants.RUN_ID, twillContext.getApplicationRunId().getId())
            .putAll(Maps.filterKeys(configs, Predicates.not(Predicates.in(ImmutableSet.of("hConf", "cConf")))))
            .build();
        return new BasicArguments(args);
    }

    /**
     * Creates the Guice module used to build the injector in {@link #initialize(TwillContext)}.
     * It combines the distributed-mode modules with bindings for the bind address, the queue reader
     * factory, the program factory, the data fabric facade and a {@link ServiceAnnouncer} that
     * delegates to the given Twill context.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param twillContext context supplying the host address and service announcement
     * @return the combined module
     */
    public Module createModule(final TwillContext twillContext) {
        return Modules.combine(
            new ConfigModule(cConf, hConf),
            new IOModule(),
            new ZKClientModule(),
            new KafkaClientModule(),
            new AuthModule(),
            new MetricsClientRuntimeModule().getDistributedModules(),
            new LocationRuntimeModule().getDistributedModules(),
            new LoggingModules().getDistributedModules(),
            new DiscoveryRuntimeModule().getDistributedModules(),
            new DataFabricModules().getDistributedModules(),
            new DataSetsModules().getDistributedModule(),
            new AbstractModule() {
                @Override
                protected void configure() {
                    bind(InetAddress.class).annotatedWith(Names.named("app.bind.address")).toInstance(twillContext.getHost());
                    bind(QueueReaderFactory.class).in(Scopes.SINGLETON);
                    install(createProgramFactoryModule());
                    install(new DataFabricFacadeModule());
                    bind(ServiceAnnouncer.class).toInstance(new ServiceAnnouncer() {
                        @Override
                        public Cancellable announce(String serviceName, int port) {
                            return twillContext.announce(serviceName, port);
                        }
                    });
                }
            });
    }

    /**
     * Creates a private module that exposes only {@link ProgramFactory}. The location factory it
     * uses is rooted at the {@code user.dir} system property and is not visible outside the module.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code private}.
     */
    public Module createProgramFactoryModule() {
        return new PrivateModule() {
            @Override
            protected void configure() {
                bind(LocationFactory.class)
                    .annotatedWith(Names.named("program.location.factory"))
                    .toInstance(new LocalLocationFactory(new File(System.getProperty("user.dir"))));
                bind(ProgramFactory.class).in(Scopes.SINGLETON);
                expose(ProgramFactory.class);
            }
        };
    }
}
