package org.apache.inlong.agent.core.task;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.JobProfileDb;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/task/TaskPositionManager.class */
/**
 * Keeps the sink position of every task in memory, grouped by job, and periodically
 * copies those positions into the job profiles held by {@link JobProfileDb}.
 *
 * <p>The manager is a lazily created singleton: it is built once from an
 * {@link AgentManager} via {@link #getTaskPositionManager(AgentManager)} and afterwards
 * retrieved with {@link #getTaskPositionManager()}.
 *
 * <p>Positions are stored as {@code jobId -> (key -> position)} in nested
 * {@link ConcurrentHashMap}s. The inner key is the second argument of
 * {@link #updateSinkPosition(String, String, long)}; it is persisted in the job profile
 * under {@code key + ".position"}.
 */
public class TaskPositionManager extends AbstractDaemon {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskPositionManager.class);

    /** Suffix appended to a position key to form the job-profile property name. */
    private static final String POSITION_SUFFIX = ".position";

    /** Configuration key holding the pause, in seconds, between two flush rounds. */
    private static final String FLUSH_INTERVAL_KEY = "agent.heartbeat.interval";

    /** Pause, in seconds, used when {@link #FLUSH_INTERVAL_KEY} is not configured. */
    private static final int DEFAULT_FLUSH_INTERVAL_SECONDS = 10;

    private static volatile TaskPositionManager taskPositionManager = null;

    private final AgentManager agentManager;
    private final JobProfileDb jobConfDb;
    private final AgentConfiguration conf = AgentConfiguration.getAgentConf();

    /** jobId -> (position key -> accumulated position). */
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> jobTaskPositionMap =
            new ConcurrentHashMap<>();

    private TaskPositionManager(AgentManager agentManager) {
        this.agentManager = agentManager;
        this.jobConfDb = agentManager.getJobManager().getJobConfDb();
    }

    /**
     * Returns the singleton, creating it from {@code agentManager} on the first call.
     * Once the singleton exists the argument is ignored.
     *
     * @param agentManager agent manager used to obtain the job profile database on first use
     * @return the shared manager instance
     */
    public static TaskPositionManager getTaskPositionManager(AgentManager agentManager) {
        // Double-checked locking; safe because the static field is volatile.
        if (taskPositionManager == null) {
            synchronized (TaskPositionManager.class) {
                if (taskPositionManager == null) {
                    taskPositionManager = new TaskPositionManager(agentManager);
                }
            }
        }
        return taskPositionManager;
    }

    /**
     * Returns the already created singleton.
     *
     * @return the shared manager instance
     * @throws RuntimeException if {@link #getTaskPositionManager(AgentManager)} has not been
     *                          called yet
     */
    public static TaskPositionManager getTaskPositionManager() {
        if (taskPositionManager == null) {
            throw new RuntimeException("task position manager has not been initialized by agentManager");
        }
        return taskPositionManager;
    }

    /**
     * Hands the periodic flush loop to {@code submitWorker}.
     * NOTE(review): {@code submitWorker} is presumably inherited from {@link AbstractDaemon}
     * and runs the loop on a worker thread - confirm.
     *
     * @throws Exception declared by the signature; nothing in this method throws it directly
     */
    public void start() throws Exception {
        submitWorker(taskPositionFlushThread());
    }

    /**
     * Builds the flush loop: while {@code isRunnable()} holds, every tracked job is either
     * flushed into its profile or, when its profile no longer exists in the database,
     * dropped from memory; the loop then sleeps for the configured interval.
     *
     * @return the runnable implementing the loop
     */
    private Runnable taskPositionFlushThread() {
        return () -> {
            while (isRunnable()) {
                try {
                    // ConcurrentHashMap key-set iteration is weakly consistent, so removing
                    // entries while iterating is safe.
                    for (String jobId : jobTaskPositionMap.keySet()) {
                        JobProfile jobProfile = jobConfDb.getJobById(jobId);
                        if (jobProfile == null) {
                            LOGGER.warn("jobProfile {} cannot be found in db, might be deleted by standalone mode, now delete job position in memory", jobId);
                            deleteJobPosition(jobId);
                        } else {
                            flushJobProfile(jobId, jobProfile);
                        }
                    }
                    TimeUnit.SECONDS.sleep(conf.getInt(FLUSH_INTERVAL_KEY, DEFAULT_FLUSH_INTERVAL_SECONDS));
                } catch (Throwable th) {
                    // Also reached when the sleep is interrupted; the loop condition is
                    // re-evaluated afterwards.
                    LOGGER.error("error caught", th);
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                }
            }
        };
    }

    /**
     * Copies the in-memory positions of one job into its profile and persists the profile,
     * unless the database reports the job as finished, in which case the in-memory entry is
     * removed instead.
     *
     * @param jobId      id of the job being flushed
     * @param jobProfile profile of that job as just loaded from the database
     */
    private void flushJobProfile(String jobId, JobProfile jobProfile) {
        ConcurrentHashMap<String, Long> positions = jobTaskPositionMap.get(jobId);
        if (positions == null) {
            // The entry vanished between key-set iteration and this lookup (the outer map is
            // reachable through getJobTaskPosition()); there is nothing to flush.
            return;
        }
        positions.forEach((key, position) ->
                jobProfile.setLong(key + POSITION_SUFFIX, position.longValue()));
        if (jobConfDb.checkJobfinished(jobProfile)) {
            LOGGER.info("Cannot update job profile {}, delete memory job in jobTaskPosition", jobId);
            deleteJobPosition(jobId);
        } else {
            jobConfDb.updateJobProfile(jobProfile);
        }
    }

    /**
     * Forgets all in-memory positions of the given job.
     *
     * @param jobId id of the job to drop
     */
    private void deleteJobPosition(String jobId) {
        jobTaskPositionMap.remove(jobId);
    }

    /**
     * Delegates to {@code waitForTerminate()}.
     * NOTE(review): presumably blocks until the flush worker has ended - confirm against
     * {@link AbstractDaemon}.
     *
     * @throws Exception whatever {@code waitForTerminate()} throws
     */
    public void stop() throws Exception {
        waitForTerminate();
    }

    /**
     * Adds {@code j} to the in-memory position stored for key {@code str2} of job {@code str}.
     *
     * <p>When the job is not tracked yet, its position map is created and seeded for
     * {@code str2} with the value persisted in the job profile under
     * {@code str2 + ".position"} (0 when that property, or the whole profile, is missing).
     * The map is seeded <em>before</em> it is published and the increment is applied
     * atomically, so concurrent callers neither observe an unseeded map nor lose updates.
     *
     * @param str  job id
     * @param str2 position key within the job
     * @param j    amount to add to the current position
     */
    public void updateSinkPosition(String str, String str2, long j) {
        ConcurrentHashMap<String, Long> positions = jobTaskPositionMap.get(str);
        if (positions == null) {
            ConcurrentHashMap<String, Long> seeded = new ConcurrentHashMap<>();
            seeded.put(str2, Long.valueOf(loadPersistedPosition(str, str2)));
            // If another thread registered the job first, use its map and discard ours.
            ConcurrentHashMap<String, Long> raced = jobTaskPositionMap.putIfAbsent(str, seeded);
            positions = (raced == null) ? seeded : raced;
        }
        // Atomic read-modify-write; an absent key starts at j, i.e. 0 + j.
        positions.merge(str2, Long.valueOf(j), Long::sum);
    }

    /**
     * Reads the persisted position of one key from the job profile in the database.
     *
     * @param jobId id of the job whose profile is consulted
     * @param key   position key within the job
     * @return the stored position, or 0 when the property or the profile does not exist
     */
    private long loadPersistedPosition(String jobId, String key) {
        JobProfile jobProfile = jobConfDb.getJobById(jobId);
        if (jobProfile == null) {
            LOGGER.warn("jobProfile {} cannot be found in db, start position of {} from 0", jobId, key);
            return 0L;
        }
        return jobProfile.getLong(key + POSITION_SUFFIX, 0L);
    }

    /**
     * Returns the live position map of one job.
     *
     * @param str job id
     * @return the job's {@code key -> position} map, or {@code null} when the job is not tracked
     */
    public ConcurrentHashMap<String, Long> getTaskPositionMap(String str) {
        return jobTaskPositionMap.get(str);
    }

    /**
     * Returns the live {@code jobId -> (key -> position)} map; changes made through the
     * returned reference are visible to this manager.
     *
     * @return the internal position map
     */
    public ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> getJobTaskPosition() {
        return jobTaskPositionMap;
    }
}
