/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.heartbeat;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.geaflow.cluster.clustermanager.AbstractClusterManager;
import org.apache.geaflow.cluster.clustermanager.IClusterManager;
import org.apache.geaflow.cluster.common.ComponentInfo;
import org.apache.geaflow.cluster.container.ContainerInfo;
import org.apache.geaflow.cluster.rpc.RpcClient;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.common.exception.GeaflowHeartbeatException;
import org.apache.geaflow.common.heartbeat.Heartbeat;
import org.apache.geaflow.common.heartbeat.HeartbeatInfo;
import org.apache.geaflow.common.utils.ExecutorUtil;
import org.apache.geaflow.common.utils.ThreadUtil;
import org.apache.geaflow.rpc.proto.Master;
import org.apache.geaflow.rpc.proto.Supervisor;
import org.apache.geaflow.stats.collector.StatsCollectorFactory;
import org.apache.geaflow.stats.sink.IStatsWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatManager.class);
    private final long heartbeatTimeoutMs;
    private final long heartbeatReportExpiredMs;
    private final Map<Integer, Heartbeat> senderMap = new ConcurrentHashMap<Integer, Heartbeat>();
    private final AbstractClusterManager clusterManager;
    private final ScheduledFuture<?> timeoutFuture;
    private final ScheduledFuture<?> reportFuture;
    private final ScheduledExecutorService checkTimeoutService;
    private final ScheduledExecutorService heartbeatReportService;
    private final IStatsWriter statsWriter;
    private ScheduledFuture<?> checkFuture;
    private volatile boolean isRunning = true;

    public HeartbeatManager(Configuration config, IClusterManager clusterManager) {
        this.heartbeatTimeoutMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_TIMEOUT_MS);
        int heartbeatReportMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_REPORT_INTERVAL_MS);
        int defaultReportExpiredMs = (int)((double)(this.heartbeatTimeoutMs + (long)heartbeatReportMs) * 1.2);
        this.heartbeatReportExpiredMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_REPORT_EXPIRED_MS, defaultReportExpiredMs);
        boolean supervisorEnable = config.getBoolean(ExecutionConfigKeys.SUPERVISOR_ENABLE);
        int corePoolSize = supervisorEnable ? 2 : 1;
        this.checkTimeoutService = new ScheduledThreadPoolExecutor(corePoolSize, ThreadUtil.namedThreadFactory((boolean)true, (String)"heartbeat-manager"));
        int initDelayMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_INITIAL_DELAY_MS);
        this.timeoutFuture = this.checkTimeoutService.scheduleAtFixedRate(this::checkHeartBeat, initDelayMs, this.heartbeatTimeoutMs, TimeUnit.MILLISECONDS);
        if (supervisorEnable) {
            long heartbeatCheckMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_INTERVAL_MS);
            this.checkFuture = this.checkTimeoutService.scheduleAtFixedRate(this::checkWorkerHealth, heartbeatCheckMs, heartbeatCheckMs, TimeUnit.MILLISECONDS);
        }
        this.heartbeatReportService = new ScheduledThreadPoolExecutor(1, ThreadUtil.namedThreadFactory((boolean)true, (String)"heartbeat-report"));
        this.reportFuture = this.heartbeatReportService.scheduleAtFixedRate(this::reportHeartbeat, heartbeatReportMs, heartbeatReportMs, TimeUnit.MILLISECONDS);
        this.clusterManager = (AbstractClusterManager)clusterManager;
        this.statsWriter = StatsCollectorFactory.init((Configuration)config).getStatsWriter();
    }

    public Master.HeartbeatResponse receivedHeartbeat(Heartbeat heartbeat) {
        this.senderMap.put(heartbeat.getContainerId(), heartbeat);
        boolean registered = this.isRegistered(heartbeat.getContainerId());
        return Master.HeartbeatResponse.newBuilder().setSuccess(true).setRegistered(registered).build();
    }

    public void registerMasterHeartbeat(ComponentInfo masterInfo) {
        this.statsWriter.addMetric(masterInfo.getName(), (Object)masterInfo);
    }

    void checkHeartBeat() {
        try {
            long checkTime = System.currentTimeMillis();
            this.checkTimeout(this.clusterManager.getContainerIds(), checkTime);
            this.checkTimeout(this.clusterManager.getDriverIds(), checkTime);
        }
        catch (Throwable e) {
            LOGGER.warn("Catch unexpect error", e);
        }
    }

    private void checkTimeout(Map<Integer, String> map, long checkTime) {
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            int componentId = entry.getKey();
            Heartbeat heartbeat = this.senderMap.get(componentId);
            if (heartbeat == null) {
                if (this.isRegistered(componentId)) {
                    LOGGER.warn("{} heartbeat is not received", (Object)entry.getValue());
                    continue;
                }
                LOGGER.warn("{} is not registered", (Object)entry.getValue());
                continue;
            }
            if (checkTime <= heartbeat.getTimestamp() + this.heartbeatTimeoutMs) continue;
            String message = String.format("%s heartbeat is lost", entry.getValue());
            LOGGER.error(message);
            this.doFailover(componentId, (Throwable)new GeaflowHeartbeatException(message));
        }
    }

    public void reportHeartbeat() {
        HeartbeatInfo heartbeatInfo = this.buildHeartbeatInfo();
        StatsCollectorFactory collectorFactory = StatsCollectorFactory.getInstance();
        if (collectorFactory != null) {
            collectorFactory.getHeartbeatCollector().reportHeartbeat(heartbeatInfo);
        }
    }

    void checkWorkerHealth() {
        try {
            this.checkWorkerHealth(this.clusterManager.getContainerIds());
            this.checkWorkerHealth(this.clusterManager.getDriverIds());
        }
        catch (Throwable e) {
            LOGGER.warn("Check container healthy error: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private void checkWorkerHealth(Map<Integer, String> map) {
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            String message;
            String name = entry.getValue();
            try {
                Supervisor.StatusResponse response = RpcClient.getInstance().queryWorkerStatusBySupervisor(name);
                if (response.getIsAlive()) continue;
                message = String.format("worker %s is not alive", name);
                LOGGER.error(message);
                this.doFailover(entry.getKey(), (Throwable)new GeaflowHeartbeatException(message));
            }
            catch (Throwable e) {
                message = String.format("connect to supervisor of %s failed: %s", name, e.getMessage());
                LOGGER.error(message, e);
                this.doFailover(entry.getKey(), (Throwable)new GeaflowHeartbeatException(message, e));
            }
        }
    }

    void doFailover(int componentId, Throwable e) {
        this.clusterManager.doFailover(componentId, e);
    }

    protected boolean isRegistered(int componentId) {
        AbstractClusterManager cm = this.clusterManager;
        return cm.getContainerInfos().containsKey(componentId) || cm.getDriverInfos().containsKey(componentId);
    }

    protected HeartbeatInfo buildHeartbeatInfo() {
        Map<Integer, Heartbeat> heartbeatMap = this.getHeartBeatMap();
        Map<Integer, ContainerInfo> containerMap = this.clusterManager.getContainerInfos();
        Map<Integer, String> containerIndex = this.clusterManager.getContainerIds();
        int totalContainerNum = containerIndex.size();
        ArrayList<HeartbeatInfo.ContainerHeartbeatInfo> containerList = new ArrayList<HeartbeatInfo.ContainerHeartbeatInfo>();
        int activeContainers = 0;
        for (Map.Entry<Integer, ContainerInfo> entry : containerMap.entrySet()) {
            HeartbeatInfo.ContainerHeartbeatInfo containerHeartbeatInfo = new HeartbeatInfo.ContainerHeartbeatInfo();
            containerHeartbeatInfo.setId(entry.getKey());
            ContainerInfo info = entry.getValue();
            containerHeartbeatInfo.setName(info.getName());
            containerHeartbeatInfo.setHost(info.getHost());
            containerHeartbeatInfo.setPid(info.getPid());
            Heartbeat heartbeat = heartbeatMap.get(entry.getKey());
            if (heartbeat != null) {
                containerHeartbeatInfo.setLastTimestamp(Long.valueOf(heartbeat.getTimestamp()));
                containerHeartbeatInfo.setMetrics(heartbeat.getProcessMetrics());
                ++activeContainers;
            }
            containerList.add(containerHeartbeatInfo);
        }
        HeartbeatInfo heartbeatInfo = new HeartbeatInfo();
        heartbeatInfo.setExpiredTimeMs(this.heartbeatReportExpiredMs);
        heartbeatInfo.setTotalNum(totalContainerNum);
        heartbeatInfo.setActiveNum(activeContainers);
        heartbeatInfo.setContainers(containerList);
        return heartbeatInfo;
    }

    public Map<Integer, Heartbeat> getHeartBeatMap() {
        return this.senderMap;
    }

    public Set<Integer> getActiveContainerIds() {
        Map<Integer, String> containerIdMap = this.clusterManager.getContainerIds();
        return this.getActiveComponentIds(containerIdMap);
    }

    public Set<Integer> getActiveDriverIds() {
        Map<Integer, String> driverIdMap = this.clusterManager.getDriverIds();
        return this.getActiveComponentIds(driverIdMap);
    }

    private Set<Integer> getActiveComponentIds(Map<Integer, String> map) {
        long checkTime = System.currentTimeMillis();
        HashSet<Integer> activeComponentIds = new HashSet<Integer>();
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            int componentId = entry.getKey();
            Heartbeat heartbeat = this.senderMap.get(componentId);
            if (heartbeat == null || checkTime > heartbeat.getTimestamp() + this.heartbeatTimeoutMs) continue;
            activeComponentIds.add(componentId);
        }
        return activeComponentIds;
    }

    public void close() {
        if (!this.isRunning) {
            return;
        }
        this.isRunning = false;
        if (this.timeoutFuture != null) {
            this.timeoutFuture.cancel(true);
        }
        if (this.checkFuture != null) {
            this.checkFuture.cancel(true);
        }
        if (this.checkTimeoutService != null) {
            ExecutorUtil.shutdown((ExecutorService)this.checkTimeoutService);
        }
        if (this.reportFuture != null) {
            this.reportFuture.cancel(true);
        }
        if (this.heartbeatReportService != null) {
            ExecutorUtil.shutdown((ExecutorService)this.heartbeatReportService);
        }
        LOGGER.info("HeartbeatManager is closed");
    }
}

