package co.cask.cdap.logging.run;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data2.audit.AuditModule;
import co.cask.cdap.logging.LoggingConfiguration;
import co.cask.cdap.logging.guice.LogSaverServiceModule;
import co.cask.cdap.logging.guice.LoggingModules;
import co.cask.cdap.logging.save.KafkaLogSaverService;
import co.cask.cdap.logging.service.LogSaverStatusService;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.internal.Services;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/logging/run/LogSaverTwillRunnable.class */
public final class LogSaverTwillRunnable extends AbstractTwillRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(LogSaverTwillRunnable.class);
    private SettableFuture<?> completion;
    private String name;
    private String hConfName;
    private String cConfName;
    private ZKClientService zkClientService;
    private KafkaClientService kafkaClientService;
    private KafkaLogSaverService logSaverService;
    private LogSaverStatusService logSaverStatusService;
    private MetricsCollectionService metricsCollectionService;

    public LogSaverTwillRunnable(String str, String str2, String str3) {
        this.name = str;
        this.hConfName = str2;
        this.cConfName = str3;
    }

    public TwillRunnableSpecification configure() {
        return TwillRunnableSpecification.Builder.with().setName(this.name).withConfigs(ImmutableMap.of("hConf", this.hConfName, "cConf", this.cConfName)).build();
    }

    public void initialize(TwillContext twillContext) {
        super.initialize(twillContext);
        this.completion = SettableFuture.create();
        this.name = twillContext.getSpecification().getName();
        Map configs = twillContext.getSpecification().getConfigs();
        LOG.info("Initialize runnable: " + this.name);
        try {
            Configuration configuration = new Configuration();
            configuration.clear();
            configuration.addResource(new File((String) configs.get("hConf")).toURI().toURL());
            UserGroupInformation.setConfiguration(configuration);
            CConfiguration create = CConfiguration.create(new File((String) configs.get("cConf")), new File[0]);
            create.set("log.saver.status.bind.address", twillContext.getHost().getCanonicalHostName());
            if (create.get("zookeeper.quorum") == null) {
                LOG.error("No ZooKeeper quorum provided.");
                throw new IllegalStateException("No ZooKeeper quorum provided.");
            }
            Injector createGuiceInjector = createGuiceInjector(create, configuration);
            this.zkClientService = (ZKClientService) createGuiceInjector.getInstance(ZKClientService.class);
            this.kafkaClientService = (KafkaClientService) createGuiceInjector.getInstance(KafkaClientService.class);
            this.logSaverService = (KafkaLogSaverService) createGuiceInjector.getInstance(KafkaLogSaverService.class);
            LOG.info("Num partitions = {}", Integer.valueOf(Integer.parseInt(create.get(LoggingConfiguration.NUM_PARTITIONS, LoggingConfiguration.DEFAULT_NUM_PARTITIONS))));
            this.logSaverStatusService = (LogSaverStatusService) createGuiceInjector.getInstance(LogSaverStatusService.class);
            this.metricsCollectionService = (MetricsCollectionService) createGuiceInjector.getInstance(MetricsCollectionService.class);
            LOG.info("Runnable initialized: " + this.name);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
            throw Throwables.propagate(th);
        }
    }

    public void run() {
        LOG.info("Starting runnable " + this.name);
        ShutdownHookManager.get().addShutdownHook(new Runnable() { // from class: co.cask.cdap.logging.run.LogSaverTwillRunnable.1
            @Override // java.lang.Runnable
            public void run() {
                LogSaverTwillRunnable.LOG.info("Shutdown hook triggered.");
                LogSaverTwillRunnable.this.stop();
            }
        }, 11);
        Futures.getUnchecked(Services.chainStart(this.zkClientService, new Service[]{this.kafkaClientService, this.metricsCollectionService, this.logSaverService, this.logSaverStatusService}));
        LOG.info("Runnable started " + this.name);
        try {
            this.completion.get();
            LOG.info("Runnable stopped " + this.name);
        } catch (InterruptedException e) {
            LOG.error("Waiting on completion interrupted", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            LOG.error("Completed with exception. Exception get propagated", e2);
            throw Throwables.propagate(e2);
        }
    }

    public void stop() {
        LOG.info("Stopping runnable " + this.name);
        Futures.getUnchecked(Services.chainStop(this.logSaverStatusService, new Service[]{this.logSaverService, this.metricsCollectionService, this.kafkaClientService, this.zkClientService}));
        this.completion.set((Object) null);
    }

    private static Injector createGuiceInjector(CConfiguration cConfiguration, Configuration configuration) {
        return Guice.createInjector(new Module[]{new ConfigModule(cConfiguration, configuration), new IOModule(), new ZKClientModule(), new KafkaClientModule(), new MetricsClientRuntimeModule().getDistributedModules(), new DiscoveryRuntimeModule().getDistributedModules(), new LocationRuntimeModule().getDistributedModules(), new DataFabricModules().getDistributedModules(), new DataSetsModules().getDistributedModules(), new LogSaverServiceModule(), new LoggingModules().getDistributedModules(), new AuditModule().getDistributedModules()});
    }
}
