package com.infusers.core.stats.requests;

import com.infusers.core.rabbitmq.impl.IRabbitMQConsumerService;
import com.infusers.core.util.InfusersUtility;
import jakarta.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
/* loaded from: input_file:com/infusers/core/stats/requests/BroadCastRequestStatsService.class */
public class BroadCastRequestStatsService<T> implements IRabbitMQConsumerService<ActiveRequestsCountRequest> {
    private static final String CLASS_NAME = "BroadCastRequestStatsService";

    @Autowired(required = true)
    private RequestTrackingService requestTrackingService;

    @Autowired(required = true)
    private InfusersUtility utility;
    private final Logger log = LogManager.getLogger(BroadCastRequestStatsService.class);
    private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    public long getActiveRequestCount() {
        return this.requestTrackingService.getActiveRequestCount();
    }

    public SseEmitter streamActiveRequests(HttpServletResponse httpServletResponse) {
        this.log.debug("BroadCastRequestStatsService.streamActiveRequests() -> Called.");
        try {
            httpServletResponse.setContentType("text/event-stream");
            httpServletResponse.setCharacterEncoding("UTF-8");
            SseEmitter sseEmitter = new SseEmitter(this.utility.getMaxTimeout());
            addEmitter(sseEmitter);
            sseEmitter.onCompletion(() -> {
                removeEmitter(sseEmitter);
            });
            sseEmitter.onTimeout(() -> {
                removeEmitter(sseEmitter);
            });
            sseEmitter.onError(th -> {
                this.log.error("BroadCastRequestStatsService.streamActiveRequests() -->onError");
            });
            return sseEmitter;
        } catch (Exception e) {
            this.log.error("BroadCastRequestStatsService.streamActiveRequests() -> e: ", e);
            return null;
        }
    }

    private void addEmitter(SseEmitter sseEmitter) {
        synchronized (this.emitters) {
            this.emitters.add(sseEmitter);
        }
    }

    private void removeEmitter(SseEmitter sseEmitter) {
        synchronized (this.emitters) {
            this.emitters.remove(sseEmitter);
        }
    }

    @Override // com.infusers.core.rabbitmq.impl.IRabbitMQConsumerService
    public void process(ActiveRequestsCountRequest activeRequestsCountRequest) {
        Logger logger = this.log;
        long count = activeRequestsCountRequest.getCount();
        String eventSource = activeRequestsCountRequest.getEventSource();
        this.emitters.size();
        logger.debug("BroadCastRequestStatsService.process() -> count = " + count + " :: Source " + logger + " :: emitters count = " + eventSource);
        synchronized (this.emitters) {
            ArrayList arrayList = new ArrayList();
            this.emitters.forEach(sseEmitter -> {
                try {
                    sseEmitter.send(SseEmitter.event().name("activeRequests").data(Long.valueOf(activeRequestsCountRequest.getCount())));
                } catch (Exception e) {
                    this.log.error("BroadCastRequestStatsService.broadcast() -> Error sending event to emitter: ", e);
                    sseEmitter.completeWithError(e);
                    arrayList.add(sseEmitter);
                }
            });
            this.emitters.removeAll(arrayList);
            this.log.debug("BroadCastRequestStatsService.process() -> # of emitters removed = " + arrayList.size() + " :: total active emitters = " + this.emitters.size());
        }
    }
}
