package org.apache.inlong.agent.core;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.job.JobManager;
import org.apache.inlong.agent.core.job.JobWrapper;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.common.heartbeat.StreamHeartbeat;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/core/HeartbeatManager.class */
/**
 * Periodically reports job snapshots and agent heartbeats to the manager over HTTP.
 *
 * <p>{@link #start()} submits two workers through the inherited {@code submitWorker}:
 * one posts a {@link TaskSnapshotRequest} to the snapshot URL, the other posts a
 * {@link HeartbeatMsg} to the heartbeat URL. Each worker loops while the inherited
 * {@code isRunnable()} returns {@code true} and pauses between rounds, including after a
 * failed round, so that an unreachable manager is not retried in a tight loop.
 */
public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbeatManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);

    /** Optionally signed, non-empty run of decimal digits; the only job keys that are reported. */
    private static final Pattern NUMBER_PATTERN = Pattern.compile("^[-+]?\\d+$");

    private static final String KEY_SNAPSHOT_INTERVAL = "agent.heartbeat.interval";
    private static final int DEFAULT_SNAPSHOT_INTERVAL_SECONDS = 10;
    /** Pause used when the configured interval itself could not be read. */
    private static final long FALLBACK_PAUSE_SECONDS = DEFAULT_SNAPSHOT_INTERVAL_SECONDS;

    private static final String KEY_HTTP_PORT = "agent.http.port";
    private static final int DEFAULT_HTTP_PORT = 8008;
    private static final String KEY_CLUSTER_NAME = "agent.cluster.name";
    private static final String KEY_CLUSTER_TAG = "agent.cluster.tag";
    private static final String KEY_CLUSTER_IN_CHARGES = "agent.cluster.inCharges";
    private static final String KEY_JOB_GROUP_ID = "job.groupId";
    private static final String KEY_JOB_STREAM_ID = "job.streamId";

    private static final String KEY_MANAGER_HOST = "agent.manager.vip.http.host";
    private static final String KEY_MANAGER_PORT = "agent.manager.vip.http.port";
    private static final String KEY_MANAGER_PREFIX_PATH = "agent.manager.vip.http.prefix.path";
    private static final String DEFAULT_MANAGER_PREFIX_PATH = "/inlong/manager/openapi";
    private static final String KEY_SNAPSHOT_PATH = "agent.manager.reportsnapshot.http.path";
    private static final String DEFAULT_SNAPSHOT_PATH = "/agent/reportSnapshot";
    private static final String KEY_HEARTBEAT_PATH = "agent.manager.heartbeat.http.path";
    private static final String DEFAULT_HEARTBEAT_PATH = "/heartbeat/report";

    private final AgentManager agentManager;
    private final JobManager jobmanager;
    // The initializers below depend on each other; keep them in this textual order.
    private final AgentConfiguration conf = AgentConfiguration.getAgentConf();
    private final HttpManager httpManager = new HttpManager(this.conf);
    private final String baseManagerUrl = buildBaseUrl();
    private final String reportSnapshotUrl = buildReportSnapShotUrl(this.baseManagerUrl);
    private final String reportHeartbeatUrl = buildReportHeartbeatUrl(this.baseManagerUrl);

    /**
     * Creates a heartbeat manager bound to the given agent manager and its job manager.
     *
     * @param agentManager agent manager whose {@code getJobManager()} supplies the reported jobs
     */
    public HeartbeatManager(AgentManager agentManager) {
        this.agentManager = agentManager;
        this.jobmanager = agentManager.getJobManager();
    }

    /**
     * Submits the snapshot worker and the heartbeat worker.
     *
     * @throws Exception declared by the original signature; nothing here throws a checked exception
     */
    public void start() throws Exception {
        submitWorker(snapshotReportThread());
        submitWorker(heartbeatReportThread());
    }

    /**
     * Builds the worker that posts a task snapshot request every
     * {@code agent.heartbeat.interval} seconds (default 10).
     *
     * @return the loop body to hand to {@code submitWorker}
     */
    private Runnable snapshotReportThread() {
        return () -> {
            while (isRunnable()) {
                long pauseSeconds = FALLBACK_PAUSE_SECONDS;
                try {
                    // Read the interval first so a failed report still waits the configured time.
                    pauseSeconds = this.conf.getInt(KEY_SNAPSHOT_INTERVAL, DEFAULT_SNAPSHOT_INTERVAL_SECONDS);
                    TaskSnapshotRequest request = buildTaskSnapshotRequest();
                    this.httpManager.doSentPost(this.reportSnapshotUrl, request);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(" {} report to manager", request);
                    }
                } catch (Throwable th) {
                    LOGGER.error("failed to report snapshot", th);
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                }
                // Pause outside the try block: a failing round must not turn into a busy loop.
                if (!pause(pauseSeconds)) {
                    return;
                }
            }
        };
    }

    /**
     * Builds the worker that posts a heartbeat message every {@code heartbeatInterval()} seconds.
     *
     * @return the loop body to hand to {@code submitWorker}
     */
    private Runnable heartbeatReportThread() {
        return () -> {
            while (isRunnable()) {
                long pauseSeconds = FALLBACK_PAUSE_SECONDS;
                try {
                    // Read the interval first so a failed report still waits the configured time.
                    pauseSeconds = heartbeatInterval();
                    reportHeartbeat(buildHeartbeatMsg());
                } catch (Throwable th) {
                    LOGGER.error("failed to report heartbeat", th);
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                }
                // Pause outside the try block: a failing round must not turn into a busy loop.
                if (!pause(pauseSeconds)) {
                    return;
                }
            }
        };
    }

    /**
     * Sleeps the current thread for the given number of seconds.
     *
     * @param seconds how long to sleep
     * @return {@code true} if the sleep completed; {@code false} if the thread was interrupted,
     *         in which case the interrupt flag has been restored and the caller should stop
     */
    private boolean pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.warn("interrupted while waiting for next report, stopping worker {}",
                    Thread.currentThread().getName());
            return false;
        }
    }

    /**
     * Delegates to the inherited {@code waitForTerminate()}.
     *
     * @throws Exception declared by the original signature
     */
    public void stop() throws Exception {
        waitForTerminate();
    }

    /**
     * Posts the given heartbeat message to the manager's heartbeat URL.
     *
     * @param heartbeatMsg message to send
     */
    public void reportHeartbeat(HeartbeatMsg heartbeatMsg) {
        this.httpManager.doSentPost(this.reportHeartbeatUrl, heartbeatMsg);
    }

    /**
     * Collects one snapshot per job whose key is an int-sized number.
     *
     * <p>Entries with a blank key or a {@code null} wrapper are logged and skipped. Entries whose
     * key is not numeric, or does not fit into an {@code int}, are skipped as well so that a
     * single odd key cannot abort the whole report.
     *
     * @return request carrying the snapshots, the report time, the local IP and the local UUID
     */
    private TaskSnapshotRequest buildTaskSnapshotRequest() {
        Map<String, JobWrapper> jobs = this.jobmanager.getJobs();
        ArrayList<TaskSnapshotMessage> snapshots = new ArrayList<>();
        // Taken before the snapshots are collected, as in the original ordering.
        Date reportTime = new Date();
        for (Map.Entry<String, JobWrapper> entry : jobs.entrySet()) {
            String key = entry.getKey();
            JobWrapper wrapper = entry.getValue();
            if (StringUtils.isBlank(key) || wrapper == null) {
                LOGGER.info("key: {} or value: {} is null", key, wrapper);
                continue;
            }
            // Snapshot is fetched for every valid entry, even if the key is later rejected.
            String snapshot = wrapper.getSnapshot();
            Integer jobId = parseJobId(key);
            if (jobId == null) {
                continue;
            }
            TaskSnapshotMessage message = new TaskSnapshotMessage();
            message.setSnapshot(snapshot);
            message.setJobId(jobId);
            snapshots.add(message);
        }
        TaskSnapshotRequest request = new TaskSnapshotRequest();
        request.setSnapshotList(snapshots);
        request.setReportTime(reportTime);
        request.setAgentIp(AgentUtils.fetchLocalIp());
        request.setUuid(AgentUtils.fetchLocalUuid());
        return request;
    }

    /**
     * Converts a job key to its numeric id.
     *
     * @param key non-blank job key
     * @return the parsed id, or {@code null} when the key is not an optionally signed decimal
     *         number or does not fit into an {@code int}
     */
    private static Integer parseJobId(String key) {
        if (!NUMBER_PATTERN.matcher(key).matches()) {
            return null;
        }
        try {
            return Integer.valueOf(key);
        } catch (NumberFormatException e) {
            // Only reachable for digit runs outside the int range.
            LOGGER.warn("job id {} does not fit into an int, skip its snapshot", key, e);
            return null;
        }
    }

    /**
     * Assembles the heartbeat message: local IP, HTTP port, component type, report time, the
     * optional cluster name/tag/in-charges, and one group plus one stream heartbeat per job.
     *
     * @return the populated heartbeat message
     */
    private HeartbeatMsg buildHeartbeatMsg() {
        HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
        heartbeatMsg.setIp(AgentUtils.fetchLocalIp());
        heartbeatMsg.setPort(String.valueOf(this.conf.getInt(KEY_HTTP_PORT, DEFAULT_HTTP_PORT)));
        heartbeatMsg.setComponentType(ComponentTypeEnum.Agent.getType());
        heartbeatMsg.setReportTime(Long.valueOf(System.currentTimeMillis()));

        String clusterName = this.conf.get(KEY_CLUSTER_NAME);
        if (StringUtils.isNotBlank(clusterName)) {
            heartbeatMsg.setClusterName(clusterName);
        }
        String clusterTag = this.conf.get(KEY_CLUSTER_TAG);
        if (StringUtils.isNotBlank(clusterTag)) {
            heartbeatMsg.setClusterTag(clusterTag);
        }
        String inCharges = this.conf.get(KEY_CLUSTER_IN_CHARGES);
        if (StringUtils.isNotBlank(inCharges)) {
            heartbeatMsg.setInCharges(inCharges);
        }

        ArrayList<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
        ArrayList<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
        for (JobWrapper jobWrapper : this.jobmanager.getJobs().values()) {
            if (jobWrapper == null) {
                // Same tolerance as the snapshot builder, which also expects null wrappers.
                continue;
            }
            JobProfile jobConf = jobWrapper.getJob().getJobConf();
            String groupId = jobConf.get(KEY_JOB_GROUP_ID);
            String streamId = jobConf.get(KEY_JOB_STREAM_ID);
            String status = jobWrapper.getCurrentState().name();

            GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
            groupHeartbeat.setInlongGroupId(groupId);
            groupHeartbeat.setStatus(status);
            groupHeartbeats.add(groupHeartbeat);

            StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
            streamHeartbeat.setInlongGroupId(groupId);
            streamHeartbeat.setInlongStreamId(streamId);
            streamHeartbeat.setStatus(status);
            streamHeartbeats.add(streamHeartbeat);
        }
        heartbeatMsg.setGroupHeartbeats(groupHeartbeats);
        heartbeatMsg.setStreamHeartbeats(streamHeartbeats);
        return heartbeatMsg;
    }

    /**
     * Builds {@code http://<host>:<port><prefix>} from the manager VIP settings.
     *
     * @return the base URL shared by the snapshot and heartbeat endpoints
     */
    private String buildBaseUrl() {
        return "http://" + this.conf.get(KEY_MANAGER_HOST)
                + ":" + this.conf.get(KEY_MANAGER_PORT)
                + this.conf.get(KEY_MANAGER_PREFIX_PATH, DEFAULT_MANAGER_PREFIX_PATH);
    }

    /**
     * Appends the configured snapshot path to the base URL.
     *
     * @param str base manager URL
     * @return full snapshot report URL
     */
    private String buildReportSnapShotUrl(String str) {
        return str + this.conf.get(KEY_SNAPSHOT_PATH, DEFAULT_SNAPSHOT_PATH);
    }

    /**
     * Appends the configured heartbeat path to the base URL.
     *
     * @param str base manager URL
     * @return full heartbeat report URL
     */
    private String buildReportHeartbeatUrl(String str) {
        return str + this.conf.get(KEY_HEARTBEAT_PATH, DEFAULT_HEARTBEAT_PATH);
    }
}
