package org.apache.eventmesh.runtime.core.protocol.http.processor;

import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpRequest;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.HttpEventWrapper;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestURI;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.common.EventMeshTrace;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumer.ClientInfo;
import org.apache.eventmesh.runtime.core.consumer.SubscriptionManager;
import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.AbstractEventProcessor;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EventMeshTrace
/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/processor/LocalSubscribeEventProcessor.class */
public class LocalSubscribeEventProcessor extends AbstractEventProcessor {
    private static final Logger log = LoggerFactory.getLogger(LocalSubscribeEventProcessor.class);
    private final Acl acl;

    public LocalSubscribeEventProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
        super(eventMeshHTTPServer);
        this.acl = eventMeshHTTPServer.getAcl();
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.AsyncHttpProcessor
    public void handler(HandlerService.HandlerSpecific handlerSpecific, HttpRequest httpRequest) throws Exception {
        Channel channel = handlerSpecific.getCtx().channel();
        HttpEventWrapper request = handlerSpecific.getAsyncContext().getRequest();
        String localAddress = IPUtils.getLocalAddress();
        String parseChannelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
        if (log.isInfoEnabled()) {
            log.info("uri={}|{}|client2eventMesh|from={}|to={}", new Object[]{request.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, parseChannelRemoteAddr, localAddress});
        }
        request.getHeaderMap().put(EventMeshConstants.MANAGE_IP, parseChannelRemoteAddr);
        request.buildSysHeaderForClient();
        Map<String, Object> builderResponseHeaderMap = builderResponseHeaderMap(request);
        Map<String, Object> sysHeaderMap = request.getSysHeaderMap();
        HashMap hashMap = new HashMap();
        if (validateSysHeader(sysHeaderMap)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, builderResponseHeaderMap, hashMap, null);
            return;
        }
        Map<String, Object> map = (Map) Optional.ofNullable(JsonUtils.parseTypeReferenceObject(new String(request.getBody(), Constants.DEFAULT_CHARSET), new TypeReference<HashMap<String, Object>>() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.1
        })).orElseGet(HashMap::new);
        if (validatedRequestBodyMap(map)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, builderResponseHeaderMap, hashMap, null);
            return;
        }
        String obj = map.get(EventMeshConstants.URL).toString();
        String obj2 = map.get(EventMeshConstants.CONSUMER_GROUP).toString();
        List<SubscriptionItem> list = (List) Optional.ofNullable(JsonUtils.parseTypeReferenceObject(JsonUtils.toJSONString(map.get(EventMeshConstants.MANAGE_TOPIC)), new TypeReference<List<SubscriptionItem>>() { // from class: org.apache.eventmesh.runtime.core.protocol.http.processor.LocalSubscribeEventProcessor.2
        })).orElseGet(Collections::emptyList);
        if (this.eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
            Iterator<SubscriptionItem> it = list.iterator();
            while (it.hasNext()) {
                try {
                    this.acl.doAclCheckInHttpReceive(parseChannelRemoteAddr, sysHeaderMap.get("username").toString(), sysHeaderMap.get("passwd").toString(), sysHeaderMap.get("sys").toString(), it.next().getTopic(), request.getRequestURI());
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
                    }
                    handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, builderResponseHeaderMap, hashMap, null);
                    return;
                }
            }
        }
        try {
            if (!IPUtils.isValidDomainOrIp(obj, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIpv4BlackList(), this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIpv6BlackList())) {
                if (log.isErrorEnabled()) {
                    log.error("subscriber url {} is not valid", obj);
                }
                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, builderResponseHeaderMap, hashMap, null);
                return;
            }
            if (!WebhookUtil.obtainDeliveryAgreement(this.eventMeshHTTPServer.getHttpClientPool().getClient(), obj, this.eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshWebhookOrigin())) {
                if (log.isErrorEnabled()) {
                    log.error("subscriber url {} is not allowed by the target system", obj);
                }
                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, builderResponseHeaderMap, hashMap, null);
                return;
            }
            synchronized (this.eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
                ClientInfo clientInfo = getClientInfo(request);
                SubscriptionManager subscriptionManager = this.eventMeshHTTPServer.getSubscriptionManager();
                subscriptionManager.registerClient(clientInfo, obj2, list, obj);
                subscriptionManager.updateSubscription(clientInfo, obj2, obj, list);
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    this.eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(obj2, this.eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(obj2));
                    hashMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
                    hashMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());
                    handlerSpecific.sendResponse(builderResponseHeaderMap, hashMap);
                } catch (Exception e2) {
                    if (log.isErrorEnabled()) {
                        log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), JsonUtils.toJSONString(list), obj, e2});
                    }
                    handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR, builderResponseHeaderMap, hashMap, null);
                }
                updateMetadata();
            }
        } catch (Exception e3) {
            if (log.isErrorEnabled()) {
                log.error("subscriber url {} is not valid", obj, e3);
            }
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, builderResponseHeaderMap, hashMap, null);
        }
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.processor.HttpProcessor
    public String[] paths() {
        return new String[]{RequestURI.SUBSCRIBE_LOCAL.getRequestURI()};
    }

    private ClientInfo getClientInfo(HttpEventWrapper httpEventWrapper) {
        Map sysHeaderMap = httpEventWrapper.getSysHeaderMap();
        ClientInfo clientInfo = new ClientInfo();
        clientInfo.setEnv(sysHeaderMap.get("env").toString());
        clientInfo.setIdc(sysHeaderMap.get("idc").toString());
        clientInfo.setSys(sysHeaderMap.get("sys").toString());
        clientInfo.setIp(sysHeaderMap.get(EventMeshConstants.MANAGE_IP).toString());
        clientInfo.setPid(sysHeaderMap.get("pid").toString());
        return clientInfo;
    }
}
