package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatResponseHeader;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.class */
public class HeartBeatProcessor implements HttpRequestProcessor {
    private static final Logger log = LoggerFactory.getLogger(HeartBeatProcessor.class);
    private final transient EventMeshHTTPServer eventMeshHTTPServer;
    private final Acl acl;

    public HeartBeatProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
        this.acl = eventMeshHTTPServer.getAcl();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(ChannelHandlerContext channelHandlerContext, AsyncContext<HttpCommand> asyncContext) throws Exception {
        String localAddress = IPUtils.getLocalAddress();
        HttpCommand request = asyncContext.getRequest();
        LogUtils.info(log, "cmd={}|{}|client2eventMesh|from={}|to={}", new Object[]{RequestCode.get(Integer.valueOf(request.getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), localAddress});
        HeartbeatRequestHeader header = request.getHeader();
        HeartbeatRequestBody body = request.getBody();
        EventMeshHTTPConfiguration eventMeshHttpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
        HeartbeatResponseHeader buildHeader = HeartbeatResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()), eventMeshHttpConfiguration.getEventMeshCluster(), localAddress, eventMeshHttpConfiguration.getEventMeshEnv(), eventMeshHttpConfiguration.getEventMeshIDC());
        if (StringUtils.isAnyBlank(new CharSequence[]{header.getIdc(), header.getPid(), header.getSys()}) || !StringUtils.isNumeric(header.getPid())) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, HeartbeatResponseBody.class);
            return;
        }
        if (StringUtils.isAnyBlank(new CharSequence[]{body.getClientType(), body.getConsumerGroup()}) || CollectionUtils.isEmpty(body.getHeartbeatEntities())) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, HeartbeatResponseBody.class);
            return;
        }
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : body.getHeartbeatEntities()) {
            Client client = new Client();
            client.setEnv(header.getEnv());
            client.setIdc(header.getIdc());
            client.setSys(header.getSys());
            client.setIp(header.getIp());
            client.setPid(header.getPid());
            client.setConsumerGroup(body.getConsumerGroup());
            client.setTopic(heartbeatEntity.topic);
            client.setUrl(heartbeatEntity.url);
            client.setLastUpTime(new Date());
            if (!StringUtils.isAnyBlank(new CharSequence[]{client.getTopic(), client.getUrl()})) {
                if (this.eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
                    try {
                        this.acl.doAclCheckInHttpHeartbeat(RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), header.getUsername(), header.getPasswd(), header.getSys(), client.getTopic(), Integer.parseInt(header.getCode()));
                    } catch (Exception e) {
                        completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), HeartbeatResponseBody.class);
                        LogUtils.warn(log, "CLIENT HAS NO PERMISSION,HeartBeatProcessor subscribe failed", e);
                        return;
                    }
                }
                ((List) concurrentHashMap.computeIfAbsent(client.getConsumerGroup() + "@" + client.getTopic(), str -> {
                    return new ArrayList();
                })).add(client);
            }
        }
        ConcurrentHashMap<String, List<Client>> localClientInfoMapping = this.eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping();
        synchronized (localClientInfoMapping) {
            for (Map.Entry entry : concurrentHashMap.entrySet()) {
                List<Client> list = (List) localClientInfoMapping.get(entry.getKey());
                if (CollectionUtils.isEmpty(list)) {
                    localClientInfoMapping.put(entry.getKey(), entry.getValue());
                } else {
                    supplyClientInfoList((List) entry.getValue(), list);
                    localClientInfoMapping.put(entry.getKey(), list);
                }
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        HttpSummaryMetrics summaryMetrics = this.eventMeshHTTPServer.getMetrics().getSummaryMetrics();
        try {
            asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.SUCCESS), httpCommand -> {
                try {
                    LogUtils.debug(log, "{}", httpCommand);
                    this.eventMeshHTTPServer.sendResponse(channelHandlerContext, httpCommand.httpResponse());
                    summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
                } catch (Exception e2) {
                }
            });
        } catch (Exception e2) {
            completeResponse(request, asyncContext, buildHeader, EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR, EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getErrMsg() + EventMeshUtil.stackTrace(e2, 2), HeartbeatResponseBody.class);
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|heartBeatMessageCost={}ms", Long.valueOf(currentTimeMillis2), e2);
            summaryMetrics.recordSendMsgFailed();
            summaryMetrics.recordSendMsgCost(currentTimeMillis2);
        }
    }

    private void supplyClientInfoList(List<Client> list, List<Client> list2) {
        Objects.requireNonNull(list, "tmpClientList can not be null");
        Objects.requireNonNull(list2, "localClientList can not be null");
        for (Client client : list) {
            boolean z = false;
            Iterator<Client> it = list2.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Client next = it.next();
                if (StringUtils.equals(next.getUrl(), client.getUrl())) {
                    z = true;
                    next.setLastUpTime(client.getLastUpTime());
                    break;
                }
            }
            if (!z) {
                list2.add(client);
            }
        }
    }
}
