package org.apache.inlong.agent.core.trigger;

import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.core.job.JobWrapper;
import org.apache.inlong.agent.db.TriggerProfileDb;
import org.apache.inlong.agent.plugin.Trigger;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/trigger/TriggerManager.class */
public class TriggerManager extends AbstractDaemon {
    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerManager.class);
    private final AgentManager manager;
    private final TriggerProfileDb triggerProfileDB;
    private final AgentConfiguration conf = AgentConfiguration.getAgentConf();
    private final ConcurrentHashMap<String, Trigger> triggerMap = new ConcurrentHashMap<>();
    private final int triggerFetchInterval = this.conf.getInt("trigger.fetch.interval", 1);
    private final int maxRunningNum = this.conf.getInt("trigger.max.running.num", 4096);

    public TriggerManager(AgentManager agentManager, TriggerProfileDb triggerProfileDb) {
        this.manager = agentManager;
        this.triggerProfileDB = triggerProfileDb;
    }

    public boolean restoreTrigger(TriggerProfile triggerProfile) {
        try {
            Trigger trigger = (Trigger) Class.forName(triggerProfile.get("job.fileJob.trigger")).newInstance();
            String str = triggerProfile.get("job.id");
            if (this.triggerMap.containsKey(str)) {
                deleteTrigger(str);
                LOGGER.warn("trigger {} is running, stop it", str);
            }
            this.triggerMap.put(str, trigger);
            trigger.init(triggerProfile);
            trigger.run();
            return true;
        } catch (Throwable th) {
            LOGGER.error("add trigger error: ", th);
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
            return false;
        }
    }

    public Trigger getTrigger(String str) {
        return this.triggerMap.get(str);
    }

    public void submitTrigger(TriggerProfile triggerProfile) {
        if (!triggerProfile.allRequiredKeyExist() || this.triggerMap.size() > this.maxRunningNum) {
            throw new IllegalArgumentException(String.format("trigger %s not all required key exists or size %d exceed %d", triggerProfile.toJsonStr(), Integer.valueOf(this.triggerMap.size()), Integer.valueOf(this.maxRunningNum)));
        }
        if (this.triggerProfileDB.getTriggers().stream().anyMatch(triggerProfile2 -> {
            return triggerProfile2.getTriggerId().equals(triggerProfile.getTriggerId());
        })) {
            return;
        }
        LOGGER.info("submit trigger {}", triggerProfile.toJsonStr());
        this.manager.getJobManager().submitJobProfile(triggerProfile, true);
        this.triggerProfileDB.storeTrigger(triggerProfile);
        restoreTrigger(triggerProfile);
    }

    public void deleteTrigger(String str) {
        if (this.triggerProfileDB.getTriggers().stream().anyMatch(triggerProfile -> {
            return triggerProfile.getTriggerId().equals(str);
        })) {
            LOGGER.info("delete trigger {}", str);
            Trigger remove = this.triggerMap.remove(str);
            if (remove != null) {
                this.manager.getJobManager().deleteJob(remove.getTriggerProfile().getInstanceId());
                remove.destroy();
            }
            this.triggerProfileDB.deleteTrigger(str);
        }
    }

    private Runnable jobFetchThread() {
        return () -> {
            Thread.currentThread().setName("TriggerManager-jobFetch");
            while (isRunnable()) {
                try {
                    this.triggerMap.forEach((str, trigger) -> {
                        Map fetchJobProfile = trigger.fetchJobProfile();
                        if (fetchJobProfile != null) {
                            Map<String, JobWrapper> jobs = this.manager.getJobManager().getJobs();
                            JobWrapper jobWrapper = jobs.get(trigger.getTriggerProfile().getInstanceId());
                            String str = (String) fetchJobProfile.getOrDefault("job.fileJob.dir.patterns", "");
                            Preconditions.checkArgument(StringUtils.isNotBlank(str), String.format("Trigger %s fetched task file should not be null.", str));
                            if (jobWrapper.exist(collection -> {
                                return collection.stream().filter(task -> {
                                    return !task.getJobConf().hasKey("job.fileJob.trigger");
                                }).filter(task2 -> {
                                    return str.equals(task2.getJobConf().get("job.fileJob.dir.patterns", ""));
                                }).findAny().isPresent();
                            })) {
                                return;
                            }
                            LOGGER.info("Trigger job {} add new task file {}, total task {}", new Object[]{jobWrapper.getJob().getName(), str, Integer.valueOf(jobWrapper.getAllTasks().size())});
                            JobWrapper jobWrapper2 = jobs.get(trigger.getTriggerProfile().getInstanceId());
                            JobProfile parseJsonStr = JobProfile.parseJsonStr(jobWrapper2.getJob().getJobConf().toJsonStr());
                            fetchJobProfile.forEach((str2, str3) -> {
                                parseJsonStr.set(str2, str3);
                            });
                            jobWrapper2.submit(parseJsonStr);
                        }
                    });
                    TimeUnit.SECONDS.sleep(this.triggerFetchInterval);
                } catch (Throwable th) {
                    LOGGER.info("ignored exception: ", th);
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                }
            }
        };
    }

    private void initTriggers() {
        Iterator it = this.triggerProfileDB.getTriggers().iterator();
        while (it.hasNext()) {
            restoreTrigger((TriggerProfile) it.next());
        }
    }

    private void stopTriggers() {
        this.triggerMap.forEach((str, trigger) -> {
            trigger.destroy();
        });
    }

    public void start() {
        initTriggers();
        submitWorker(jobFetchThread());
    }

    public void stop() {
        stopTriggers();
    }
}
