package org.apache.inlong.agent.core.trigger;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.TriggerProfileDb;
import org.apache.inlong.agent.plugin.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/trigger/TriggerManager.class */
public class TriggerManager extends AbstractDaemon {
    public static final int JOB_CHECK_INTERVAL = 1;
    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
    private final AgentManager manager;
    private final TriggerProfileDb triggerProfileDB;
    private final AgentConfiguration conf = AgentConfiguration.getAgentConf();
    private final ConcurrentHashMap<String, Trigger> triggerMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, JobProfile>> triggerJobMap = new ConcurrentHashMap<>();
    private final int triggerFetchInterval = this.conf.getInt("trigger.fetch.interval", 1);
    private final int maxRunningNum = this.conf.getInt("trigger.max.running.num", 4096);

    public TriggerManager(AgentManager agentManager, TriggerProfileDb triggerProfileDb) {
        this.manager = agentManager;
        this.triggerProfileDB = triggerProfileDb;
    }

    public boolean addTrigger(TriggerProfile triggerProfile) {
        try {
            Trigger trigger = (Trigger) Class.forName(triggerProfile.get("job.fileJob.trigger")).newInstance();
            String str = triggerProfile.get("job.id");
            if (this.triggerMap.containsKey(str)) {
                deleteTrigger(str);
                LOGGER.warn("trigger {} is running, stop it", str);
            }
            this.triggerMap.put(str, trigger);
            trigger.init(triggerProfile);
            trigger.run();
            return true;
        } catch (Exception e) {
            LOGGER.error("exception caught", e);
            return false;
        }
    }

    public Trigger getTrigger(String str) {
        return this.triggerMap.get(str);
    }

    public boolean submitTrigger(TriggerProfile triggerProfile) {
        if (!triggerProfile.allRequiredKeyExist() || this.triggerMap.size() > this.maxRunningNum) {
            LOGGER.error("trigger {} not all required key exists or size {} exceed {}", new Object[]{triggerProfile.toJsonStr(), Integer.valueOf(this.triggerMap.size()), Integer.valueOf(this.maxRunningNum)});
            return false;
        }
        preprocessTrigger(triggerProfile);
        this.triggerProfileDB.storeTrigger(triggerProfile);
        addTrigger(triggerProfile);
        return true;
    }

    public void preprocessTrigger(TriggerProfile triggerProfile) {
        if ("FULL".equals(triggerProfile.get("job.fileJob.collectType", ""))) {
            LOGGER.info("Initialize submit full path. trigger {} ", triggerProfile.getTriggerId());
            this.manager.getJobManager().submitFileJobProfile(triggerProfile);
        }
    }

    private Runnable jobFetchThread() {
        return () -> {
            while (isRunnable()) {
                try {
                    this.triggerMap.forEach((str, trigger) -> {
                        JobProfile fetchJobProfile = trigger.fetchJobProfile();
                        if (fetchJobProfile != null) {
                            TriggerProfile triggerProfile = trigger.getTriggerProfile();
                            if (triggerProfile.getBoolean("job.standalone", false)) {
                                deleteRelatedJobs(triggerProfile.getTriggerId());
                            }
                            this.manager.getJobManager().submitFileJobProfile(fetchJobProfile);
                            addToTriggerMap(fetchJobProfile.get("job.id"), fetchJobProfile);
                        }
                    });
                    TimeUnit.SECONDS.sleep(this.triggerFetchInterval);
                } catch (Exception e) {
                    LOGGER.info("ignored Exception ", e);
                }
            }
        };
    }

    private void deleteRelatedJobs(String str) {
        LOGGER.info("start to delete related jobs in triggerId {}", str);
        ConcurrentHashMap<String, JobProfile> concurrentHashMap = this.triggerJobMap.get(str);
        if (concurrentHashMap != null) {
            LOGGER.info("trigger can only run one job, stop the others {}", concurrentHashMap.keySet());
            concurrentHashMap.keySet().forEach(this::deleteJob);
            this.triggerJobMap.remove(str);
        }
    }

    private void deleteJob(String str) {
        this.manager.getJobManager().deleteJob(str);
    }

    private Runnable jobCheckThread() {
        return () -> {
            while (isRunnable()) {
                try {
                    this.triggerJobMap.forEach((str, concurrentHashMap) -> {
                        Iterator it = concurrentHashMap.keySet().iterator();
                        while (it.hasNext()) {
                            String str = (String) it.next();
                            if (this.manager.getJobManager().getJobs().get(str) == null) {
                                this.triggerJobMap.remove(str);
                            }
                        }
                    });
                    TimeUnit.MINUTES.sleep(1L);
                } catch (Exception e) {
                    LOGGER.info("ignored Exception ", e);
                }
            }
        };
    }

    private void addToTriggerMap(String str, JobProfile jobProfile) {
        ConcurrentHashMap<String, JobProfile> concurrentHashMap = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, JobProfile> putIfAbsent = this.triggerJobMap.putIfAbsent(str, concurrentHashMap);
        if (putIfAbsent == null) {
            putIfAbsent = concurrentHashMap;
        }
        putIfAbsent.putIfAbsent(jobProfile.getInstanceId(), jobProfile);
    }

    public boolean deleteTrigger(String str) {
        LOGGER.info("delete trigger {}", str);
        Trigger remove = this.triggerMap.remove(str);
        if (remove == null) {
            LOGGER.warn("cannot find trigger {}", str);
            return false;
        }
        deleteRelatedJobs(str);
        remove.destroy();
        this.triggerProfileDB.deleteTrigger(str);
        return true;
    }

    private void initTriggers() throws Exception {
        Iterator it = this.triggerProfileDB.getTriggers().iterator();
        while (it.hasNext()) {
            addTrigger((TriggerProfile) it.next());
        }
    }

    private void stopTriggers() {
        this.triggerMap.forEach((str, trigger) -> {
            trigger.destroy();
        });
    }

    public void start() throws Exception {
        initTriggers();
        submitWorker(jobFetchThread());
        submitWorker(jobCheckThread());
    }

    public void stop() {
        stopTriggers();
    }
}
