package co.cask.cdap.data.runtime.main;

import co.cask.cdap.app.guice.AppFabricServiceRuntimeModule;
import co.cask.cdap.app.guice.ProgramRunnerRuntimeModule;
import co.cask.cdap.app.guice.ServiceStoreModules;
import co.cask.cdap.app.store.ServiceStore;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.TwillModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.kerberos.SecurityUtil;
import co.cask.cdap.common.metrics.MetricsCollectionService;
import co.cask.cdap.common.runtime.DaemonMain;
import co.cask.cdap.data.runtime.DataFabricModules;
import co.cask.cdap.data.runtime.DataSetServiceModules;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.security.HBaseSecureStoreUpdater;
import co.cask.cdap.data.stream.StreamAdminModules;
import co.cask.cdap.data2.datafabric.dataset.service.DatasetService;
import co.cask.cdap.data2.util.hbase.ConfigurationTable;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.explore.client.ExploreClient;
import co.cask.cdap.explore.guice.ExploreClientModule;
import co.cask.cdap.explore.service.ExploreServiceUtils;
import co.cask.cdap.gateway.auth.AuthModule;
import co.cask.cdap.internal.app.services.AppFabricServer;
import co.cask.cdap.logging.appender.LogAppenderInitializer;
import co.cask.cdap.logging.guice.LoggingModules;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.notifications.feeds.guice.NotificationFeedServiceRuntimeModule;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.ServiceListenerAdapter;
import org.apache.twill.common.Services;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/runtime/main/MasterServiceMain.class */
public class MasterServiceMain extends DaemonMain {
    private static final Logger LOG = LoggerFactory.getLogger(MasterServiceMain.class);

    /** Upper bound (10 minutes, in ms) on the delay computed by backOffRun(). */
    private static final long MAX_BACKOFF_TIME_MS = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);

    /** If more than this (20 minutes, in ms) elapsed since the last run, backOffRun() resets the run counter. */
    private static final long SUCCESSFUL_RUN_DURATON_MS = TimeUnit.MILLISECONDS.convert(20, TimeUnit.MINUTES);

    // Configurations supplied to the constructor.
    protected final CConfiguration cConf;
    protected final Configuration hConf;

    // Created in init().
    private Injector baseInjector;
    private ZKClientService zkClientService;

    // Created in start().
    private LeaderElection leaderElection;

    // Assigned from the leader-election callback and from runTwillApps().
    private volatile TwillRunnerService twillRunnerService;
    private volatile TwillController twillController;
    private AppFabricServer appFabricServer;

    // Obtained from baseInjector in init(); dsService is re-obtained in runTwillApps() when it is not running.
    private KafkaClientService kafkaClientService;
    private MetricsCollectionService metricsCollectionService;
    private DatasetService dsService;
    private ServiceStore serviceStore;
    private HBaseSecureStoreUpdater secureStoreUpdater;
    private ExploreClient exploreClient;

    private String serviceName;
    private TwillApplication twillApplication;
    private boolean isExploreEnabled;

    // Written by stop() and read by backOffRun(), which is invoked from Twill controller listener
    // callbacks; volatile so that a stop request is visible to whichever thread runs those callbacks.
    private volatile boolean stopFlag = false;

    private final AtomicBoolean isLeader = new AtomicBoolean(false);

    // Bookkeeping for the restart back-off in backOffRun().
    private long lastRunTimeMs = System.currentTimeMillis();
    private int currentRun = 0;

    /**
     * Creates the master service entry point around the given configurations.
     *
     * @param cConfiguration the CDAP configuration, stored in {@code cConf}
     * @param configuration the Hadoop configuration, stored in {@code hConf}
     */
    public MasterServiceMain(CConfiguration cConfiguration, Configuration configuration) {
        cConf = cConfiguration;
        hConf = configuration;
    }

    /**
     * Process entry point: builds a {@code MasterServiceMain} from freshly created configurations
     * and hands the command-line arguments to {@code doMain}.
     *
     * @param strArr command-line arguments, passed through unchanged
     * @throws Exception whatever {@code doMain} throws
     */
    public static void main(String[] strArr) throws Exception {
        String mainName = MasterServiceMain.class.getSimpleName();
        LOG.info("Starting {}", mainName);

        CConfiguration cdapConf = CConfiguration.create();
        Configuration hadoopConf = HBaseConfiguration.create();
        MasterServiceMain masterMain = new MasterServiceMain(cdapConf, hadoopConf);
        masterMain.doMain(strArr);
    }

    /**
     * Initializes the master service: reads the explore flag, sets the dataset service bind address
     * to the local host name, logs in via {@code SecurityUtil}, creates the base Guice injector,
     * obtains the long-lived services from it and runs the transaction and explore requirement checks.
     *
     * <p>A login failure and any other initialization failure are logged with distinct messages so
     * that an injector or requirement-check problem is not reported as a login problem.
     *
     * @param strArr command-line arguments (not used by this method)
     * @throws RuntimeException (via {@code Throwables.propagate}) if login or initialization fails
     */
    public void init(String[] strArr) {
        isExploreEnabled = cConf.getBoolean("explore.enabled");
        serviceName = "master.services";
        cConf.set("dataset.service.bind.address", getLocalHost().getCanonicalHostName());

        try {
            SecurityUtil.loginForMasterService(cConf);
        } catch (Exception e) {
            LOG.error("Failed to login as CDAP user", e);
            throw Throwables.propagate(e);
        }

        try {
            baseInjector = Guice.createInjector(new Module[]{
                new ConfigModule(cConf, hConf),
                new ZKClientModule(),
                new LocationRuntimeModule().getDistributedModules(),
                new LoggingModules().getDistributedModules(),
                new IOModule(),
                new AuthModule(),
                new KafkaClientModule(),
                new TwillModule(),
                new DiscoveryRuntimeModule().getDistributedModules(),
                new AppFabricServiceRuntimeModule().getDistributedModules(),
                new ProgramRunnerRuntimeModule().getDistributedModules(),
                new DataSetServiceModules().getDistributedModule(),
                new DataFabricModules().getDistributedModules(),
                new DataSetsModules().getDistributedModule(),
                new MetricsClientRuntimeModule().getDistributedModules(),
                new ServiceStoreModules().getDistributedModule(),
                new ExploreClientModule(),
                new NotificationFeedServiceRuntimeModule().getDistributedModules(),
                new StreamAdminModules().getDistributedModules()
            });
            zkClientService = baseInjector.getInstance(ZKClientService.class);
            kafkaClientService = baseInjector.getInstance(KafkaClientService.class);
            metricsCollectionService = baseInjector.getInstance(MetricsCollectionService.class);
            dsService = baseInjector.getInstance(DatasetService.class);
            exploreClient = baseInjector.getInstance(ExploreClient.class);
            secureStoreUpdater = baseInjector.getInstance(HBaseSecureStoreUpdater.class);
            serviceStore = baseInjector.getInstance(ServiceStore.class);
            checkTransactionRequirements();
            checkExploreRequirements();
        } catch (Exception e) {
            LOG.error("Failed to initialize {}", serviceName, e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Writes the CDAP configuration into a {@code ConfigurationTable} built from the Hadoop
     * configuration, under the {@code DEFAULT} type.
     *
     * @throws RuntimeException (via {@code Throwables.propagate}) if the write fails with an IOException
     */
    private void checkTransactionRequirements() {
        try {
            ConfigurationTable configTable = new ConfigurationTable(hConf);
            configTable.write(ConfigurationTable.Type.DEFAULT, cConf);
        } catch (IOException ioe) {
            throw Throwables.propagate(ioe);
        }
    }

    /**
     * Runs the Hive support check from {@code ExploreServiceUtils} against the Hadoop configuration,
     * but only when explore is enabled.
     */
    private void checkExploreRequirements() {
        if (!isExploreEnabled) {
            return;
        }
        ExploreServiceUtils.checkHiveSupportWithSecurity(hConf);
    }

    /**
     * Starts the master service: initializes the log appender, chain-starts ZooKeeper, Kafka,
     * metrics collection and the service store, then joins leader election under
     * {@code /election/<serviceName>}.
     *
     * <p>On becoming leader the handler builds the Twill application, starts the Twill runner and
     * the app fabric server from a child injector, schedules secure store updates and launches the
     * Twill application. On becoming follower it stops the dataset service, the Twill runner and the
     * app fabric server.
     */
    public void start() {
        LogAppenderInitializer logAppenderInitializer = baseInjector.getInstance(LogAppenderInitializer.class);
        logAppenderInitializer.initialize();

        Service[] chainedServices = new Service[]{kafkaClientService, metricsCollectionService, serviceStore};
        Futures.getUnchecked(Services.chainStart(zkClientService, chainedServices));

        String electionPath = "/election/" + serviceName;
        leaderElection = new LeaderElection(zkClientService, electionPath, new ElectionHandler() {
            public void leader() {
                Map<String, Integer> instanceCounts = getSystemServiceInstances();
                twillApplication = createTwillApplication(instanceCounts);
                if (twillApplication == null) {
                    throw new IllegalArgumentException("TwillApplication cannot be null");
                }
                LOG.info("Became leader");

                Injector leaderInjector = baseInjector.createChildInjector(new Module[0]);
                twillRunnerService = leaderInjector.getInstance(TwillRunnerService.class);
                twillRunnerService.startAndWait();
                appFabricServer = leaderInjector.getInstance(AppFabricServer.class);
                appFabricServer.startAndWait();

                scheduleSecureStoreUpdate(twillRunnerService);
                runTwillApps();
                // Only flagged as leader once everything above has completed.
                isLeader.set(true);
            }

            public void follower() {
                LOG.info("Became follower");
                dsService.stopAndWait();
                if (twillRunnerService != null) {
                    twillRunnerService.stopAndWait();
                }
                if (appFabricServer != null) {
                    appFabricServer.stopAndWait();
                }
                isLeader.set(false);
            }
        });
        leaderElection.start();
    }

    /**
     * Stops the master service. Sets the stop flag, stops the dataset service, stops the Twill
     * application when this instance is leader and has a controller, leaves leader election,
     * chain-stops the service store, metrics collection, Kafka and ZooKeeper, and finally closes
     * the explore client.
     *
     * @throws RuntimeException (via {@code Throwables.propagate}) if closing the explore client fails
     */
    public void stop() {
        LOG.info("Stopping {}", serviceName);
        stopFlag = true;

        dsService.stopAndWait();

        boolean leading = isLeader.get();
        if (leading && twillController != null) {
            twillController.stopAndWait();
        }
        if (leaderElection != null) {
            leaderElection.stopAndWait();
        }

        Service[] remainingServices = new Service[]{metricsCollectionService, kafkaClientService, zkClientService};
        Futures.getUnchecked(Services.chainStop(serviceStore, remainingServices));

        try {
            exploreClient.close();
        } catch (IOException closeFailure) {
            LOG.error("Could not close Explore client", closeFailure);
            throw Throwables.propagate(closeFailure);
        }
    }

    /** Intentionally empty: this class performs no work on destroy. */
    public void destroy() {
    }

    /**
     * Resolves the local host address.
     *
     * @return the address returned by {@link InetAddress#getLocalHost()}
     * @throws RuntimeException (via {@code Throwables.propagate}) if the local host cannot be resolved
     */
    private InetAddress getLocalHost() {
        InetAddress localAddress;
        try {
            localAddress = InetAddress.getLocalHost();
        } catch (UnknownHostException unknownHost) {
            LOG.error("Error obtaining localhost address", unknownHost);
            throw Throwables.propagate(unknownHost);
        }
        return localAddress;
    }

    /**
     * Builds the table of system services and the configuration keys that hold their instance counts.
     *
     * @return a mutable map from service name to a two-entry map whose {@code "default"} and
     *     {@code "max"} entries name the configuration keys for that service
     */
    private Map<String, Map<String, String>> getConfigKeys() {
        HashMap<String, Map<String, String>> keysByService = Maps.newHashMap();
        keysByService.put("log.saver", instanceKeys("log.saver.num.instances", "log.saver.max.instances"));
        keysByService.put("transaction", instanceKeys("data.tx.num.instances", "data.tx.max.instances"));
        keysByService.put("metrics.processor",
                          instanceKeys("metrics.processor.num.instances", "metrics.processor.max.instances"));
        keysByService.put("metrics", instanceKeys("metrics.num.instances", "metrics.max.instances"));
        // Streams use the same configuration key for both the default and the max.
        keysByService.put("streams", instanceKeys("stream.container.instances", "stream.container.instances"));
        keysByService.put("dataset.executor",
                          instanceKeys("dataset.executor.container.instances", "dataset.executor.max.instances"));
        keysByService.put("explore.service",
                          instanceKeys("explore.executor.container.instances", "explore.executor.max.instances"));
        return keysByService;
    }

    /** Pairs a default-count configuration key with a max-count configuration key. */
    private static Map<String, String> instanceKeys(String defaultKey, String maxKey) {
        return ImmutableMap.of("default", defaultKey, "max", maxKey);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Determines the instance count for every system service listed by {@code getConfigKeys()}.
     *
     * <p>For each service the count saved in the service store is used, capped at the configured
     * max. When nothing is saved (null or 0) the configured default is used, also capped at the max.
     * The resulting count is written back to the service store. A service whose count cannot be
     * determined is logged and left out of the result.
     *
     * @return a map from service name to the instance count to use
     */
    public Map<String, Integer> getSystemServiceInstances() {
        HashMap<String, Integer> instanceCounts = new HashMap<String, Integer>();
        for (Map.Entry<String, Map<String, String>> serviceEntry : getConfigKeys().entrySet()) {
            String service = serviceEntry.getKey();
            Map<String, String> confKeys = serviceEntry.getValue();
            try {
                int maxCount = cConf.getInt(confKeys.get("max"));
                Integer savedCount = serviceStore.getServiceInstance(service);

                Integer effectiveCount;
                if (savedCount == null || savedCount.intValue() == 0) {
                    int defaultCount = cConf.getInt(confKeys.get("default"));
                    effectiveCount = Math.min(maxCount, defaultCount);
                } else if (savedCount.intValue() > maxCount) {
                    effectiveCount = maxCount;
                } else {
                    effectiveCount = savedCount;
                }

                serviceStore.setServiceInstance(service, effectiveCount.intValue());
                instanceCounts.put(service, effectiveCount);
                LOG.info("Setting instance count of {} Service to {}", service, effectiveCount);
            } catch (Exception failure) {
                LOG.error("Couldn't retrieve instance count {}: {}",
                          new Object[]{service, failure.getMessage(), failure});
            }
        }
        return instanceCounts;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the {@code MasterTwillApplication} from the CDAP configuration, temp-file copies of
     * the CDAP and Hadoop configurations, the explore flag and the given instance counts.
     *
     * @param map instance count per system service
     * @return the new Twill application
     * @throws RuntimeException (via {@code Throwables.propagate}) if anything fails while building it
     */
    public TwillApplication createTwillApplication(Map<String, Integer> map) {
        try {
            File savedCConf = getSavedCConf();
            File savedHConf = getSavedHConf();
            return new MasterTwillApplication(cConf, savedCConf, savedHConf, isExploreEnabled, map);
        } catch (Exception failure) {
            throw Throwables.propagate(failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules periodic secure store updates on the given runner when HBase security is enabled;
     * does nothing otherwise. The first update is scheduled after 30000 ms and subsequent ones use
     * the interval reported by the secure store updater.
     *
     * @param twillRunner the runner on which to schedule the updates
     */
    public void scheduleSecureStoreUpdate(TwillRunner twillRunner) {
        if (!User.isHBaseSecurityEnabled(hConf)) {
            return;
        }
        long initialDelayMs = 30000L;
        long updateIntervalMs = secureStoreUpdater.getUpdateInterval();
        twillRunner.scheduleSecureStoreUpdate(secureStoreUpdater, initialDelayMs, updateIntervalMs,
                                              TimeUnit.MILLISECONDS);
    }

    /**
     * Adds the HBase table util class as a dependency and attaches a secure store obtained from the
     * secure store updater.
     *
     * @param twillPreparer the preparer to extend
     * @return the preparer returned by the last call in the chain
     */
    private TwillPreparer prepare(TwillPreparer twillPreparer) {
        HBaseTableUtil tableUtil = (HBaseTableUtil) new HBaseTableUtilFactory().get();
        TwillPreparer withTableUtil = twillPreparer.withDependencies(new Class[]{tableUtil.getClass()});
        return withTableUtil.addSecureStore(secureStoreUpdater.update((String) null, (RunId) null));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure exactly one instance of the master Twill application is in use.
     *
     * <p>When a running instance is found it is adopted as the current controller, any additional
     * instances are stopped, and the dataset service is started if it is not running. Otherwise a
     * new application is started with a listener that starts the dataset service once the
     * application is running and calls {@code backOffRun()} when it fails or terminates.
     */
    public void runTwillApps() {
        Iterator<TwillController> running = lookupService().iterator();

        if (running.hasNext()) {
            LOG.info("{} application is already running", serviceName);
            twillController = running.next();
            if (running.hasNext()) {
                LOG.warn("Found more than one instance of {} running; stopping the others", serviceName);
                while (running.hasNext()) {
                    TwillController extra = running.next();
                    LOG.warn("Stopping one extra instance of {}", serviceName);
                    extra.stopAndWait();
                }
                LOG.warn("Stopped extra instances of {}", serviceName);
            }
            startDatasetServiceIfNotRunning();
            return;
        }

        LOG.info("Starting {} application", serviceName);
        twillController = getPreparer().start();
        twillController.addListener(new ServiceListenerAdapter() {
            public void running() {
                startDatasetServiceIfNotRunning();
            }

            public void failed(Service.State state, Throwable th) {
                LOG.error("{} failed with exception; restarting with back-off", serviceName, th);
                backOffRun();
            }

            public void terminated(Service.State state) {
                LOG.warn("{} was terminated; restarting with back-off", serviceName);
                backOffRun();
            }
        }, MoreExecutors.sameThreadExecutor());
    }

    /** Obtains a dataset service from the base injector and starts it, unless the current one is running. */
    private void startDatasetServiceIfNotRunning() {
        if (dsService.isRunning()) {
            return;
        }
        dsService = baseInjector.getInstance(DatasetService.class);
        LOG.info("Starting Dataset service");
        dsService.startAndWait();
    }

    /**
     * Writes the Hadoop configuration to a new temp file named {@code hConf*.xml}.
     *
     * <p>The file is registered for deletion on JVM exit before it is written, so it is also
     * cleaned up when writing fails.
     *
     * @return the temp file containing the configuration
     * @throws IOException if the temp file cannot be created or written
     */
    private File getSavedHConf() throws IOException {
        File tempFile = File.createTempFile("hConf", ".xml");
        tempFile.deleteOnExit();
        return saveHConf(hConf, tempFile);
    }

    /**
     * Writes the CDAP configuration to a new temp file named {@code cConf*.xml}.
     *
     * <p>The file is registered for deletion on JVM exit before it is written, so it is also
     * cleaned up when writing fails.
     *
     * @return the temp file containing the configuration
     * @throws IOException if the temp file cannot be created or written
     */
    private File getSavedCConf() throws IOException {
        File tempFile = File.createTempFile("cConf", ".xml");
        tempFile.deleteOnExit();
        return saveCConf(cConf, tempFile);
    }

    /**
     * Looks up controllers of the master Twill application, polling up to 100 times with a 20 ms
     * pause between attempts until at least one controller shows up.
     *
     * <p>If the thread is interrupted while waiting, polling stops immediately and the interrupt
     * status is restored. Continuing to loop would make every later sleep fail at once and log a
     * warning per remaining attempt.
     *
     * @return the lookup result, which may still be empty if nothing was found in time
     */
    private Iterable<TwillController> lookupService() {
        Iterable<TwillController> controllers = twillRunnerService.lookup(serviceName);
        try {
            for (int attempt = 0; attempt < 100; attempt++) {
                if (controllers.iterator().hasNext()) {
                    return controllers;
                }
                TimeUnit.MILLISECONDS.sleep(20L);
            }
        } catch (InterruptedException e) {
            LOG.warn("Caught interrupted exception", e);
            Thread.currentThread().interrupt();
        }
        return controllers;
    }

    /**
     * Extends the preparer with what the explore container needs; returns it untouched when explore
     * is disabled.
     *
     * <p>Every traced explore dependency is added to the class path by file name. Then each
     * {@code .xml} file (other than {@code logback.xml}) found through the {@code explore.conf.files}
     * system property is added as a resource after being passed through
     * {@code ExploreServiceUtils.hijackHiveConfFile}; a file whose name was already added is skipped.
     *
     * @param twillPreparer the preparer to extend
     * @return the extended preparer
     * @throws RuntimeException if {@code explore.conf.files} is not set, or if an IOException occurs
     */
    private TwillPreparer prepareExploreContainer(TwillPreparer twillPreparer) {
        if (!isExploreEnabled) {
            return twillPreparer;
        }

        TwillPreparer preparer = twillPreparer;
        try {
            for (File jar : ExploreServiceUtils.traceExploreDependencies()) {
                String jarName = jar.getName();
                LOG.trace("Adding jar file to classpath: {}", jarName);
                preparer = preparer.withClassPaths(new String[]{jarName});
            }

            String hiveConfFiles = System.getProperty("explore.conf.files");
            LOG.debug("Hive conf files = {}", hiveConfFiles);
            if (hiveConfFiles == null) {
                throw new RuntimeException("System property explore.conf.files is not set");
            }

            HashSet<String> addedNames = Sets.newHashSet();
            for (File confFile : ExploreServiceUtils.getClassPathJarsFiles(hiveConfFiles)) {
                String confName = confFile.getName();
                boolean isXml = confName.matches(".*\\.xml");
                if (!isXml || confName.equals("logback.xml")) {
                    continue;
                }
                if (!addedNames.add(confName)) {
                    LOG.warn("Ignoring duplicate config file: {}", confFile.getAbsolutePath());
                    continue;
                }
                LOG.debug("Adding config file: {}", confFile.getAbsolutePath());
                URI hijackedConf = ExploreServiceUtils.hijackHiveConfFile(confFile).toURI();
                preparer = preparer.withResources(new URI[]{hijackedConf});
            }
            return preparer;
        } catch (IOException ioe) {
            throw new RuntimeException("Unable to trace Explore dependencies", ioe);
        }
    }

    /**
     * Creates the preparer for the master Twill application: attaches a log handler printing to
     * {@code System.out}, adds {@code /logback.xml} as a resource when it is found on the class
     * path, and then applies {@code prepareExploreContainer} and {@code prepare}.
     *
     * <p>The preparer returned by {@code withResources} is kept rather than discarded, matching how
     * the other preparer calls in this class are chained. A malformed logback URI is logged together
     * with its exception and otherwise ignored.
     *
     * @return the fully configured preparer
     */
    private TwillPreparer getPreparer() {
        TwillPreparer preparer = twillRunnerService.prepare(twillApplication)
            .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out)));

        URL logbackUrl = getClass().getResource("/logback.xml");
        if (logbackUrl == null) {
            LOG.warn("Cannot find logback.xml to pass onto Twill Runnables!");
        } else {
            try {
                preparer = preparer.withResources(new URI[]{logbackUrl.toURI()});
            } catch (URISyntaxException e) {
                // Best effort: continue without logback.xml, but keep the cause in the log.
                LOG.error("Got exception while getting URI for logback.xml - {}", logbackUrl, e);
            }
        }
        return prepare(prepareExploreContainer(preparer));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Restarts the Twill application after an exponential back-off.
     *
     * <p>The delay is {@code 500 ms * 2^currentRun}, capped at {@code MAX_BACKOFF_TIME_MS}. The run
     * counter is reset when more than {@code SUCCESSFUL_RUN_DURATON_MS} elapsed since the previous
     * run. Nothing is restarted when the stop flag is set, either on entry or by the time the
     * back-off sleep has finished.
     */
    public void backOffRun() {
        if (stopFlag) {
            LOG.warn("Not starting a new run when stopFlag is true");
            return;
        }
        if (System.currentTimeMillis() - lastRunTimeMs > SUCCESSFUL_RUN_DURATON_MS) {
            currentRun = 0;
        }
        try {
            // Cap in floating point before narrowing: multiplying in long overflows for large
            // run counts and yields a negative delay, which would remove the back-off entirely.
            double uncappedMs = 500.0d * Math.pow(2.0d, currentRun);
            long backOffMs = (long) Math.min(uncappedMs, (double) MAX_BACKOFF_TIME_MS);
            LOG.info("Current restart run = {}; backing off for {} ms", currentRun, backOffMs);
            TimeUnit.MILLISECONDS.sleep(backOffMs);
        } catch (InterruptedException e) {
            LOG.warn("Caught interrupted exception", e);
            Thread.currentThread().interrupt();
        }
        // stop() may have been called while this thread was sleeping.
        if (stopFlag) {
            LOG.warn("Not starting a new run when stopFlag is true");
            return;
        }
        runTwillApps();
        currentRun++;
        lastRunTimeMs = System.currentTimeMillis();
    }

    /**
     * Writes the given Hadoop configuration as XML (UTF-8) into the given file.
     *
     * <p>The writer is managed with try-with-resources, so it is always closed and a failure while
     * closing cannot mask an earlier write failure.
     *
     * @param configuration the configuration to write
     * @param file the destination file
     * @return {@code file}
     * @throws IOException if opening, writing or closing the file fails
     */
    private static File saveHConf(Configuration configuration, File file) throws IOException {
        try (BufferedWriter writer = Files.newWriter(file, Charsets.UTF_8)) {
            configuration.writeXml(writer);
        }
        return file;
    }

    /**
     * Writes the given CDAP configuration as XML (UTF-8) into the given file.
     *
     * <p>The writer is managed with try-with-resources, so it is always closed and a failure while
     * closing cannot mask an earlier write failure.
     *
     * @param cConfiguration the configuration to write
     * @param file the destination file
     * @return {@code file}
     * @throws IOException if opening, writing or closing the file fails
     */
    private File saveCConf(CConfiguration cConfiguration, File file) throws IOException {
        try (BufferedWriter writer = Files.newWriter(file, Charsets.UTF_8)) {
            cConfiguration.writeXml(writer);
        }
        return file;
    }
}
