package org.apache.eventmesh.runtime.core.protocol.grpc.push;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.GrpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/push/MessageHandler.class */
/**
 * Hands incoming messages for a consumer group to the push executor and periodically
 * sweeps the shared registry of waiting push requests.
 *
 * <p>The registry ({@code waitingRequests}) and the sweep scheduler are static and therefore
 * shared by every handler instance. The registry is passed to each push request that this
 * class creates.
 */
public class MessageHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
    private static final ScheduledExecutorService SCHEDULER = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-pushMsgTimeout-");
    /** Maximum number of waiting requests per consumer group before new messages are rejected. */
    private static final int CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD = 10000;
    /** Period of the timeout sweep, in milliseconds. */
    private static final long TIMEOUT_CHECK_INTERVAL_MILLIS = 1000L;
    /** Waiting push requests, keyed by consumer group. */
    private static final Map<String, Set<AbstractPushRequest>> waitingRequests = Maps.newConcurrentMap();
    /** Whether the sweep has been scheduled; guarded by the {@code MessageHandler.class} monitor. */
    private static boolean timeoutCheckScheduled;

    private final ThreadPoolExecutor pushExecutor;

    /**
     * Creates a handler for one consumer group.
     *
     * @param consumerGroup   key under which this group's waiting requests are registered
     * @param pushMsgExecutor executor that runs the push tasks submitted by {@link #handle}
     */
    public MessageHandler(String consumerGroup, ThreadPoolExecutor pushMsgExecutor) {
        this.pushExecutor = pushMsgExecutor;
        // putIfAbsent rather than put: a second handler for the same group must not replace
        // (and thereby stop tracking) requests that are already waiting.
        waitingRequests.putIfAbsent(consumerGroup, Sets.newConcurrentHashSet());
        ensureTimeoutCheckScheduled();
    }

    /**
     * Schedules the sweep on the shared scheduler the first time any handler is constructed.
     * The sweep walks the whole static registry, so one task serves all handlers; scheduling
     * one per instance would repeat the same scan and keep every handler reachable forever.
     */
    private static synchronized void ensureTimeoutCheckScheduled() {
        if (!timeoutCheckScheduled) {
            SCHEDULER.scheduleAtFixedRate(MessageHandler::checkTimeout, 0L, TIMEOUT_CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            timeoutCheckScheduled = true;
        }
    }

    /**
     * Calls {@code timeout()} on every waiting request and then removes it from the set
     * registered for its own consumer group.
     *
     * <p>Failures are caught per request: an exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} suppresses all subsequent executions, which would silently
     * disable the sweep.
     */
    private static void checkTimeout() {
        for (Set<AbstractPushRequest> pending : waitingRequests.values()) {
            for (AbstractPushRequest request : pending) {
                try {
                    request.timeout();
                    // NOTE(review): the request is removed whether or not timeout() acted on it;
                    // confirm against AbstractPushRequest.timeout() that this is intended.
                    Set<AbstractPushRequest> owner = waitingRequests.get(request.getHandleMsgContext().getConsumerGroup());
                    if (owner != null) {
                        owner.remove(request);
                    }
                } catch (RuntimeException e) {
                    LOGGER.error("failed to process timeout for a waiting push request, it stays registered until the next check", e);
                }
            }
        }
    }

    /**
     * Submits a push task for the given message unless its consumer group already has more
     * waiting requests than the threshold.
     *
     * @param handleMsgContext message context; supplies the consumer group and the gRPC type
     * @return {@code true} if the push task was accepted by the executor; {@code false} if the
     *         group is over the threshold or the executor rejected the task
     */
    public boolean handle(HandleMsgContext handleMsgContext) {
        Set<AbstractPushRequest> pending = waitingRequests.get(handleMsgContext.getConsumerGroup());
        if (pending != null && pending.size() > CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD) {
            LOGGER.warn("waitingRequests is too many, so reject, this message will be send back to MQ, consumerGroup:{}, threshold:{}", handleMsgContext.getConsumerGroup(), CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD);
            return false;
        }
        try {
            this.pushExecutor.submit(() -> {
                try {
                    createHttpPushRequest(handleMsgContext).tryPushRequest();
                } catch (RuntimeException e) {
                    // The Future returned by submit() is discarded, so without this catch a
                    // failure here would never be reported anywhere.
                    LOGGER.error("push request failed, consumerGroup:{}", handleMsgContext.getConsumerGroup(), e);
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            LOGGER.warn("push executor rejected the task, consumerGroup:{}", handleMsgContext.getConsumerGroup(), e);
            return false;
        }
    }

    /**
     * Builds the push request matching the context's gRPC type: a {@link WebhookPushRequest}
     * for {@code GrpcType.WEBHOOK}, otherwise a {@link StreamPushRequest}. The shared
     * waiting-request registry is handed to the request.
     */
    private AbstractPushRequest createHttpPushRequest(HandleMsgContext handleMsgContext) {
        if (GrpcType.WEBHOOK.equals(handleMsgContext.getGrpcType())) {
            return new WebhookPushRequest(handleMsgContext, waitingRequests);
        }
        return new StreamPushRequest(handleMsgContext, waitingRequests);
    }
}
