package org.apache.eventmesh.runtime.core.protocol.http.processor;

import io.netty.channel.ChannelHandlerContext;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.SubscribeRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.SubscribeResponseHeader;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.metrics.api.model.HttpSummaryMetrics;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumer.ClientInfo;
import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.class */
/**
 * HTTP processor for client subscribe requests.
 *
 * <p>For each request it validates the protocol header and body, optionally runs an ACL check per
 * topic, verifies the callback URL, registers the client and its subscription with the
 * {@link SubscriptionManager}, notifies the consumer manager and finally answers the caller
 * through the supplied {@link AsyncContext}.
 */
public class SubscribeProcessor implements HttpRequestProcessor {

    private static final Logger log = LoggerFactory.getLogger(SubscribeProcessor.class);

    private final transient EventMeshHTTPServer eventMeshHTTPServer;

    private final Acl acl;

    /**
     * Creates a processor bound to the given HTTP server.
     *
     * @param eventMeshHTTPServer server that supplies the configuration, ACL, subscription manager,
     *                            consumer manager, HTTP client pool and metrics used by this processor
     */
    public SubscribeProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        this.eventMeshHTTPServer = eventMeshHTTPServer;
        this.acl = eventMeshHTTPServer.getAcl();
    }

    /**
     * Handles one subscribe request.
     *
     * <p>Every rejection path completes {@code asyncContext} with an error response and returns;
     * the success path completes it with {@link EventMeshRetCode#SUCCESS} and sends the HTTP
     * response on {@code channelHandlerContext}.
     *
     * @param channelHandlerContext Netty context of the client connection; used to resolve the
     *                              remote address and to send the success response
     * @param asyncContext          carries the incoming {@link HttpCommand} and receives the response
     * @throws Exception if anything outside the guarded sections fails (for example while reading
     *                   the request code or building the response header)
     */
    @Override
    public void processRequest(ChannelHandlerContext channelHandlerContext, AsyncContext<HttpCommand> asyncContext) throws Exception {
        final HttpCommand request = asyncContext.getRequest();
        final Integer requestCode = Integer.valueOf(request.getRequestCode());
        final String localAddress = IPUtils.getLocalAddress();
        final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel());

        LogUtils.info(log, "cmd={}|{}|client2eventMesh|from={}|to={}",
            RequestCode.get(requestCode), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

        final SubscribeRequestHeader requestHeader = (SubscribeRequestHeader) request.getHeader();
        // Explicit cast mirrors the header above; it is a no-op if the getter is already typed.
        final SubscribeRequestBody requestBody = (SubscribeRequestBody) request.getBody();
        final EventMeshHTTPConfiguration httpConfiguration = this.eventMeshHTTPServer.getEventMeshHttpConfiguration();
        final SubscribeResponseHeader responseHeader = SubscribeResponseHeader.buildHeader(
            requestCode,
            httpConfiguration.getEventMeshCluster(),
            localAddress,
            httpConfiguration.getEventMeshEnv(),
            httpConfiguration.getEventMeshIDC());

        // Header check: idc, pid and sys are mandatory and pid has to be numeric.
        final boolean headerIncomplete =
            StringUtils.isAnyBlank(requestHeader.getIdc(), requestHeader.getPid(), requestHeader.getSys());
        if (headerIncomplete || !StringUtils.isNumeric(requestHeader.getPid())) {
            completeResponse(request, asyncContext, responseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SubscribeResponseBody.class);
            return;
        }

        // Body check: callback url, consumer group and at least one topic are mandatory.
        final boolean bodyIncomplete =
            StringUtils.isAnyBlank(requestBody.getUrl(), requestBody.getConsumerGroup());
        if (bodyIncomplete || CollectionUtils.isEmpty(requestBody.getTopics())) {
            completeResponse(request, asyncContext, responseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SubscribeResponseBody.class);
            return;
        }

        final List<SubscriptionItem> subscriptionItems = requestBody.getTopics();

        // ACL check is only performed when server security is switched on; the first topic that
        // fails the check aborts the whole request.
        if (httpConfiguration.isEventMeshServerSecurityEnable()) {
            try {
                for (SubscriptionItem item : subscriptionItems) {
                    this.acl.doAclCheckInHttpReceive(
                        remoteAddr,
                        requestHeader.getUsername(),
                        requestHeader.getPasswd(),
                        requestHeader.getSys(),
                        item.getTopic(),
                        requestCode);
                }
            } catch (Exception aclFailure) {
                completeResponse(request, asyncContext, responseHeader,
                    EventMeshRetCode.EVENTMESH_ACL_ERR, aclFailure.getMessage(), SubscribeResponseBody.class);
                LogUtils.warn(log, "CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", aclFailure);
                return;
            }
        }

        final String url = requestBody.getUrl();
        final String consumerGroup = requestBody.getConsumerGroup();

        try {
            // Reject callback URLs that are malformed or hit the configured IPv4/IPv6 black lists.
            if (!IPUtils.isValidDomainOrIp(url,
                httpConfiguration.getEventMeshIpv4BlackList(), httpConfiguration.getEventMeshIpv6BlackList())) {
                log.error("subscriber url {} is not valid", url);
                completeResponse(request, asyncContext, responseHeader,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url,
                    SubscribeResponseBody.class);
                return;
            }

            // Ask the target system whether it accepts deliveries from the configured webhook origin.
            if (!WebhookUtil.obtainDeliveryAgreement(
                this.eventMeshHTTPServer.getHttpClientPool().getClient(), url, httpConfiguration.getEventMeshWebhookOrigin())) {
                log.error("subscriber url {} is not allowed by the target system", url);
                completeResponse(request, asyncContext, responseHeader,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " unauthorized webhook URL: " + url,
                    SubscribeResponseBody.class);
                return;
            }

            final SubscriptionManager subscriptionManager = this.eventMeshHTTPServer.getSubscriptionManager();
            // Registration, consumer notification and the metadata update all run while holding
            // the monitor of the local client-info mapping.
            synchronized (subscriptionManager.getLocalClientInfoMapping()) {
                final ClientInfo clientInfo = getClientInfo(requestHeader);
                subscriptionManager.registerClient(clientInfo, consumerGroup, subscriptionItems, url);
                subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subscriptionItems);

                final long startTime = System.currentTimeMillis();
                final HttpSummaryMetrics summaryMetrics = this.eventMeshHTTPServer.getMetrics().getSummaryMetrics();
                try {
                    this.eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(
                        consumerGroup, subscriptionManager.getLocalConsumerGroupMapping().get(consumerGroup));

                    final HttpCommand successResponse = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
                    asyncContext.onComplete(successResponse, httpCommand -> {
                        try {
                            LogUtils.debug(log, "{}", httpCommand);
                            this.eventMeshHTTPServer.sendResponse(channelHandlerContext, httpCommand.httpResponse());
                            summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
                        } catch (Exception responseFailure) {
                            log.error("onResponse error", responseFailure);
                        }
                    });
                } catch (Exception subscribeFailure) {
                    completeResponse(request, asyncContext, responseHeader,
                        EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR,
                        EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(subscribeFailure, 2),
                        SubscribeResponseBody.class);
                    final long elapsedMillis = System.currentTimeMillis() - startTime;
                    // Three placeholders for three values; the exception is passed last so the
                    // logging backend prints it as a throwable instead of consuming a placeholder.
                    LogUtils.error(log, "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
                        elapsedMillis, JsonUtils.toJSONString(requestBody.getTopics()), requestBody.getUrl(), subscribeFailure);
                    summaryMetrics.recordSendMsgFailed();
                    summaryMetrics.recordSendMsgCost(elapsedMillis);
                }

                this.eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
            }
        } catch (Exception e) {
            // NOTE(review): this handler also catches failures from client registration, the
            // subscription update and updateMetaData(), yet always reports "invalid URL" - and it
            // may run after a response was already completed above. Confirm whether that is intended.
            LogUtils.error(log, "subscriber url:{} is invalid.", url, e);
            completeResponse(request, asyncContext, responseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url,
                SubscribeResponseBody.class);
        }
    }

    /**
     * Builds the {@link ClientInfo} for the subscribing client by copying env, idc, sys, ip and
     * pid from the request header.
     *
     * @param subscribeRequestHeader header of the subscribe request
     * @return a new {@link ClientInfo} populated from the header
     */
    private ClientInfo getClientInfo(SubscribeRequestHeader subscribeRequestHeader) {
        final ClientInfo clientInfo = new ClientInfo();
        clientInfo.setEnv(subscribeRequestHeader.getEnv());
        clientInfo.setIdc(subscribeRequestHeader.getIdc());
        clientInfo.setSys(subscribeRequestHeader.getSys());
        clientInfo.setIp(subscribeRequestHeader.getIp());
        clientInfo.setPid(subscribeRequestHeader.getPid());
        return clientInfo;
    }
}
