package org.apache.gobblin.cluster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinHelixMultiManager;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.app.ApplicationException;
import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/cluster/GobblinClusterManager.class */
public class GobblinClusterManager implements ApplicationLauncher, StandardMetricsBridge, LeadershipChangeAwareComponent {
    private static final Logger log = LoggerFactory.getLogger(GobblinClusterManager.class);
    private static final Logger LOGGER = LoggerFactory.getLogger(GobblinClusterManager.class);
    protected ServiceBasedAppLauncher applicationLauncher;
    protected final Path appWorkDir;
    protected final FileSystem fs;
    protected final String applicationId;
    private Thread idleProcessThread;
    private final boolean isStandaloneMode;
    protected GobblinHelixMultiManager multiManager;
    private MutableJobCatalog jobCatalog;
    private GobblinHelixJobScheduler jobScheduler;
    private JobConfigurationManager jobConfigurationManager;
    protected final String clusterName;
    protected final Config config;
    private StopStatus stopStatus = new StopStatus(false);
    protected final EventBus eventBus = new EventBus(GobblinClusterManager.class.getSimpleName());
    private volatile boolean stopIdleProcessThread = false;
    private volatile boolean started = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/gobblin/cluster/GobblinClusterManager$StopStatus.class */
    public static class StopStatus {
        AtomicBoolean isStopInProgress;

        public StopStatus(boolean z) {
            this.isStopInProgress = new AtomicBoolean(z);
        }

        public void setStopInprogress(boolean z) {
            this.isStopInProgress.set(z);
        }

        public boolean isStopInProgress() {
            return this.isStopInProgress.get();
        }

        public AtomicBoolean getIsStopInProgress() {
            return this.isStopInProgress;
        }

        public void setIsStopInProgress(AtomicBoolean atomicBoolean) {
            this.isStopInProgress = atomicBoolean;
        }
    }

    public GobblinClusterManager(String str, String str2, Config config, Optional<Path> optional) throws Exception {
        GobblinClusterUtils.setSystemProperties(config);
        this.config = GobblinClusterUtils.addDynamicConfig(config);
        this.clusterName = str;
        this.isStandaloneMode = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, false);
        this.applicationId = str2;
        initializeHelixManager();
        this.fs = GobblinClusterUtils.buildFileSystem(this.config, new Configuration());
        this.appWorkDir = optional.isPresent() ? (Path) optional.get() : GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, str, str2);
        LOGGER.info("Configured GobblinClusterManager work dir to: {}", this.appWorkDir);
        initializeAppLauncherAndServices();
    }

    private void initializeAppLauncherAndServices() throws Exception {
        Properties configToProperties = ConfigUtils.configToProperties(this.config);
        if (!configToProperties.contains("app.stop.time.seconds")) {
            configToProperties.setProperty("app.stop.time.seconds", Long.toString(300L));
        }
        this.applicationLauncher = new ServiceBasedAppLauncher(configToProperties, this.clusterName);
        if (this.config.hasPath("gobblin.cluster.jobconf.fullyQualifiedPath")) {
            this.jobCatalog = (MutableJobCatalog) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(ConfigUtils.getString(this.config, GobblinClusterConfigurationKeys.JOB_CATALOG_KEY, GobblinClusterConfigurationKeys.DEFAULT_JOB_CATALOG)), new List[]{ImmutableList.of(this.config.getConfig(StringUtils.removeEnd(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX, ".")).withFallback(this.config))});
        } else {
            this.jobCatalog = null;
        }
        SchedulerService schedulerService = new SchedulerService(configToProperties);
        this.applicationLauncher.addService(schedulerService);
        this.jobScheduler = buildGobblinHelixJobScheduler(this.config, this.appWorkDir, getMetadataTags(this.clusterName, this.applicationId), schedulerService);
        this.applicationLauncher.addService(this.jobScheduler);
        this.jobConfigurationManager = buildJobConfigurationManager(this.config);
        this.applicationLauncher.addService(this.jobConfigurationManager);
        if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED, false)) {
            this.applicationLauncher.addService(new ContainerHealthMetricsService(this.config));
        }
    }

    private void startAppLauncherAndServices() {
        if (this.jobCatalog instanceof Service) {
            this.jobCatalog.startAsync().awaitRunning();
        }
        this.applicationLauncher.start();
    }

    private void stopAppLauncherAndServices() {
        try {
            this.applicationLauncher.stop();
        } catch (ApplicationException e) {
            LOGGER.error("Error while stopping Gobblin Cluster application launcher", e);
        }
        if (this.jobCatalog instanceof Service) {
            this.jobCatalog.stopAsync().awaitTerminated();
        }
    }

    @VisibleForTesting
    void configureHelixQuotaBasedTaskScheduling() {
        List stringList = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY);
        if (stringList.isEmpty()) {
            return;
        }
        ClusterConfig clusterConfig = this.multiManager.getJobClusterHelixManager().getConfigAccessor().getClusterConfig(this.clusterName);
        clusterConfig.resetTaskQuotaRatioMap();
        Iterator it = stringList.iterator();
        while (it.hasNext()) {
            List splitToList = Splitter.on(":").limit(2).trimResults().omitEmptyStrings().splitToList((String) it.next());
            if (splitToList.size() < 2) {
                throw new IllegalArgumentException("Quota configurations must be of the form <key1>:<value1>,<key2>:<value2>,...");
            }
            clusterConfig.setTaskQuotaRatio((String) splitToList.get(0), Integer.parseInt((String) splitToList.get(1)));
        }
        this.multiManager.getJobClusterHelixManager().getConfigAccessor().setClusterConfig(this.clusterName, clusterConfig);
    }

    public synchronized void start() {
        LOGGER.info("Starting the Gobblin Cluster Manager");
        this.eventBus.register(this);
        this.multiManager.connect();
        if (!this.isStandaloneMode) {
            this.multiManager.cleanUpJobs();
        }
        configureHelixQuotaBasedTaskScheduling();
        if (this.isStandaloneMode) {
            this.idleProcessThread = new Thread(new Runnable() { // from class: org.apache.gobblin.cluster.GobblinClusterManager.1
                @Override // java.lang.Runnable
                public void run() {
                    while (!GobblinClusterManager.this.stopStatus.isStopInProgress() && !GobblinClusterManager.this.stopIdleProcessThread) {
                        try {
                            Thread.sleep(300L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            });
            this.idleProcessThread.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                this.stopIdleProcessThread = true;
            }));
        } else {
            startAppLauncherAndServices();
        }
        this.started = true;
    }

    public synchronized void stop() {
        if (this.stopStatus.isStopInProgress()) {
            return;
        }
        this.stopStatus.setStopInprogress(true);
        LOGGER.info("Stopping the Gobblin Cluster Manager");
        if (this.idleProcessThread != null) {
            try {
                this.idleProcessThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (!this.isStandaloneMode) {
            sendShutdownRequest();
        }
        stopAppLauncherAndServices();
        this.multiManager.disconnect();
    }

    private List<? extends Tag<?>> getMetadataTags(String str, String str2) {
        return Tag.fromMap(new ImmutableMap.Builder().put(GobblinClusterMetricTagNames.APPLICATION_NAME, str).put(GobblinClusterMetricTagNames.APPLICATION_ID, str2).build());
    }

    private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config config, Path path, List<? extends Tag<?>> list, SchedulerService schedulerService) throws Exception {
        return new GobblinHelixJobScheduler(config, this.multiManager.getJobClusterHelixManager(), this.multiManager.getTaskDriverHelixManager(), this.eventBus, path, list, schedulerService, this.jobCatalog);
    }

    private JobConfigurationManager buildJobConfigurationManager(Config config) {
        try {
            ImmutableList of = this.jobCatalog != null ? ImmutableList.of(this.eventBus, config, this.jobCatalog, this.fs) : ImmutableList.of(this.eventBus, config, this.fs);
            return config.hasPath(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY) ? (JobConfigurationManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)), of.toArray(new Object[of.size()])) : new JobConfigurationManager(this.eventBus, config);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    @Subscribe
    public void handleApplicationMasterShutdownRequest(ClusterManagerShutdownRequest clusterManagerShutdownRequest) {
        stop();
    }

    protected MultiTypeMessageHandlerFactory getUserDefinedMessageHandlerFactory() {
        return new GobblinHelixMultiManager.ControllerUserDefinedMessageHandlerFactory();
    }

    @VisibleForTesting
    void connectHelixManager() {
        this.multiManager.connect();
    }

    @VisibleForTesting
    void disconnectHelixManager() {
        this.multiManager.disconnect();
    }

    @VisibleForTesting
    boolean isHelixManagerConnected() {
        return this.multiManager.isConnected();
    }

    @VisibleForTesting
    void initializeHelixManager() {
        this.multiManager = new GobblinHelixMultiManager(this.config, r3 -> {
            return getUserDefinedMessageHandlerFactory();
        }, this.eventBus, this.stopStatus);
        this.multiManager.addLeadershipChangeAwareComponent(this);
    }

    @VisibleForTesting
    void sendShutdownRequest() {
        Criteria criteria = new Criteria();
        criteria.setInstanceName("%");
        criteria.setResource("%");
        criteria.setPartition("%");
        criteria.setPartitionState("%");
        criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
        criteria.setSessionSpecific(true);
        Message message = new Message(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString().toLowerCase() + UUID.randomUUID().toString());
        message.setMsgSubType(HelixMessageSubTypes.WORK_UNIT_RUNNER_SHUTDOWN.toString());
        message.setMsgState(Message.MessageState.NEW);
        if (new GobblinHelixMessagingService(this.multiManager.getJobClusterHelixManager()).send(criteria, message, new NoopReplyHandler(), 300000) == 0) {
            LOGGER.error(String.format("Failed to send the %s message to the participants", message.getMsgSubType()));
        }
    }

    public void close() throws IOException {
        this.applicationLauncher.close();
    }

    public Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(this.jobScheduler.getStandardMetricsCollection());
        arrayList.addAll(this.multiManager.getStandardMetricsCollection());
        arrayList.addAll(this.jobCatalog.getStandardMetricsCollection());
        arrayList.addAll(this.jobConfigurationManager.getStandardMetricsCollection());
        return arrayList;
    }

    private static String getApplicationId() {
        return "1";
    }

    private static Options buildOptions() {
        Options options = new Options();
        options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Gobblin application name");
        options.addOption("s", GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE, true, "Standalone cluster mode");
        options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true, "Helix instance name");
        return options;
    }

    private static void printUsage(Options options) {
        new HelpFormatter().printHelp(GobblinClusterManager.class.getSimpleName(), options);
    }

    public static void main(String[] strArr) throws Exception {
        Options buildOptions = buildOptions();
        try {
            CommandLine parse = new DefaultParser().parse(buildOptions, strArr);
            if (!parse.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)) {
                printUsage(buildOptions);
                System.exit(1);
            }
            boolean z = false;
            if (parse.hasOption(GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE)) {
                z = Boolean.parseBoolean(parse.getOptionValue(GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE, "false"));
            }
            LOGGER.info(JvmUtils.getJvmInputArguments());
            Config load = ConfigFactory.load();
            if (parse.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)) {
                load = load.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(parse.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)));
            }
            if (z) {
                load = load.withValue(GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, ConfigValueFactory.fromAnyRef(true));
            }
            GobblinClusterManager gobblinClusterManager = new GobblinClusterManager(parse.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationId(), load, Optional.absent());
            Throwable th = null;
            if (z) {
                try {
                    try {
                        String string = load.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
                        String string2 = load.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
                        HelixUtils.createGobblinHelixCluster(string, string2, false);
                        LOGGER.info("Created Helix cluster " + string2);
                    } finally {
                    }
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            }
            gobblinClusterManager.start();
            if (gobblinClusterManager != null) {
                if (0 != 0) {
                    try {
                        gobblinClusterManager.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                } else {
                    gobblinClusterManager.close();
                }
            }
        } catch (ParseException e) {
            printUsage(buildOptions);
            System.exit(1);
        }
    }

    @Override // org.apache.gobblin.cluster.LeadershipChangeAwareComponent
    public void becomeActive() {
        startAppLauncherAndServices();
    }

    @Override // org.apache.gobblin.cluster.LeadershipChangeAwareComponent
    public void becomeStandby() {
        stopAppLauncherAndServices();
        try {
            initializeAppLauncherAndServices();
        } catch (Exception e) {
            throw new RuntimeException("Exception reinitializing app launcher services ", e);
        }
    }

    public EventBus getEventBus() {
        return this.eventBus;
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public GobblinHelixMultiManager getMultiManager() {
        return this.multiManager;
    }

    public MutableJobCatalog getJobCatalog() {
        return this.jobCatalog;
    }

    public GobblinHelixJobScheduler getJobScheduler() {
        return this.jobScheduler;
    }

    public JobConfigurationManager getJobConfigurationManager() {
        return this.jobConfigurationManager;
    }

    public boolean isStarted() {
        return this.started;
    }

    public Config getConfig() {
        return this.config;
    }
}
