package org.apache.eventmesh.runtime.core.protocol.grpc.processor;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.eventmesh.api.exception.AclException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshGrpcServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.ConsumerGroupClient;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/processor/SubscribeStreamProcessor.class */
/**
 * Handles subscribe requests for the {@link GrpcType#STREAM} gRPC channel.
 *
 * <p>For each request the processor validates the incoming {@link CloudEvent}, optionally runs an
 * ACL check per subscribed topic, registers one {@link ConsumerGroupClient} per subscription item
 * with the {@link ConsumerManager} and the group's {@link EventMeshConsumer}, and restarts that
 * consumer when any registration reports a change.
 */
public class SubscribeStreamProcessor {
    private static final Logger log = LoggerFactory.getLogger(SubscribeStreamProcessor.class);
    private final EventMeshGrpcServer eventMeshGrpcServer;
    private final Acl acl;
    private final Logger aclLogger = LoggerFactory.getLogger(EventMeshConstants.ACL);
    private final GrpcType grpcType = GrpcType.STREAM;

    /**
     * Creates a processor bound to the given server.
     *
     * @param eventMeshGrpcServer server that supplies the ACL, configuration and consumer manager
     */
    public SubscribeStreamProcessor(EventMeshGrpcServer eventMeshGrpcServer) {
        this.eventMeshGrpcServer = eventMeshGrpcServer;
        this.acl = eventMeshGrpcServer.getAcl();
    }

    /**
     * Processes one subscribe request.
     *
     * <p>Replies with {@code EVENTMESH_PROTOCOL_HEADER_ERR} when the event attributes fail
     * validation, with {@code EVENTMESH_PROTOCOL_BODY_ERR} when the subscription fails validation,
     * and with {@code EVENTMESH_ACL_ERR} when an {@link AclException} is raised while handling the
     * request. Otherwise a {@code SUCCESS} stream response is sent.
     *
     * @param cloudEvent   the subscribe request; its text data is parsed as a JSON list of
     *                     {@link SubscriptionItem}
     * @param eventEmitter emitter used both for the reply and as the delivery channel stored in
     *                     each registered client
     * @throws Exception any non-ACL failure raised by the underlying calls
     */
    public void process(CloudEvent cloudEvent, EventEmitter<CloudEvent> eventEmitter) throws Exception {
        if (!ServiceUtils.validateCloudEventAttributes(cloudEvent)) {
            ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_HEADER_ERR, eventEmitter);
            return;
        }
        if (!ServiceUtils.validateSubscription(this.grpcType, cloudEvent)) {
            ServiceUtils.sendResponseCompleted(StatusCode.EVENTMESH_PROTOCOL_BODY_ERR, eventEmitter);
            return;
        }
        try {
            // Parse the payload once and share it between the ACL check and client registration.
            // NOTE(review): assumes validateSubscription has already rejected payloads that parse
            // to null; a null list would fail with a NullPointerException below — confirm.
            List<SubscriptionItem> subscriptionItems = parseSubscriptionItems(cloudEvent);
            doAclCheck(cloudEvent, subscriptionItems);

            ConsumerManager consumerManager = this.eventMeshGrpcServer.getConsumerManager();
            String consumerGroup = EventMeshCloudEventUtils.getConsumerGroup(cloudEvent);
            List<ConsumerGroupClient> clients =
                buildClients(cloudEvent, consumerGroup, subscriptionItems, eventEmitter);

            // Every client is registered with the manager before the group's consumer is looked up.
            for (ConsumerGroupClient client : clients) {
                consumerManager.registerClient(client);
            }

            EventMeshConsumer eventMeshConsumer = consumerManager.getEventMeshConsumer(consumerGroup);
            boolean restartRequired = false;
            for (ConsumerGroupClient client : clients) {
                // Non-short-circuit OR: every client must be registered even after a change is seen.
                restartRequired |= eventMeshConsumer.registerClient(client);
            }

            if (restartRequired) {
                log.info("ConsumerGroup {} topic info changed, restart EventMesh Consumer", consumerGroup);
                consumerManager.restartEventMeshConsumer(consumerGroup);
            } else {
                log.warn("EventMesh consumer [{}] didn't restart.", consumerGroup);
            }
            ServiceUtils.sendStreamResponse(cloudEvent, StatusCode.SUCCESS, "subscribe success", eventEmitter);
        } catch (AclException e) {
            this.aclLogger.warn("CLIENT HAS NO PERMISSION to Subscribe. failed", e);
            ServiceUtils.sendStreamResponseCompleted(cloudEvent, StatusCode.EVENTMESH_ACL_ERR, e.getMessage(), eventEmitter);
        }
    }

    /** Parses the event's text data as a JSON list of subscription items. */
    private List<SubscriptionItem> parseSubscriptionItems(CloudEvent cloudEvent) {
        return JsonUtils.parseTypeReferenceObject(cloudEvent.getTextData(),
            new TypeReference<List<SubscriptionItem>>() {
            });
    }

    /** Builds one client per subscription item, all sharing the request's identity attributes. */
    private List<ConsumerGroupClient> buildClients(CloudEvent cloudEvent, String consumerGroup,
        List<SubscriptionItem> subscriptionItems, EventEmitter<CloudEvent> eventEmitter) {
        String env = EventMeshCloudEventUtils.getEnv(cloudEvent);
        String idc = EventMeshCloudEventUtils.getIdc(cloudEvent);
        String sys = EventMeshCloudEventUtils.getSys(cloudEvent);
        String ip = EventMeshCloudEventUtils.getIp(cloudEvent);
        String pid = EventMeshCloudEventUtils.getPid(cloudEvent);

        List<ConsumerGroupClient> clients = new ArrayList<>(subscriptionItems.size());
        for (SubscriptionItem item : subscriptionItems) {
            clients.add(ConsumerGroupClient.builder()
                .env(env)
                .idc(idc)
                .sys(sys)
                .ip(ip)
                .pid(pid)
                .consumerGroup(consumerGroup)
                .topic(item.getTopic())
                .subscriptionMode(item.getMode())
                .grpcType(this.grpcType)
                .eventEmitter(eventEmitter)
                .lastUpTime(new Date())
                .build());
        }
        return clients;
    }

    /**
     * Runs the ACL check for every subscribed topic when server security is enabled; does nothing
     * otherwise.
     *
     * @param cloudEvent        request carrying the caller's ip, user name, password and subsystem
     * @param subscriptionItems topics the caller wants to subscribe to
     * @throws AclException if the ACL rejects any of the topics
     */
    private void doAclCheck(CloudEvent cloudEvent, List<SubscriptionItem> subscriptionItems) throws AclException {
        if (!this.eventMeshGrpcServer.getEventMeshGrpcConfiguration().isEventMeshServerSecurityEnable()) {
            return;
        }
        String ip = EventMeshCloudEventUtils.getIp(cloudEvent);
        String userName = EventMeshCloudEventUtils.getUserName(cloudEvent);
        String password = EventMeshCloudEventUtils.getPassword(cloudEvent);
        String sys = EventMeshCloudEventUtils.getSys(cloudEvent);
        for (SubscriptionItem item : subscriptionItems) {
            this.acl.doAclCheckInHttpReceive(ip, userName, password, sys, item.getTopic(),
                RequestCode.SUBSCRIBE.getRequestCode().intValue());
        }
    }
}
