package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatResponseHeader;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.class */
public class HeartBeatProcessor implements HttpRequestProcessor {
    public Logger httpLogger = LoggerFactory.getLogger(EventMeshConstants.PROTOCOL_HTTP);
    private EventMeshHTTPServer eventMeshHTTPServer;

    public HeartBeatProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public void processRequest(final ChannelHandlerContext channelHandlerContext, final AsyncContext<HttpCommand> asyncContext) throws Exception {
        this.httpLogger.info("cmd={}|{}|client2eventMesh|from={}|to={}", new Object[]{RequestCode.get(Integer.valueOf(asyncContext.getRequest().getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), IPUtil.getLocalAddress()});
        HeartbeatRequestHeader header = asyncContext.getRequest().getHeader();
        HeartbeatRequestBody body = asyncContext.getRequest().getBody();
        HeartbeatResponseHeader buildHeader = HeartbeatResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster, IPUtil.getLocalAddress(), this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
        if (StringUtils.isBlank(header.getIdc()) || StringUtils.isBlank(header.getPid()) || !StringUtils.isNumeric(header.getPid()) || StringUtils.isBlank(header.getSys())) {
            asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(buildHeader, HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg())));
            return;
        }
        if (StringUtils.isBlank(body.getClientType()) || StringUtils.isBlank(body.getConsumerGroup()) || CollectionUtils.isEmpty(body.getHeartbeatEntities())) {
            asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(buildHeader, HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg())));
            return;
        }
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        String env = header.getEnv();
        String idc = header.getIdc();
        String sys = header.getSys();
        String ip = header.getIp();
        String pid = header.getPid();
        String consumerGroup = body.getConsumerGroup();
        for (HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : body.getHeartbeatEntities()) {
            String str = heartbeatEntity.topic;
            String str2 = heartbeatEntity.url;
            Client client = new Client();
            client.env = env;
            client.idc = idc;
            client.sys = sys;
            client.ip = ip;
            client.pid = pid;
            client.consumerGroup = consumerGroup;
            client.topic = str;
            client.url = str2;
            client.lastUpTime = new Date();
            if (!StringUtils.isBlank(client.topic) && !StringUtils.isBlank(client.url)) {
                String str3 = client.consumerGroup + "@" + client.topic;
                if (concurrentHashMap.containsKey(str3)) {
                    ((List) concurrentHashMap.get(str3)).add(client);
                } else {
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(client);
                    concurrentHashMap.put(str3, arrayList);
                }
            }
        }
        synchronized (this.eventMeshHTTPServer.localClientInfoMapping) {
            for (Map.Entry entry : concurrentHashMap.entrySet()) {
                List<Client> list = this.eventMeshHTTPServer.localClientInfoMapping.get(entry.getKey());
                if (CollectionUtils.isEmpty(list)) {
                    this.eventMeshHTTPServer.localClientInfoMapping.put(entry.getKey(), entry.getValue());
                } else {
                    supplyClientInfoList((List) entry.getValue(), list);
                    this.eventMeshHTTPServer.localClientInfoMapping.put(entry.getKey(), list);
                }
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        try {
            asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.SUCCESS.getRetCode(), EventMeshRetCode.SUCCESS.getErrMsg()), new CompleteHandler<HttpCommand>() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.HeartBeatProcessor.1
                @Override // org.apache.eventmesh.runtime.core.protocol.http.async.CompleteHandler
                public void onResponse(HttpCommand httpCommand) {
                    try {
                        if (HeartBeatProcessor.this.httpLogger.isDebugEnabled()) {
                            HeartBeatProcessor.this.httpLogger.debug("{}", httpCommand);
                        }
                        HeartBeatProcessor.this.eventMeshHTTPServer.sendResponse(channelHandlerContext, httpCommand.httpResponse());
                        HeartBeatProcessor.this.eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - ((HttpCommand) asyncContext.getRequest()).getReqTime());
                    } catch (Exception e) {
                    }
                }
            });
        } catch (Exception e) {
            asyncContext.onComplete(asyncContext.getRequest().createHttpCommandResponse(buildHeader, HeartbeatResponseBody.buildBody(EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2))));
            long currentTimeMillis2 = System.currentTimeMillis();
            this.httpLogger.error("message|eventMesh2mq|REQ|ASYNC|heartBeatMessageCost={}ms", Long.valueOf(currentTimeMillis2 - currentTimeMillis), e);
            this.eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed();
            this.eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(currentTimeMillis2 - currentTimeMillis);
        }
    }

    private void supplyClientInfoList(List<Client> list, List<Client> list2) {
        for (Client client : list) {
            boolean z = false;
            Iterator<Client> it = list2.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Client next = it.next();
                if (StringUtils.equals(next.url, client.url)) {
                    z = true;
                    next.lastUpTime = client.lastUpTime;
                    break;
                }
            }
            if (!z) {
                list2.add(client);
            }
        }
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor
    public boolean rejectRequest() {
        return false;
    }
}
