/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.master;

import com.baidu.brpc.server.RpcServerOptions;
import java.util.Map;
import org.apache.geaflow.cluster.clustermanager.ClusterContext;
import org.apache.geaflow.cluster.clustermanager.ClusterInfo;
import org.apache.geaflow.cluster.clustermanager.IClusterManager;
import org.apache.geaflow.cluster.common.AbstractComponent;
import org.apache.geaflow.cluster.heartbeat.HeartbeatManager;
import org.apache.geaflow.cluster.master.IMaster;
import org.apache.geaflow.cluster.master.MasterContext;
import org.apache.geaflow.cluster.master.MasterInfo;
import org.apache.geaflow.cluster.resourcemanager.DefaultResourceManager;
import org.apache.geaflow.cluster.resourcemanager.IResourceManager;
import org.apache.geaflow.cluster.resourcemanager.ResourceManagerContext;
import org.apache.geaflow.cluster.rpc.ConnectAddress;
import org.apache.geaflow.cluster.rpc.impl.MasterEndpoint;
import org.apache.geaflow.cluster.rpc.impl.ResourceManagerEndpoint;
import org.apache.geaflow.cluster.rpc.impl.RpcServiceImpl;
import org.apache.geaflow.cluster.web.HttpServer;
import org.apache.geaflow.cluster.web.agent.AgentWebServer;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
import org.apache.geaflow.common.rpc.ConfigurableServerOption;
import org.apache.geaflow.common.utils.PortUtil;
import org.apache.geaflow.common.utils.ProcessUtil;
import org.apache.geaflow.ha.leaderelection.ILeaderContender;
import org.apache.geaflow.ha.leaderelection.ILeaderElectionService;
import org.apache.geaflow.ha.leaderelection.LeaderElectionServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractMaster
extends AbstractComponent
implements IMaster {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMaster.class);
    protected IResourceManager resourceManager;
    protected IClusterManager clusterManager;
    protected HeartbeatManager heartbeatManager;
    protected ConnectAddress masterAddress;
    protected int agentPort;
    protected int httpPort;
    protected HttpServer httpServer;
    protected ClusterContext clusterContext;
    protected ILeaderElectionService leaderElectionService;

    public AbstractMaster() {
        this(0);
    }

    public AbstractMaster(int rpcPort) {
        super(rpcPort);
    }

    @Override
    public void init(MasterContext context) {
        super.init(context.getId(), context.getConfiguration().getMasterId(), context.getConfiguration());
        this.clusterManager = context.getClusterManager();
        this.clusterContext = context.getClusterContext();
        this.heartbeatManager = new HeartbeatManager(this.configuration, this.clusterManager);
        this.resourceManager = new DefaultResourceManager(this.clusterManager);
        this.clusterContext.setHeartbeatManager(this.heartbeatManager);
        this.httpPort = this.configuration.getInteger(ExecutionConfigKeys.MASTER_HTTP_PORT);
        this.initEnv(context);
    }

    protected void initEnv(MasterContext context) {
        this.clusterManager.init(this.clusterContext);
        this.startRpcService(this.clusterManager, this.resourceManager);
        this.registerHAService();
        this.resourceManager.init(ResourceManagerContext.build(context, this.clusterContext));
        if (this.configuration.getBoolean(ExecutionConfigKeys.HTTP_REST_SERVICE_ENABLE)) {
            this.agentPort = this.startAgent();
            this.httpServer = new HttpServer(this.configuration, this.clusterManager, this.heartbeatManager, this.resourceManager, this.buildMasterInfo());
            this.httpServer.start();
        }
        this.registerHeartbeat();
    }

    public void initLeaderElectionService(ILeaderContender contender, Configuration configuration, int componentId) {
        this.leaderElectionService = LeaderElectionServiceFactory.loadElectionService((Configuration)configuration);
        this.leaderElectionService.init(configuration, String.valueOf(componentId));
        this.leaderElectionService.open(contender);
        LOGGER.info("Leader election service enabled for master.");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitForLeaderElection() throws InterruptedException {
        LOGGER.info("Wait for becoming a leader...");
        ILeaderElectionService iLeaderElectionService = this.leaderElectionService;
        synchronized (iLeaderElectionService) {
            this.leaderElectionService.wait();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyLeaderElection() {
        ILeaderElectionService iLeaderElectionService = this.leaderElectionService;
        synchronized (iLeaderElectionService) {
            this.leaderElectionService.notify();
        }
    }

    protected void startRpcService(IClusterManager clusterManager, IResourceManager resourceManager) {
        RpcServerOptions serverOptions = ConfigurableServerOption.build((Configuration)this.configuration);
        int port = PortUtil.getPort((int)this.rpcPort);
        this.rpcService = new RpcServiceImpl(port, serverOptions);
        this.rpcService.addEndpoint(new MasterEndpoint(this, clusterManager));
        this.rpcService.addEndpoint(new ResourceManagerEndpoint(resourceManager));
        this.rpcPort = this.rpcService.startService();
        this.masterAddress = new ConnectAddress(ProcessUtil.getHostIp(), this.httpPort);
    }

    public ClusterInfo startCluster() {
        ClusterInfo clusterInfo = new ClusterInfo();
        clusterInfo.setMasterAddress(this.masterAddress);
        Map<String, ConnectAddress> driverAddresses = this.clusterManager.startDrivers();
        clusterInfo.setDriverAddresses(driverAddresses);
        LOGGER.info("init cluster with info: {}", (Object)clusterInfo);
        return clusterInfo;
    }

    private int startAgent() {
        int port = PortUtil.getPort((int)this.configuration.getInteger(ExecutionConfigKeys.AGENT_HTTP_PORT));
        AgentWebServer agentServer = new AgentWebServer(port, this.configuration);
        agentServer.start();
        return port;
    }

    protected MasterInfo buildMasterInfo() {
        MasterInfo componentInfo = new MasterInfo();
        componentInfo.setId(this.id);
        componentInfo.setName(this.name);
        componentInfo.setHost(ProcessUtil.getHostIp());
        componentInfo.setPid(ProcessUtil.getProcessId());
        componentInfo.setRpcPort(this.rpcPort);
        componentInfo.setAgentPort(this.agentPort);
        componentInfo.setHttpPort(this.httpPort);
        return componentInfo;
    }

    protected void registerHeartbeat() {
        MasterInfo componentInfo = this.buildMasterInfo();
        this.heartbeatManager.registerMasterHeartbeat(componentInfo);
    }

    @Override
    public void close() {
        super.close();
        this.clusterManager.close();
        if (this.heartbeatManager != null) {
            this.heartbeatManager.close();
        }
        if (this.httpServer != null) {
            this.httpServer.stop();
        }
        LOGGER.info("master {} closed", (Object)this.name);
    }
}

