package org.apache.tajo.master.rm;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.ResourceProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.scheduler.event.SchedulerEvent;
import org.apache.tajo.master.scheduler.event.SchedulerEventType;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TUtil;

/* loaded from: input_file:org/apache/tajo/master/rm/TajoResourceTracker.class */
public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface {
    private Log LOG;
    private final TajoResourceManager manager;
    private final TajoRMContext rmContext;
    private final NodeLivelinessMonitor nodeLivelinessMonitor;
    private AsyncRpcServer server;
    private InetSocketAddress bindAddress;
    private int activeInterval;

    public TajoResourceTracker(TajoResourceManager tajoResourceManager, NodeLivelinessMonitor nodeLivelinessMonitor) {
        super(TajoResourceTracker.class.getSimpleName());
        this.LOG = LogFactory.getLog(TajoResourceTracker.class);
        this.manager = tajoResourceManager;
        this.rmContext = tajoResourceManager.getRMContext();
        this.nodeLivelinessMonitor = nodeLivelinessMonitor;
    }

    public void serviceInit(Configuration configuration) throws Exception {
        TajoConf tajoConf = (TajoConf) TUtil.checkTypeAndGet(configuration, TajoConf.class);
        this.activeInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_ACTIVE_INTERVAL);
        this.server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, NetUtils.createSocketAddr(tajoConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS)), tajoConf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM));
        this.server.start();
        this.bindAddress = NetUtils.getConnectAddress(this.server.getListenAddress());
        tajoConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(this.bindAddress));
        this.LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")");
        super.serviceInit(configuration);
    }

    public void serviceStop() throws Exception {
        if (this.server != null) {
            this.server.shutdown();
            this.server = null;
        }
        super.serviceStop();
    }

    private static NodeStatusEvent createStatusEvent(ResourceProtos.NodeHeartbeatRequest nodeHeartbeatRequest) {
        return new NodeStatusEvent(nodeHeartbeatRequest.getWorkerId(), nodeHeartbeatRequest.getRunningTasks(), nodeHeartbeatRequest.getRunningQueryMasters(), new NodeResource(nodeHeartbeatRequest.getAvailableResource()), nodeHeartbeatRequest.hasTotalResource() ? new NodeResource(nodeHeartbeatRequest.getTotalResource()) : null);
    }

    @Override // org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface
    public void nodeHeartbeat(RpcController rpcController, ResourceProtos.NodeHeartbeatRequest nodeHeartbeatRequest, RpcCallback<ResourceProtos.NodeHeartbeatResponse> rpcCallback) {
        ResourceProtos.NodeHeartbeatResponse.Builder newBuilder = ResourceProtos.NodeHeartbeatResponse.newBuilder();
        ResourceProtos.ResponseCommand responseCommand = ResourceProtos.ResponseCommand.NORMAL;
        try {
            int workerId = nodeHeartbeatRequest.getWorkerId();
            if (this.rmContext.getNodes().containsKey(Integer.valueOf(workerId))) {
                this.rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(nodeHeartbeatRequest));
                this.rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
                this.nodeLivelinessMonitor.receivedPing(Integer.valueOf(workerId));
            } else if (this.rmContext.getInactiveNodes().containsKey(Integer.valueOf(workerId))) {
                if (nodeHeartbeatRequest.hasConnectionInfo()) {
                    this.nodeLivelinessMonitor.unregister(Integer.valueOf(this.rmContext.getInactiveNodes().remove(Integer.valueOf(workerId)).getWorkerId()));
                    NodeStatus createNodeStatus = createNodeStatus(nodeHeartbeatRequest);
                    int workerId2 = createNodeStatus.getWorkerId();
                    this.rmContext.getNodes().putIfAbsent(Integer.valueOf(workerId2), createNodeStatus);
                    this.rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(workerId2, NodeEventType.STARTED));
                    this.nodeLivelinessMonitor.register(Integer.valueOf(workerId2));
                    this.rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
                } else {
                    responseCommand = ResourceProtos.ResponseCommand.MEMBERSHIP;
                }
            } else if (nodeHeartbeatRequest.hasConnectionInfo()) {
                NodeStatus createNodeStatus2 = createNodeStatus(nodeHeartbeatRequest);
                if (this.rmContext.getNodes().putIfAbsent(Integer.valueOf(workerId), createNodeStatus2) == null) {
                    this.rmContext.rmDispatcher.getEventHandler().handle(new NodeEvent(workerId, NodeEventType.STARTED));
                } else {
                    this.LOG.info("Reconnect from the node at: " + workerId);
                    this.nodeLivelinessMonitor.unregister(Integer.valueOf(workerId));
                    this.rmContext.getDispatcher().getEventHandler().handle(new NodeReconnectEvent(workerId, createNodeStatus2));
                }
                this.nodeLivelinessMonitor.register(Integer.valueOf(workerId));
                this.rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE));
            } else {
                responseCommand = ResourceProtos.ResponseCommand.MEMBERSHIP;
            }
            if (this.manager.getScheduler().getRunningQuery() > 0) {
                newBuilder.setHeartBeatInterval(this.activeInterval);
            }
            rpcCallback.run(newBuilder.setCommand(responseCommand).build());
        } catch (Throwable th) {
            if (this.manager.getScheduler().getRunningQuery() > 0) {
                newBuilder.setHeartBeatInterval(this.activeInterval);
            }
            rpcCallback.run(newBuilder.setCommand(responseCommand).build());
            throw th;
        }
    }

    private NodeStatus createNodeStatus(ResourceProtos.NodeHeartbeatRequest nodeHeartbeatRequest) {
        return new NodeStatus(this.rmContext, new NodeResource(nodeHeartbeatRequest.getTotalResource()), new WorkerConnectionInfo(nodeHeartbeatRequest.getConnectionInfo()));
    }
}
