package com.infusers.core.sse;

import com.infusers.core.eng.selfheal.insights.spring.pom.dto.POMInsightDTO;
import com.infusers.core.rabbitmq.impl.IRabbitMQConsumerService;
import com.infusers.core.sse.requests.ActiveRequestsCountRequest;
import com.infusers.core.sse.requests.RequestTrackingService;
import com.infusers.core.stats.users.ActiveUserCountEvent;
import com.infusers.core.util.InfusersUtility;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
/* loaded from: input_file:com/infusers/core/sse/BroadCastSSEService.class */
public class BroadCastSSEService<T> implements IRabbitMQConsumerService<ActiveRequestsCountRequest> {
    private static final String CLASS_NAME = "BroadCastSSEService";
    private static final String ACTIVE_REQUESTS = "activeRequests";
    private static final String ACTIVE_USERS = "activeUsers";

    @Autowired(required = true)
    private RequestTrackingService requestTrackingService;

    @Autowired(required = true)
    private InfusersUtility utility;

    @Value("${spring.application.name}")
    private String eventSourceApp;
    private final Logger log = LogManager.getLogger(BroadCastSSEService.class);
    private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    public long getActiveRequestCount() {
        return this.requestTrackingService.getActiveRequestCount();
    }

    public SseEmitter streamActiveRequests(HttpServletResponse httpServletResponse) {
        this.log.debug("BroadCastSSEService.streamActiveRequests() -> Called.");
        try {
            httpServletResponse.setContentType("text/event-stream");
            httpServletResponse.setCharacterEncoding("UTF-8");
            SseEmitter sseEmitter = new SseEmitter(this.utility.getMaxTimeout());
            this.emitters.add(sseEmitter);
            sseEmitter.onCompletion(() -> {
                this.emitters.remove(sseEmitter);
            });
            sseEmitter.onTimeout(() -> {
                this.emitters.remove(sseEmitter);
            });
            sseEmitter.onError(th -> {
                this.log.error("BroadCastSSEService.streamActiveRequests() -->onError");
                this.emitters.remove(sseEmitter);
            });
            return sseEmitter;
        } catch (Exception e) {
            this.log.error("BroadCastSSEService.streamActiveRequests() -> e: ", e);
            throw e;
        }
    }

    @Scheduled(fixedRate = 60000)
    private void cleanupEmitters() {
        this.log.warn("BroadCastSSEService.cleanupEmitters() -> Started, emitters count = " + this.emitters.size());
        this.emitters.removeIf(sseEmitter -> {
            try {
                sseEmitter.send(POMInsightDTO.NO_VERSION);
                return false;
            } catch (IOException e) {
                return true;
            }
        });
        this.log.warn("BroadCastSSEService.cleanupEmitters() -> Ended, emitters count = " + this.emitters.size());
    }

    @Override // com.infusers.core.rabbitmq.impl.IRabbitMQConsumerService
    public void process(ActiveRequestsCountRequest activeRequestsCountRequest) {
        broadcast(activeRequestsCountRequest.getEventSource(), activeRequestsCountRequest.getCount(), ACTIVE_REQUESTS);
    }

    @EventListener
    public void handleActiveUserCountEvent(ActiveUserCountEvent activeUserCountEvent) {
        long count = activeUserCountEvent.getCount();
        this.log.warn("BroadCastSSEService.handleActiveUserCountEvent() -> active users count = " + count);
        broadcast(this.eventSourceApp, count, ACTIVE_USERS);
    }

    private void broadcast(String str, long j, String str2) {
        Logger logger = this.log;
        this.emitters.size();
        logger.debug("BroadCastSSEService.broadcast() -> count = " + j + " :: Source " + logger + " :: key = " + str + " :: emitters count = " + str2);
        ArrayList arrayList = new ArrayList();
        this.emitters.forEach(sseEmitter -> {
            try {
                sseEmitter.send(SseEmitter.event().name(str2).data(Long.valueOf(j)));
            } catch (Exception e) {
                this.log.error("BroadCastSSEService.broadcast() -> Error sending event to emitter: ", e);
                sseEmitter.completeWithError(e);
                arrayList.add(sseEmitter);
            }
        });
        this.emitters.removeAll(arrayList);
        arrayList.clear();
        this.log.debug("BroadCastSSEService.broadcast() -> # of emitters removed = " + arrayList.size() + " :: total active emitters = " + this.emitters.size());
    }
}
