/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.heartbeat;

import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.geaflow.cluster.heartbeat.HeartbeatClient;
import org.apache.geaflow.cluster.rpc.RpcClient;
import org.apache.geaflow.cluster.rpc.RpcEndpointRef;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.common.heartbeat.Heartbeat;
import org.apache.geaflow.common.utils.ExecutorUtil;
import org.apache.geaflow.common.utils.ThreadUtil;
import org.apache.geaflow.rpc.proto.Master;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeartbeatSender
implements Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatSender.class);
    private final String masterId;
    private final ScheduledExecutorService scheduledService;
    private final Supplier<Heartbeat> heartbeatTrigger;
    private final HeartbeatClient heartbeatClient;
    private final long initialDelayMs;
    private final long intervalMs;
    private ScheduledFuture scheduledFuture;

    public HeartbeatSender(String masterId, Supplier<Heartbeat> heartbeatTrigger, Configuration config, HeartbeatClient heartbeatClient) {
        this.masterId = masterId;
        this.heartbeatTrigger = heartbeatTrigger;
        this.scheduledService = new ScheduledThreadPoolExecutor(1, ThreadUtil.namedThreadFactory((boolean)true, (String)"heartbeat-sender"));
        this.heartbeatClient = heartbeatClient;
        this.initialDelayMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_INITIAL_DELAY_MS);
        this.intervalMs = config.getInteger(ExecutionConfigKeys.HEARTBEAT_INTERVAL_MS);
    }

    public void start() {
        this.scheduledFuture = this.scheduledService.scheduleWithFixedDelay(() -> {
            Heartbeat message = null;
            try {
                message = this.heartbeatTrigger.get();
                if (message != null) {
                    RpcClient.getInstance().sendHeartBeat(this.masterId, message, new RpcEndpointRef.RpcCallback<Master.HeartbeatResponse>(){

                        @Override
                        public void onSuccess(Master.HeartbeatResponse event) {
                            if (!event.getRegistered()) {
                                LOGGER.warn("Heartbeat is not registered.");
                                HeartbeatSender.this.heartbeatClient.registerToMaster();
                            }
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            LOGGER.error("Send heartbeat failed.", t);
                        }
                    });
                }
            }
            catch (Throwable e) {
                LOGGER.error("send heartbeat {} failed", (Object)message, (Object)e);
            }
        }, this.initialDelayMs, this.intervalMs, TimeUnit.MILLISECONDS);
    }

    public void close() {
        LOGGER.info("Close heartbeat sender");
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
        }
        if (this.scheduledService != null) {
            ExecutorUtil.shutdown((ExecutorService)this.scheduledService);
        }
    }
}

