package org.apache.inlong.sdk.commons.admin;

import com.google.common.collect.UnmodifiableIterator;
import com.google.common.eventbus.Subscribe;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.node.MaterializedConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/commons/admin/AdminTask.class */
public class AdminTask {
    public static final Logger LOG = LoggerFactory.getLogger(AdminTask.class);
    public static final String KEY_HOST = "adminTask.host";
    public static final String KEY_PORT = "adminTask.port";
    public static final String KEY_HANDLER = "adminTask.handler";
    public static final String FLUME_ROOT = "admin";
    private Context context;
    private MaterializedConfiguration materializedConfiguration;
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    private final LifecycleSupervisor supervisor = new LifecycleSupervisor();

    public AdminTask(Context context) {
        this.context = context;
    }

    public void start() {
        try {
            Map<String, String> generateFlumeConfiguration = generateFlumeConfiguration();
            if (generateFlumeConfiguration == null) {
                return;
            }
            LOG.info("Start admin task,flumeConf:{}", generateFlumeConfiguration);
            handleConfigurationEvent(new PropertiesConfigurationProvider(FLUME_ROOT, generateFlumeConfiguration).getConfiguration());
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    private Map<String, String> generateFlumeConfiguration() throws IOException {
        String string = this.context.getString(KEY_HOST);
        if (string == null) {
            LOG.error("Can not start admin task, host is null.");
            return null;
        }
        String string2 = this.context.getString(KEY_PORT);
        if (string2 == null) {
            LOG.error("Can not start admin task, port is null.");
            return null;
        }
        String string3 = this.context.getString(KEY_HANDLER);
        if (string3 == null) {
            LOG.error("Can not start admin task, handlerType is null.");
            return null;
        }
        String format = String.format("admin.sources=r1\nadmin.sinks=k1\nadmin.channels=c1\nadmin.sources.r1.type=" + AdminHttpSource.class.getName() + "\nadmin.sources.r1.bind=%s\nadmin.sources.r1.port=%s\nadmin.sources.r1.channels=c1\nadmin.sources.r1.handler=%s\nadmin.sinks.k1.type=logger\nadmin.sinks.k1.channel=c1\nadmin.channels.c1.type=memory\nadmin.channels.c1.capacity=1000\nadmin.channels.c1.transactionCapacity=100", string, string2, string3);
        Properties properties = new Properties();
        properties.load(new StringReader(format));
        HashMap hashMap = new HashMap();
        properties.forEach((obj, obj2) -> {
            hashMap.put(String.valueOf(obj), String.valueOf(obj2));
        });
        this.context.getSubProperties("adminTask.handler.").forEach((str, str2) -> {
            hashMap.put("admin.sources.r1.handler." + str, str2);
        });
        return hashMap;
    }

    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration materializedConfiguration) {
        try {
            try {
                this.lifecycleLock.lockInterruptibly();
                stopAllComponents();
                startAllComponents(materializedConfiguration);
                if (this.lifecycleLock.isHeldByCurrentThread()) {
                    this.lifecycleLock.unlock();
                }
            } catch (InterruptedException e) {
                LOG.info("Interrupted while trying to handle configuration event");
                if (this.lifecycleLock.isHeldByCurrentThread()) {
                    this.lifecycleLock.unlock();
                }
            }
        } catch (Throwable th) {
            if (this.lifecycleLock.isHeldByCurrentThread()) {
                this.lifecycleLock.unlock();
            }
            throw th;
        }
    }

    public void stop() {
        this.lifecycleLock.lock();
        stopAllComponents();
        try {
            this.supervisor.stop();
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    private void stopAllComponents() {
        if (this.materializedConfiguration != null) {
            LOG.info("Shutting down configuration: {}", this.materializedConfiguration);
            UnmodifiableIterator it = this.materializedConfiguration.getSourceRunners().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                try {
                    LOG.info("Stopping Source " + ((String) entry.getKey()));
                    this.supervisor.unsupervise((LifecycleAware) entry.getValue());
                } catch (Exception e) {
                    LOG.error("Error while stopping {}", entry.getValue(), e);
                }
            }
            UnmodifiableIterator it2 = this.materializedConfiguration.getSinkRunners().entrySet().iterator();
            while (it2.hasNext()) {
                Map.Entry entry2 = (Map.Entry) it2.next();
                try {
                    LOG.info("Stopping Sink " + ((String) entry2.getKey()));
                    this.supervisor.unsupervise((LifecycleAware) entry2.getValue());
                } catch (Exception e2) {
                    LOG.error("Error while stopping {}", entry2.getValue(), e2);
                }
            }
            UnmodifiableIterator it3 = this.materializedConfiguration.getChannels().entrySet().iterator();
            while (it3.hasNext()) {
                Map.Entry entry3 = (Map.Entry) it3.next();
                try {
                    LOG.info("Stopping Channel " + ((String) entry3.getKey()));
                    this.supervisor.unsupervise((LifecycleAware) entry3.getValue());
                } catch (Exception e3) {
                    LOG.error("Error while stopping {}", entry3.getValue(), e3);
                }
            }
        }
    }

    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        LOG.info("Starting new configuration:{}", materializedConfiguration);
        this.materializedConfiguration = materializedConfiguration;
        UnmodifiableIterator it = materializedConfiguration.getChannels().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            try {
                LOG.info("Starting Channel " + ((String) entry.getKey()));
                this.supervisor.supervise((LifecycleAware) entry.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                LOG.error("Error while starting {}", entry.getValue(), e);
            }
        }
        UnmodifiableIterator it2 = materializedConfiguration.getChannels().values().iterator();
        while (it2.hasNext()) {
            Channel channel = (Channel) it2.next();
            while (channel.getLifecycleState() != LifecycleState.START && !this.supervisor.isComponentInErrorState(channel)) {
                try {
                    LOG.info("Waiting for channel: " + channel.getName() + " to start. Sleeping for 500 ms");
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    LOG.error("Interrupted while waiting for channel to start.", e2);
                }
            }
        }
        UnmodifiableIterator it3 = materializedConfiguration.getSinkRunners().entrySet().iterator();
        while (it3.hasNext()) {
            Map.Entry entry2 = (Map.Entry) it3.next();
            try {
                LOG.info("Starting Sink " + ((String) entry2.getKey()));
                this.supervisor.supervise((LifecycleAware) entry2.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e3) {
                LOG.error("Error while starting {}", entry2.getValue(), e3);
            }
        }
        UnmodifiableIterator it4 = materializedConfiguration.getSourceRunners().entrySet().iterator();
        while (it4.hasNext()) {
            Map.Entry entry3 = (Map.Entry) it4.next();
            try {
                LOG.info("Starting Source " + ((String) entry3.getKey()));
                this.supervisor.supervise((LifecycleAware) entry3.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e4) {
                LOG.error("Error while starting {}", entry3.getValue(), e4);
            }
        }
    }

    public static void main(String[] strArr) {
        try {
            Context context = new Context();
            context.put(KEY_HOST, "127.0.0.1");
            context.put(KEY_PORT, "8080");
            context.put(KEY_HANDLER, "org.apache.inlong.dataproxy.admin.AdminJsonHandler");
            context.put("adminTask.handler.stopService.type", "org.apache.inlong.dataproxy.admin.ProxyServiceAdminEventHandler");
            new AdminTask(context).start();
            Thread.sleep(10000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
