package org.apache.gobblin.service.modules.core;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Stage;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.RemoteInvocationException;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.inject.Named;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.app.ApplicationException;
import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowConfigsResource;
import org.apache.gobblin.service.FlowConfigsResourceHandler;
import org.apache.gobblin.service.FlowConfigsV2Resource;
import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
import org.apache.gobblin.service.FlowExecutionResourceHandler;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.GroupOwnershipService;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/service/modules/core/GobblinServiceManager.class */
public class GobblinServiceManager implements ApplicationLauncher, StandardMetricsBridge {
    public static final String SERVICE_NAME_OPTION_NAME = "service_name";
    public static final String SERVICE_ID_OPTION_NAME = "service_id";
    public static final String SERVICE_EVENT_BUS_NAME = "GobblinServiceManagerEventBus";
    private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class);
    protected final ServiceBasedAppLauncher serviceLauncher;
    private volatile boolean stopInProgress = false;

    @Inject
    @Named(SERVICE_EVENT_BUS_NAME)
    protected EventBus eventBus;
    protected final FileSystem fs;
    protected final Path serviceWorkDir;
    protected final GobblinServiceConfiguration configuration;

    @Inject(optional = true)
    protected TopologyCatalog topologyCatalog;

    @Inject(optional = true)
    protected FlowCatalog flowCatalog;

    @Inject(optional = true)
    protected GobblinServiceJobScheduler scheduler;

    @Inject
    protected FlowConfigsResourceHandler resourceHandler;

    @Inject
    protected FlowConfigsV2ResourceHandler v2ResourceHandler;

    @Inject
    protected FlowExecutionResourceHandler flowExecutionResourceHandler;

    @Inject
    protected FlowStatusGenerator flowStatusGenerator;

    @Inject
    protected GroupOwnershipService groupOwnershipService;

    @Inject
    private Injector injector;
    protected boolean flowCatalogLocalCommit;

    @Inject(optional = true)
    protected Orchestrator orchestrator;

    @Inject(optional = true)
    protected EmbeddedRestliServer restliServer;

    @Inject(optional = true)
    protected TopologySpecFactory topologySpecFactory;

    @Inject
    protected SchedulerService schedulerService;

    @Inject(optional = true)
    protected Optional<HelixManager> helixManager;

    @Inject(optional = true)
    protected GitConfigMonitor gitConfigMonitor;

    @Inject(optional = true)
    protected DagManager dagManager;

    @Inject(optional = true)
    protected KafkaJobStatusMonitor jobStatusMonitor;
    protected Optional<HelixLeaderState> helixLeaderGauges;

    @Inject(optional = true)
    protected D2Announcer d2Announcer;
    private final MetricContext metricContext;
    private final Metrics metrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/service/modules/core/GobblinServiceManager$HelixLeaderState.class */
    public static class HelixLeaderState {
        private LeaderState state;

        private HelixLeaderState() {
            this.state = LeaderState.UNKNOWN;
        }

        public void setState(LeaderState leaderState) {
            this.state = leaderState;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/service/modules/core/GobblinServiceManager$LeaderState.class */
    public enum LeaderState {
        UNKNOWN(-1),
        SLAVE(0),
        MASTER(1);

        private int value;

        LeaderState(int i) {
            this.value = i;
        }

        public int getValue() {
            return this.value;
        }
    }

    /* loaded from: input_file:org/apache/gobblin/service/modules/core/GobblinServiceManager$Metrics.class */
    private class Metrics extends StandardMetricsBridge.StandardMetrics {
        public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange";
        private ContextAwareHistogram serviceLeadershipChange;

        public Metrics(MetricContext metricContext, Config config) {
            this.serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, ConfigUtils.getInt(config, "metrics.timer.window.size.in.minutes", 15).intValue(), TimeUnit.MINUTES);
            this.contextAwareMetrics.add(this.serviceLeadershipChange);
        }
    }

    @Inject
    protected GobblinServiceManager(GobblinServiceConfiguration gobblinServiceConfiguration) throws Exception {
        this.configuration = (GobblinServiceConfiguration) Objects.requireNonNull(gobblinServiceConfiguration);
        Properties configToProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(gobblinServiceConfiguration.getInnerConfig(), "gobblinServiceAppLauncher").withFallback(gobblinServiceConfiguration.getInnerConfig()));
        if (!configToProperties.contains("app.stop.time.seconds")) {
            configToProperties.setProperty("app.stop.time.seconds", Long.toString(300L));
        }
        this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(gobblinServiceConfiguration.getInnerConfig()), getClass());
        this.metrics = new Metrics(this.metricContext, gobblinServiceConfiguration.getInnerConfig());
        this.serviceLauncher = new ServiceBasedAppLauncher(configToProperties, gobblinServiceConfiguration.getServiceName());
        this.fs = buildFileSystem(gobblinServiceConfiguration.getInnerConfig());
        this.serviceWorkDir = (Path) ObjectUtils.firstNonNull(new Path[]{gobblinServiceConfiguration.getServiceWorkDir(), getServiceWorkDirPath(this.fs, gobblinServiceConfiguration.getServiceName(), gobblinServiceConfiguration.getServiceId())});
        initializeHelixLeaderGauge();
    }

    public static GobblinServiceManager create(String str, String str2, Config config, @Nullable Path path) {
        return create(new GobblinServiceConfiguration(str, str2, config, path));
    }

    public static GobblinServiceManager create(GobblinServiceConfiguration gobblinServiceConfiguration) {
        return (GobblinServiceManager) Guice.createInjector(Stage.PRODUCTION, new Module[]{new GobblinServiceGuiceModule(gobblinServiceConfiguration)}).getInstance(GobblinServiceManager.class);
    }

    public URI getRestLiServerListeningURI() {
        if (this.restliServer == null) {
            throw new IllegalStateException("Restli server does not exist because it was not configured or disabled");
        }
        return this.restliServer.getListeningURI();
    }

    private void initializeHelixLeaderGauge() {
        this.helixLeaderGauges = Optional.of(new HelixLeaderState());
        String name = MetricRegistry.name("GobblinService", new String[]{"HelixLeaderState"});
        this.metricContext.register(name, this.metricContext.newContextAwareGauge(name, () -> {
            return Integer.valueOf(((HelixLeaderState) this.helixLeaderGauges.get()).state.getValue());
        }));
    }

    @VisibleForTesting
    public boolean isLeader() {
        return !this.helixManager.isPresent() || ((HelixManager) this.helixManager.get()).isLeader();
    }

    private FileSystem buildFileSystem(Config config) throws IOException {
        return config.hasPath("fs.uri") ? FileSystem.get(URI.create(config.getString("fs.uri")), new Configuration()) : FileSystem.get(new Configuration());
    }

    private Path getServiceWorkDirPath(FileSystem fileSystem, String str, String str2) {
        return new Path(fileSystem.getHomeDirectory(), str + "/" + str2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleLeadershipChange(NotificationContext notificationContext) {
        if (this.helixManager.isPresent() && ((HelixManager) this.helixManager.get()).isLeader()) {
            LOGGER.info("Leader notification for {} HM.isLeader {}", ((HelixManager) this.helixManager.get()).getInstanceName(), Boolean.valueOf(((HelixManager) this.helixManager.get()).isLeader()));
            if (this.configuration.isSchedulerEnabled()) {
                LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
                this.scheduler.setActive(true);
            }
            if (this.helixLeaderGauges.isPresent()) {
                ((HelixLeaderState) this.helixLeaderGauges.get()).setState(LeaderState.MASTER);
            }
            if (this.configuration.isGitConfigMonitorEnabled()) {
                this.gitConfigMonitor.setActive(true);
            }
            if (this.configuration.isDagManagerEnabled() && this.topologyCatalog.getInitComplete().getCount() == 0) {
                this.dagManager.setActive(true);
                this.eventBus.register(this.dagManager);
            }
            if (this.configuration.isOnlyAnnounceLeader()) {
                this.d2Announcer.markUpServer();
                return;
            }
            return;
        }
        if (this.helixManager.isPresent()) {
            LOGGER.info("Leader lost notification for {} HM.isLeader {}", ((HelixManager) this.helixManager.get()).getInstanceName(), Boolean.valueOf(((HelixManager) this.helixManager.get()).isLeader()));
            if (this.configuration.isSchedulerEnabled()) {
                LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
                this.scheduler.setActive(false);
            }
            if (this.helixLeaderGauges.isPresent()) {
                ((HelixLeaderState) this.helixLeaderGauges.get()).setState(LeaderState.SLAVE);
            }
            if (this.configuration.isGitConfigMonitorEnabled()) {
                this.gitConfigMonitor.setActive(false);
            }
            if (this.configuration.isDagManagerEnabled()) {
                this.dagManager.setActive(false);
                this.eventBus.unregister(this.dagManager);
            }
            if (this.configuration.isOnlyAnnounceLeader()) {
                this.d2Announcer.markDownServer();
            }
        }
    }

    private void registerServicesInLauncher() {
        if (this.configuration.isTopologyCatalogEnabled()) {
            this.serviceLauncher.addService(this.topologyCatalog);
        }
        if (this.configuration.isFlowCatalogEnabled()) {
            this.serviceLauncher.addService(this.flowCatalog);
            if (this.configuration.isGitConfigMonitorEnabled()) {
                this.serviceLauncher.addService(this.gitConfigMonitor);
            }
        }
        if (this.configuration.isDagManagerEnabled()) {
            this.serviceLauncher.addService(this.dagManager);
        }
        if (this.configuration.isJobStatusMonitorEnabled()) {
            this.serviceLauncher.addService(this.jobStatusMonitor);
        }
        if (this.configuration.isSchedulerEnabled()) {
            this.serviceLauncher.addService(this.schedulerService);
            this.serviceLauncher.addService(this.scheduler);
        }
        if (this.configuration.isRestLIServerEnabled()) {
            this.serviceLauncher.addService(this.restliServer);
        }
    }

    private void configureServices() {
        if (this.configuration.isRestLIServerEnabled()) {
            this.restliServer = EmbeddedRestliServer.builder().resources(Lists.newArrayList(new Class[]{FlowConfigsResource.class, FlowConfigsV2Resource.class})).injector(this.injector).build();
            if (this.configuration.getInnerConfig().hasPath("gobblin.service.port")) {
                this.restliServer.setPort(this.configuration.getInnerConfig().getInt("gobblin.service.port"));
            }
        }
        registerServicesInLauncher();
        if (this.configuration.isSchedulerEnabled()) {
            this.flowCatalog.addListener(this.scheduler);
        }
    }

    private void ensureInjected() {
        if (this.resourceHandler == null) {
            throw new IllegalStateException("GobblinServiceManager should be constructed through Guice dependency injection or through a static factory method");
        }
    }

    public void start() throws ApplicationException {
        LOGGER.info("[Init] Starting the Gobblin Service Manager");
        ensureInjected();
        configureServices();
        if (this.helixManager.isPresent()) {
            connectHelixManager();
        }
        this.eventBus.register(this);
        this.serviceLauncher.start();
        if (this.helixManager.isPresent()) {
            ((HelixManager) this.helixManager.get()).addControllerListener(new ControllerChangeListener() { // from class: org.apache.gobblin.service.modules.core.GobblinServiceManager.1
                public void onControllerChange(NotificationContext notificationContext) {
                    GobblinServiceManager.this.handleLeadershipChange(notificationContext);
                }
            });
            if (((HelixManager) this.helixManager.get()).isLeader()) {
                if (this.configuration.isSchedulerEnabled()) {
                    LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");
                    this.scheduler.setActive(true);
                }
                if (this.configuration.isGitConfigMonitorEnabled()) {
                    this.gitConfigMonitor.setActive(true);
                }
                if (this.helixLeaderGauges.isPresent()) {
                    ((HelixLeaderState) this.helixLeaderGauges.get()).setState(LeaderState.MASTER);
                }
            } else {
                if (this.configuration.isSchedulerEnabled()) {
                    LOGGER.info("[Init] Gobblin Service is running in slave instance mode, not enabling Scheduler.");
                }
                if (this.helixLeaderGauges.isPresent()) {
                    ((HelixLeaderState) this.helixLeaderGauges.get()).setState(LeaderState.SLAVE);
                }
            }
        } else {
            LOGGER.info("[Init] Gobblin Service is running in single instance mode, enabling Scheduler.");
            this.scheduler.setActive(true);
            if (this.configuration.isGitConfigMonitorEnabled()) {
                this.gitConfigMonitor.setActive(true);
            }
        }
        if (this.configuration.isTopologySpecFactoryEnabled()) {
            Iterator<TopologySpec> it = this.topologySpecFactory.getTopologies().iterator();
            while (it.hasNext()) {
                this.topologyCatalog.put(it.next());
            }
        }
        if (this.configuration.isSchedulerEnabled()) {
            this.topologyCatalog.addListener(this.orchestrator);
        }
        this.topologyCatalog.getInitComplete().countDown();
        this.orchestrator.getSpecCompiler().setActive(true);
        if ((!this.helixManager.isPresent() || ((HelixManager) this.helixManager.get()).isLeader()) && this.configuration.isDagManagerEnabled()) {
            this.dagManager.setActive(true);
            this.eventBus.register(this.dagManager);
        }
    }

    public void stop() throws ApplicationException {
        if (this.stopInProgress) {
            return;
        }
        LOGGER.info("Stopping the Gobblin Service Manager");
        this.stopInProgress = true;
        try {
            this.serviceLauncher.stop();
        } catch (ApplicationException e) {
            LOGGER.error("Error while stopping Gobblin Service Manager", e);
        } finally {
            disconnectHelixManager();
        }
    }

    @VisibleForTesting
    void connectHelixManager() {
        try {
            if (this.helixManager.isPresent()) {
                ((HelixManager) this.helixManager.get()).connect();
                ((HelixManager) this.helixManager.get()).getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), new ControllerUserDefinedMessageHandlerFactory(this.flowCatalogLocalCommit, this.scheduler, this.resourceHandler, this.configuration.getServiceName()));
            }
        } catch (Exception e) {
            LOGGER.error("HelixManager failed to connect", e);
            throw Throwables.propagate(e);
        }
    }

    @VisibleForTesting
    void disconnectHelixManager() {
        if (isHelixManagerConnected() && this.helixManager.isPresent()) {
            ((HelixManager) this.helixManager.get()).disconnect();
        }
    }

    @VisibleForTesting
    boolean isHelixManagerConnected() {
        return this.helixManager.isPresent() && ((HelixManager) this.helixManager.get()).isConnected();
    }

    public void close() throws IOException {
        this.serviceLauncher.close();
    }

    public Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() {
        return ImmutableList.of(this.metrics);
    }

    private static String getServiceId(CommandLine commandLine) {
        return commandLine.getOptionValue(SERVICE_ID_OPTION_NAME) == null ? "1" : commandLine.getOptionValue(SERVICE_ID_OPTION_NAME);
    }

    private static Options buildOptions() {
        Options options = new Options();
        options.addOption("a", SERVICE_NAME_OPTION_NAME, true, "Gobblin Service application's name");
        options.addOption("i", SERVICE_ID_OPTION_NAME, true, "Gobblin Service application's ID, this needs to be globally unique");
        return options;
    }

    private static void printUsage(Options options) {
        new HelpFormatter().printHelp(GobblinServiceManager.class.getSimpleName(), options);
    }

    /* JADX WARN: Finally extract failed */
    public static void main(String[] strArr) throws Exception {
        Options buildOptions = buildOptions();
        try {
            CommandLine parse = new DefaultParser().parse(buildOptions, strArr);
            if (!parse.hasOption(SERVICE_NAME_OPTION_NAME)) {
                printUsage(buildOptions);
                System.exit(1);
            }
            if (!parse.hasOption(SERVICE_ID_OPTION_NAME)) {
                printUsage(buildOptions);
                LOGGER.warn("Please assign globally unique ID for a GobblinServiceManager instance, or it will use default ID");
            }
            boolean z = false;
            if (parse.hasOption("test_mode")) {
                z = Boolean.parseBoolean(parse.getOptionValue("test_mode", "false"));
            }
            GobblinServiceManager gobblinServiceManager = (GobblinServiceManager) Guice.createInjector(new Module[]{new GobblinServiceGuiceModule(new GobblinServiceConfiguration(parse.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(parse), ConfigFactory.load(), null))}).getInstance(GobblinServiceManager.class);
            Throwable th = null;
            try {
                gobblinServiceManager.start();
                if (z) {
                    testGobblinService(gobblinServiceManager);
                }
                if (gobblinServiceManager != null) {
                    if (0 != 0) {
                        try {
                            gobblinServiceManager.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        gobblinServiceManager.close();
                    }
                }
            } catch (Throwable th3) {
                if (gobblinServiceManager != null) {
                    if (0 != 0) {
                        try {
                            gobblinServiceManager.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        gobblinServiceManager.close();
                    }
                }
                throw th3;
            }
        } catch (ParseException e) {
            printUsage(buildOptions);
            System.exit(1);
        }
    }

    private static void testGobblinService(GobblinServiceManager gobblinServiceManager) {
        FlowConfigClient flowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/", Integer.valueOf(gobblinServiceManager.restliServer.getPort())));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("param1", "value1");
        try {
            flowConfigClient.createFlowConfig(new FlowConfig().setId(new FlowId().setFlowGroup("testGroup1").setFlowName("testFlow1")).setTemplateUris("FS:///templates/test.template").setSchedule(new Schedule().setCronSchedule("0 1/0 * ? * *").setRunImmediately(true)).setProperties(new StringMap(newHashMap)));
        } catch (RemoteInvocationException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public EventBus getEventBus() {
        return this.eventBus;
    }

    public GobblinServiceConfiguration getConfiguration() {
        return this.configuration;
    }

    public FlowCatalog getFlowCatalog() {
        return this.flowCatalog;
    }

    public GobblinServiceJobScheduler getScheduler() {
        return this.scheduler;
    }

    public FlowConfigsResourceHandler getResourceHandler() {
        return this.resourceHandler;
    }

    public FlowConfigsV2ResourceHandler getV2ResourceHandler() {
        return this.v2ResourceHandler;
    }

    public FlowExecutionResourceHandler getFlowExecutionResourceHandler() {
        return this.flowExecutionResourceHandler;
    }

    public FlowStatusGenerator getFlowStatusGenerator() {
        return this.flowStatusGenerator;
    }

    public GroupOwnershipService getGroupOwnershipService() {
        return this.groupOwnershipService;
    }

    public Injector getInjector() {
        return this.injector;
    }

    public Orchestrator getOrchestrator() {
        return this.orchestrator;
    }

    public DagManager getDagManager() {
        return this.dagManager;
    }
}
