package org.apache.eventmesh.runtime.core.protocol.http.push;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.cloudevents.SpecVersion;
import io.opentelemetry.api.trace.Span;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.EventMeshConsumer;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/http/push/HTTPMessageHandler.class */
public class HTTPMessageHandler implements MessageHandler {
    private final transient EventMeshConsumer eventMeshConsumer;
    private final transient ThreadPoolExecutor pushExecutor;
    private static final Logger log = LoggerFactory.getLogger(HTTPMessageHandler.class);
    private static final transient ScheduledExecutorService SCHEDULER = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-pushMsgTimeout");
    private static final Integer CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD = 10000;
    protected static final transient Map<String, Set<AbstractHTTPPushRequest>> waitingRequests = Maps.newConcurrentMap();

    private void checkTimeout() {
        waitingRequests.forEach((str, set) -> {
            set.forEach(abstractHTTPPushRequest -> {
                abstractHTTPPushRequest.timeout();
                waitingRequests.get(abstractHTTPPushRequest.handleMsgContext.getConsumerGroup()).remove(abstractHTTPPushRequest);
            });
        });
    }

    public HTTPMessageHandler(EventMeshConsumer eventMeshConsumer) {
        this.eventMeshConsumer = eventMeshConsumer;
        this.pushExecutor = eventMeshConsumer.getEventMeshHTTPServer().getPushMsgExecutor();
        waitingRequests.put(this.eventMeshConsumer.getConsumerGroupConf().getConsumerGroup(), Sets.newConcurrentHashSet());
        SCHEDULER.scheduleAtFixedRate(this::checkTimeout, 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.http.push.MessageHandler
    public boolean handle(HandleMsgContext handleMsgContext) {
        if (((Set) MapUtils.getObject(waitingRequests, handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet())).size() > CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD.intValue()) {
            log.warn("waitingRequests is too many, so reject, this message will be send back to MQ, consumerGroup:{}, threshold:{}", handleMsgContext.getConsumerGroup(), CONSUMER_GROUP_WAITING_REQUEST_THRESHOLD);
            return false;
        }
        try {
            this.pushExecutor.submit(() -> {
                Span prepareClientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(handleMsgContext.getEvent().getSpecVersion())).toString(), handleMsgContext.getEvent()), "downstream-eventmesh-client-span", false);
                try {
                    new AsyncHTTPPushRequest(handleMsgContext, waitingRequests).tryHTTPRequest();
                } finally {
                    TraceUtils.finishSpan(prepareClientSpan, handleMsgContext.getEvent());
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            log.warn("pushMsgThreadPoolQueue is full, so reject, current task size {}", Integer.valueOf(handleMsgContext.getEventMeshHTTPServer().getPushMsgExecutor().getQueue().size()), e);
            return false;
        }
    }
}
