package org.apache.inlong.audit.node;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.MonitoringType;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.node.MaterializedConfiguration;
import org.apache.flume.node.PollingPropertiesFileConfigurationProvider;
import org.apache.flume.node.PropertiesFileConfigurationProvider;
import org.apache.flume.util.SSLUtil;
import org.apache.inlong.audit.file.ConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/node/Application.class */
/**
 * Command-line entry point and lifecycle coordinator for the audit agent.
 *
 * <p>An instance hands a list of {@link LifecycleAware} components to a
 * {@link LifecycleSupervisor} and, whenever a new {@link MaterializedConfiguration}
 * is delivered (either directly or through an {@link EventBus}), stops the
 * channels, sinks and sources of the previous configuration and starts those
 * of the new one.
 *
 * <p>All lifecycle transitions ({@link #start()}, {@link #stop()} and
 * {@link #handleConfigurationEvent(MaterializedConfiguration)}) are serialized
 * through a single {@link ReentrantLock}.
 */
public class Application {
    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    /** System property naming the monitoring type (a {@link MonitoringType} name or a class name). */
    public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";

    /** Prefix of the system properties that are forwarded to the monitor service's context. */
    public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";

    /** Interval, in milliseconds, between checks of a channel that has not reached START yet. */
    private static final long CHANNEL_START_POLL_MILLIS = 500L;

    private final List<LifecycleAware> components;
    private final LifecycleSupervisor supervisor;
    private MaterializedConfiguration materializedConfiguration;
    private MonitorService monitorServer;
    private final ReentrantLock lifecycleLock;

    /** Creates an application with no extra supervised components. */
    public Application() {
        this(new ArrayList<LifecycleAware>(0));
    }

    /**
     * Creates an application that supervises the given components once {@link #start()} is called.
     *
     * @param list components to hand to the supervisor on start; the list is used as-is, not copied
     */
    public Application(List<LifecycleAware> list) {
        this.lifecycleLock = new ReentrantLock();
        this.components = list;
        this.supervisor = new LifecycleSupervisor();
    }

    /**
     * Registers every component passed to the constructor with the supervisor,
     * requesting the {@link LifecycleState#START} state for each.
     */
    public void start() {
        this.lifecycleLock.lock();
        try {
            for (LifecycleAware component : this.components) {
                this.supervisor.supervise(component,
                        new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Replaces the running configuration: stops everything belonging to the
     * current configuration, then starts the components of the new one.
     *
     * <p>If the calling thread is interrupted while waiting for the lifecycle
     * lock, nothing is changed and the thread's interrupt status is restored.
     *
     * @param materializedConfiguration the configuration to switch to
     */
    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration materializedConfiguration) {
        try {
            this.lifecycleLock.lockInterruptibly();
            stopAllComponents();
            startAllComponents(materializedConfiguration);
        } catch (InterruptedException e) {
            logger.info("Interrupted while trying to handle configuration event");
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // When interrupted while acquiring, the lock is not held and must not be released.
            if (this.lifecycleLock.isHeldByCurrentThread()) {
                this.lifecycleLock.unlock();
            }
        }
    }

    /**
     * Stops the components of the current configuration, then the supervisor
     * and the monitor service.
     */
    public void stop() {
        this.lifecycleLock.lock();
        try {
            // Inside the try block so the lock is released even if stopping a component fails.
            stopAllComponents();
            this.supervisor.stop();
            // stopAllComponents() has already asked the monitor to stop; the call is
            // repeated here after the supervisor shutdown, matching the original sequence.
            if (this.monitorServer != null) {
                this.monitorServer.stop();
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Removes the sources, sinks and channels (in that order) of the current
     * configuration from the supervisor and stops the monitor service, if any.
     */
    private void stopAllComponents() {
        MaterializedConfiguration current = this.materializedConfiguration;
        if (current != null) {
            logger.info("Shutting down configuration: {}", current);
            unsuperviseAll("Source", current.getSourceRunners());
            unsuperviseAll("Sink", current.getSinkRunners());
            unsuperviseAll("Channel", current.getChannels());
        }
        if (this.monitorServer != null) {
            this.monitorServer.stop();
        }
    }

    /**
     * Unsupervises every component in the map, logging (not propagating) individual failures.
     *
     * @param kind label used in the log line ("Source", "Sink" or "Channel")
     * @param runners component name to component
     */
    private void unsuperviseAll(String kind, Map<String, ? extends LifecycleAware> runners) {
        for (Map.Entry<String, ? extends LifecycleAware> entry : runners.entrySet()) {
            try {
                logger.info("Stopping {} {}", kind, entry.getKey());
                this.supervisor.unsupervise(entry.getValue());
            } catch (Exception e) {
                logger.error("Error while stopping {}", entry.getValue(), e);
            }
        }
    }

    /**
     * Records the given configuration as current and starts its channels, then
     * (once every channel is started or reported in error) its sinks and
     * sources, and finally the monitor service.
     *
     * @param materializedConfiguration the configuration to start
     * @throws RuntimeException wrapping an {@link InterruptedException} if the thread is
     *         interrupted while waiting for a channel to start
     */
    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        logger.info("Starting new configuration:{}", materializedConfiguration);
        this.materializedConfiguration = materializedConfiguration;

        superviseAll("Channel", materializedConfiguration.getChannels());

        // Sinks and sources are only started after each channel is either running or in error.
        for (Channel channel : materializedConfiguration.getChannels().values()) {
            while (channel.getLifecycleState() != LifecycleState.START
                    && !this.supervisor.isComponentInErrorState(channel)) {
                try {
                    logger.info("Waiting for channel: {} to start. Sleeping for 500 ms", channel.getName());
                    Thread.sleep(CHANNEL_START_POLL_MILLIS);
                } catch (InterruptedException e) {
                    logger.error("Interrupted while waiting for channel to start.", e);
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate(e);
                }
            }
        }

        superviseAll("Sink", materializedConfiguration.getSinkRunners());
        superviseAll("Source", materializedConfiguration.getSourceRunners());
        loadMonitoring();
    }

    /**
     * Hands every component in the map to the supervisor with START as the
     * requested state, logging (not propagating) individual failures.
     *
     * @param kind label used in the log line ("Source", "Sink" or "Channel")
     * @param runners component name to component
     */
    private void superviseAll(String kind, Map<String, ? extends LifecycleAware> runners) {
        for (Map.Entry<String, ? extends LifecycleAware> entry : runners.entrySet()) {
            try {
                logger.info("Starting {} {}", kind, entry.getKey());
                this.supervisor.supervise(entry.getValue(),
                        new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                logger.error("Error while starting {}", entry.getValue(), e);
            }
        }
    }

    /**
     * Creates, configures and starts a monitor service when the
     * {@value #CONF_MONITOR_CLASS} system property is set.
     *
     * <p>The property value is first looked up as a {@link MonitoringType}
     * constant (upper-cased); if that fails it is treated as a class name.
     * Every system property starting with {@value #CONF_MONITOR_PREFIX} is
     * passed to the service with the prefix removed. Any failure is logged as
     * a warning and otherwise ignored.
     */
    private void loadMonitoring() {
        Properties systemProps = System.getProperties();
        Set<String> propertyNames = systemProps.stringPropertyNames();
        try {
            if (!propertyNames.contains(CONF_MONITOR_CLASS)) {
                return;
            }
            String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
            Class<?> monitorClass;
            try {
                monitorClass = MonitoringType.valueOf(monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
            } catch (Exception e) {
                // Not a known monitoring type: fall back to loading it as a class name.
                monitorClass = Class.forName(monitorType);
            }
            this.monitorServer = (MonitorService) monitorClass.getDeclaredConstructor().newInstance();
            Context context = new Context();
            for (String name : propertyNames) {
                if (name.startsWith(CONF_MONITOR_PREFIX)) {
                    context.put(name.substring(CONF_MONITOR_PREFIX.length()), systemProps.getProperty(name));
                }
            }
            this.monitorServer.configure(context);
            this.monitorServer.start();
        } catch (Exception e) {
            logger.warn("Error starting monitoring. Monitoring might not be available.", e);
        }
    }

    /**
     * Parses the command line, builds the application for the given agent name
     * and configuration file, starts it and registers a shutdown hook that
     * stops it.
     *
     * <p>Options: {@code -n/--name} (required), {@code -f/--conf-file},
     * {@code --no-reload-conf}, {@code -h/--help}. Any exception is logged and
     * the method returns.
     *
     * @param strArr command-line arguments
     */
    public static void main(String[] strArr) {
        try {
            SSLUtil.initGlobalSSLParameters();

            Options options = new Options();
            Option nameOption = new Option("n", "name", true, "the name of this agent");
            nameOption.setRequired(true);
            options.addOption(nameOption);
            Option confFileOption =
                    new Option("f", "conf-file", true, "specify a config file (required if -z missing)");
            confFileOption.setRequired(false);
            options.addOption(confFileOption);
            options.addOption(new Option(null, "no-reload-conf", false, "do not reload config file if changed"));
            options.addOption(new Option("h", "help", false, "display help text"));

            CommandLine commandLine = new GnuParser().parse(options, strArr);
            if (commandLine.hasOption('h')) {
                new HelpFormatter().printHelp("flume-ng agent", options, true);
                return;
            }

            String agentName = commandLine.getOptionValue('n');
            boolean reloadConfig = !commandLine.hasOption("no-reload-conf");
            ConfigManager.getInstance();

            String confPath = commandLine.getOptionValue('f');
            if (confPath == null) {
                // The option is not marked required, so its absence has to be reported here;
                // otherwise new File(null) fails with a bare NullPointerException.
                throw new ParseException("Missing required option: -f/--conf-file");
            }
            File configurationFile = new File(confPath);
            if (!configurationFile.exists() && System.getProperty("flume.called.from.service") == null) {
                String path = configurationFile.getPath();
                try {
                    path = configurationFile.getCanonicalPath();
                } catch (IOException e) {
                    logger.error("Failed to read canonical path for file: " + path, e);
                }
                throw new ParseException("The specified configuration file does not exist: " + path);
            }

            final Application application;
            if (reloadConfig) {
                // The polling provider publishes new configurations on the event bus,
                // which delivers them to handleConfigurationEvent.
                EventBus eventBus = new EventBus(agentName + "-event-bus");
                List<LifecycleAware> supervised = Lists.newArrayList();
                supervised.add(
                        new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30));
                application = new Application(supervised);
                eventBus.register(application);
            } else {
                PropertiesFileConfigurationProvider provider =
                        new PropertiesFileConfigurationProvider(agentName, configurationFile);
                application = new Application();
                application.handleConfigurationEvent(provider.getConfiguration());
            }
            application.start();

            Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
                @Override
                public void run() {
                    application.stop();
                }
            });
        } catch (Exception e) {
            logger.error("A fatal error occurred while running. Exception follows.", e);
        }
    }
}
