package org.apache.eventmesh.runtime.core.protocol.grpc.processor;

import java.util.Date;
import java.util.Iterator;
import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.grpc.protos.Heartbeat;
import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader;
import org.apache.eventmesh.common.protocol.grpc.protos.Response;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/processor/HeartbeatProcessor.class */
public class HeartbeatProcessor {
    private final Logger logger = LoggerFactory.getLogger(getClass().getName());
    private final Logger aclLogger = LoggerFactory.getLogger("acl");
    private final EventMeshGrpcServer eventMeshGrpcServer;

    public HeartbeatProcessor(EventMeshGrpcServer eventMeshGrpcServer) {
        this.eventMeshGrpcServer = eventMeshGrpcServer;
    }

    public void process(Heartbeat heartbeat, EventEmitter<Response> eventEmitter) throws Exception {
        RequestHeader header = heartbeat.getHeader();
        if (!ServiceUtils.validateHeader(header)) {
            ServiceUtils.sendRespAndDone(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, eventEmitter);
            return;
        }
        if (!ServiceUtils.validateHeartBeat(heartbeat)) {
            ServiceUtils.sendRespAndDone(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, eventEmitter);
            return;
        }
        try {
            doAclCheck(heartbeat);
            if (!Heartbeat.ClientType.SUB.equals(heartbeat.getClientType())) {
                ServiceUtils.sendRespAndDone(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, eventEmitter);
                return;
            }
            ConsumerManager consumerManager = this.eventMeshGrpcServer.getConsumerManager();
            String consumerGroup = heartbeat.getConsumerGroup();
            Iterator it = heartbeat.getHeartbeatItemsList().iterator();
            while (it.hasNext()) {
                consumerManager.updateClientTime(ConsumerGroupClient.builder().env(header.getEnv()).idc(header.getIdc()).sys(header.getSys()).ip(header.getIp()).pid(header.getPid()).consumerGroup(consumerGroup).topic(((Heartbeat.HeartbeatItem) it.next()).getTopic()).lastUpTime(new Date()).build());
            }
            ServiceUtils.sendRespAndDone(StatusCode.SUCCESS, "heartbeat success", eventEmitter);
        } catch (AclException e) {
            this.aclLogger.warn("CLIENT HAS NO PERMISSION, HeartbeatProcessor failed", e);
            ServiceUtils.sendRespAndDone(StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), eventEmitter);
        }
    }

    private void doAclCheck(Heartbeat heartbeat) throws AclException {
        RequestHeader header = heartbeat.getHeader();
        if (this.eventMeshGrpcServer.getEventMeshGrpcConfiguration().eventMeshServerSecurityEnable) {
            String ip = header.getIp();
            String username = header.getUsername();
            String password = header.getPassword();
            String sys = header.getSys();
            int intValue = Integer.valueOf(RequestCode.HEARTBEAT.getRequestCode().intValue()).intValue();
            Iterator it = heartbeat.getHeartbeatItemsList().iterator();
            while (it.hasNext()) {
                Acl.doAclCheckInHttpHeartbeat(ip, username, password, sys, ((Heartbeat.HeartbeatItem) it.next()).getTopic(), intValue);
            }
        }
    }
}
