package org.apache.inlong.sort.standalone;

import com.google.common.collect.UnmodifiableIterator;
import com.google.common.eventbus.Subscribe;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Channel;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.lifecycle.LifecycleSupervisor;
import org.apache.flume.node.MaterializedConfiguration;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/SortTask.class */
public class SortTask {
    public static final Logger LOG = InlongLoggerFactory.getLogger(SortTask.class);
    private final String taskName;
    private MaterializedConfiguration materializedConfiguration;
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    private final LifecycleSupervisor supervisor = new LifecycleSupervisor();

    public SortTask(String str) {
        this.taskName = str;
    }

    public String getTaskName() {
        return this.taskName;
    }

    public void start() {
        SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(this.taskName);
        if (taskConfig == null) {
            return;
        }
        Map generateFlumeConfiguration = taskConfig.generateFlumeConfiguration();
        LOG.info("Start sort task:{},config:{}", this.taskName, generateFlumeConfiguration);
        handleConfigurationEvent(new PropertiesConfigurationProvider(taskConfig.getName(), generateFlumeConfiguration).getConfiguration());
    }

    @Subscribe
    public void handleConfigurationEvent(MaterializedConfiguration materializedConfiguration) {
        try {
            try {
                this.lifecycleLock.lockInterruptibly();
                stopAllComponents();
                startAllComponents(materializedConfiguration);
                if (this.lifecycleLock.isHeldByCurrentThread()) {
                    this.lifecycleLock.unlock();
                }
            } catch (InterruptedException e) {
                LOG.info("Interrupted while trying to handle configuration event");
                if (this.lifecycleLock.isHeldByCurrentThread()) {
                    this.lifecycleLock.unlock();
                }
            }
        } catch (Throwable th) {
            if (this.lifecycleLock.isHeldByCurrentThread()) {
                this.lifecycleLock.unlock();
            }
            throw th;
        }
    }

    public void stop() {
        this.lifecycleLock.lock();
        stopAllComponents();
        try {
            this.supervisor.stop();
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    private void stopAllComponents() {
        if (this.materializedConfiguration != null) {
            LOG.info("Shutting down configuration: {}", this.materializedConfiguration);
            UnmodifiableIterator it = this.materializedConfiguration.getSourceRunners().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                try {
                    LOG.info("Stopping Source " + ((String) entry.getKey()));
                    this.supervisor.unsupervise((LifecycleAware) entry.getValue());
                } catch (Exception e) {
                    LOG.error("Error while stopping {}", entry.getValue(), e);
                }
            }
            UnmodifiableIterator it2 = this.materializedConfiguration.getSinkRunners().entrySet().iterator();
            while (it2.hasNext()) {
                Map.Entry entry2 = (Map.Entry) it2.next();
                try {
                    LOG.info("Stopping Sink " + ((String) entry2.getKey()));
                    this.supervisor.unsupervise((LifecycleAware) entry2.getValue());
                } catch (Exception e2) {
                    LOG.error("Error while stopping {}", entry2.getValue(), e2);
                }
            }
            UnmodifiableIterator it3 = this.materializedConfiguration.getChannels().entrySet().iterator();
            while (it3.hasNext()) {
                Map.Entry entry3 = (Map.Entry) it3.next();
                try {
                    LOG.info("Stopping Channel " + ((String) entry3.getKey()));
                    this.supervisor.unsupervise((LifecycleAware) entry3.getValue());
                } catch (Exception e3) {
                    LOG.error("Error while stopping {}", entry3.getValue(), e3);
                }
            }
        }
    }

    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        LOG.info("Starting new configuration:{}", materializedConfiguration);
        this.materializedConfiguration = materializedConfiguration;
        UnmodifiableIterator it = materializedConfiguration.getChannels().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            try {
                LOG.info("Starting Channel " + ((String) entry.getKey()));
                this.supervisor.supervise((LifecycleAware) entry.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
                LOG.error("Error while starting {}", entry.getValue(), e);
            }
        }
        UnmodifiableIterator it2 = materializedConfiguration.getChannels().values().iterator();
        while (it2.hasNext()) {
            Channel channel = (Channel) it2.next();
            while (channel.getLifecycleState() != LifecycleState.START && !this.supervisor.isComponentInErrorState(channel)) {
                try {
                    LOG.info("Waiting for channel: " + channel.getName() + " to start. Sleeping for 500 ms");
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    LOG.error("Interrupted while waiting for channel to start.", e2);
                }
            }
        }
        UnmodifiableIterator it3 = materializedConfiguration.getSinkRunners().entrySet().iterator();
        while (it3.hasNext()) {
            Map.Entry entry2 = (Map.Entry) it3.next();
            try {
                LOG.info("Starting Sink " + ((String) entry2.getKey()));
                this.supervisor.supervise((LifecycleAware) entry2.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e3) {
                LOG.error("Error while starting {}", entry2.getValue(), e3);
            }
        }
        UnmodifiableIterator it4 = materializedConfiguration.getSourceRunners().entrySet().iterator();
        while (it4.hasNext()) {
            Map.Entry entry3 = (Map.Entry) it4.next();
            try {
                LOG.info("Starting Source " + ((String) entry3.getKey()));
                this.supervisor.supervise((LifecycleAware) entry3.getValue(), new LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e4) {
                LOG.error("Error while starting {}", entry3.getValue(), e4);
            }
        }
    }
}
