package co.cask.cdap.metrics.runtime;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.logging.ServiceLoggingContext;
import co.cask.cdap.common.twill.AbstractMasterTwillRunnable;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.gateway.auth.AuthModule;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.logging.guice.LoggingModules;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.metrics.guice.MetricsProcessorStatusServiceModule;
import co.cask.cdap.metrics.process.KafkaMetricsProcessorServiceFactory;
import co.cask.cdap.metrics.process.MessageCallbackFactory;
import co.cask.cdap.metrics.process.MetricsMessageCallbackFactory;
import co.cask.cdap.metrics.process.MetricsProcessorStatusService;
import co.cask.cdap.metrics.store.DefaultMetricDatasetFactory;
import co.cask.cdap.metrics.store.MetricDatasetFactory;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Named;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.TwillContext;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/metrics/runtime/MetricsProcessorTwillRunnable.class */
public final class MetricsProcessorTwillRunnable extends AbstractMasterTwillRunnable {
    private static final Logger LOG = LoggerFactory.getLogger(MetricsProcessorTwillRunnable.class);
    private KafkaMetricsProcessorService kafkaMetricsProcessorService;
    private ZKClientService zkClientService;
    private KafkaClientService kafkaClientService;
    private MetricsProcessorStatusService metricsProcessorStatusService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:co/cask/cdap/metrics/runtime/MetricsProcessorTwillRunnable$KafkaMetricsProcessorModule.class */
    public static final class KafkaMetricsProcessorModule extends PrivateModule {
        KafkaMetricsProcessorModule() {
        }

        protected void configure() {
            bind(MetricDatasetFactory.class).to(DefaultMetricDatasetFactory.class).in(Scopes.SINGLETON);
            bind(MessageCallbackFactory.class).to(MetricsMessageCallbackFactory.class);
            install(new FactoryModuleBuilder().build(KafkaMetricsProcessorServiceFactory.class));
            expose(KafkaMetricsProcessorServiceFactory.class);
        }

        @Named("metrics.kafka.consumer.persist.threshold")
        @Provides
        public int providesConsumerPersistThreshold(CConfiguration cConfiguration) {
            return cConfiguration.getInt("metrics.kafka.consumer.persist.threshold", 100);
        }

        @Named("metrics.kafka.topic.prefix")
        @Provides
        public String providesKafkaTopicPrefix(CConfiguration cConfiguration) {
            return cConfiguration.get("metrics.kafka.topic.prefix", "metrics");
        }
    }

    public MetricsProcessorTwillRunnable(String str, String str2, String str3) {
        super(str, str2, str3);
    }

    protected void doInit(TwillContext twillContext) {
        try {
            getCConfiguration().set("metrics.processor.status.bind.address", twillContext.getHost().getCanonicalHostName());
            Injector createGuiceInjector = createGuiceInjector(getCConfiguration(), getConfiguration());
            ((LogAppenderInitializer) createGuiceInjector.getInstance(LogAppenderInitializer.class)).initialize();
            LoggingContextAccessor.setLoggingContext(new ServiceLoggingContext("system", "services", "metrics.processor"));
            LOG.info("Initializing runnable {}", this.name);
            LOG.info("{} Setting host name to {}", this.name, twillContext.getHost().getCanonicalHostName());
            this.zkClientService = (ZKClientService) createGuiceInjector.getInstance(ZKClientService.class);
            this.kafkaClientService = (KafkaClientService) createGuiceInjector.getInstance(KafkaClientService.class);
            this.kafkaMetricsProcessorService = (KafkaMetricsProcessorService) createGuiceInjector.getInstance(KafkaMetricsProcessorService.class);
            this.metricsProcessorStatusService = (MetricsProcessorStatusService) createGuiceInjector.getInstance(MetricsProcessorStatusService.class);
            LOG.info("Runnable initialized {}", this.name);
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
            throw Throwables.propagate(th);
        }
    }

    public void getServices(List<? super Service> list) {
        list.add(this.zkClientService);
        list.add(this.kafkaClientService);
        list.add(this.kafkaMetricsProcessorService);
        list.add(this.metricsProcessorStatusService);
    }

    public static Injector createGuiceInjector(CConfiguration cConfiguration, Configuration configuration) {
        return Guice.createInjector(new Module[]{new ConfigModule(cConfiguration, configuration), new IOModule(), new ZKClientModule(), new KafkaClientModule(), new AuthModule(), new MetricsClientRuntimeModule().getDistributedModules(), new DiscoveryRuntimeModule().getDistributedModules(), new LoggingModules().getDistributedModules(), new LocationRuntimeModule().getDistributedModules(), new DataFabricModules().getDistributedModules(), new DataSetsModules().getDistributedModules(), new KafkaMetricsProcessorModule(), new MetricsProcessorStatusServiceModule()});
    }
}
