package com.xiaomi.mone.log.stream.config;

import com.alibaba.nacos.api.config.listener.Listener;
import com.google.gson.Gson;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.model.MiLogStreamConfig;
import com.xiaomi.mone.log.model.MilogSpaceData;
import com.xiaomi.mone.log.stream.common.util.StreamUtils;
import com.xiaomi.mone.log.stream.exception.StreamException;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.common.StringUtils;
import com.xiaomi.youpin.docean.plugin.config.anno.Value;
import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
/* loaded from: input_file:com/xiaomi/mone/log/stream/config/ConfigManager.class */
public class ConfigManager {

    @Resource
    private NacosConfig nacosConfig;

    @Value("${hera.stream.monitor_space_data_id}")
    private String spaceDataId;
    private ConcurrentHashMap<Long, MilogConfigListener> listeners = new ConcurrentHashMap<>();
    private ConcurrentHashMap<Long, MilogSpaceData> milogSpaceDataMap = new ConcurrentHashMap<>();
    private Gson gson = new Gson();
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConfigManager.class);
    private static ExecutorService THREAD_POOL = Executors.newVirtualThreadPerTaskExecutor();

    public void initStream() throws StreamException {
        MilogSpaceData milogSpaceData;
        log.debug("[initStream} nacos dataId:{},group:{}", this.spaceDataId, "DEFAULT_GROUP");
        String configStr = this.nacosConfig.getConfigStr(this.spaceDataId, "DEFAULT_GROUP", Constant.DEFAULT_TIME_OUT_MS.longValue());
        try {
            if (!StringUtils.isNotEmpty(configStr) || "null".equals(configStr)) {
                log.warn("[ConfigManager.initConfigManager] Nacos configuration [dataID:{},group:{}]not found,exit initConfigManager", this.spaceDataId, "DEFAULT_GROUP");
                return;
            }
            MiLogStreamConfig miLogStreamConfig = (MiLogStreamConfig) Constant.GSON.fromJson(configStr, MiLogStreamConfig.class);
            String currentMachineMark = StreamUtils.getCurrentMachineMark();
            Map<String, Map<Long, String>> config = miLogStreamConfig.getConfig();
            if (config.containsKey(currentMachineMark)) {
                Map<Long, String> map = config.get(currentMachineMark);
                log.info("[ConfigManager.initConfigManager] uniqueMark:{},data:{}", currentMachineMark, this.gson.toJson(map));
                for (Long l : map.keySet()) {
                    String str = map.get(l);
                    String configStr2 = this.nacosConfig.getConfigStr(str, "DEFAULT_GROUP", Constant.DEFAULT_TIME_OUT_MS.longValue());
                    if (StringUtils.isNotEmpty(configStr2) && null != (milogSpaceData = (MilogSpaceData) Constant.GSON.fromJson(configStr2, MilogSpaceData.class))) {
                        this.milogSpaceDataMap.put(l, milogSpaceData);
                    }
                    MilogSpaceData milogSpaceData2 = this.milogSpaceDataMap.get(l);
                    if (null != milogSpaceData2) {
                        addListener(l, new MilogConfigListener(l, str, "DEFAULT_GROUP", milogSpaceData2, this.nacosConfig));
                    }
                    log.info("[ConfigManager.initStream] added log config listener for spaceId:{},dataId:{}", l, str);
                }
            } else {
                log.info("server start current not contain space config,uniqueMark:{}", currentMachineMark);
            }
        } catch (Exception e) {
            log.error("[ConfigManager.initStream] initStream exec err", (Throwable) e);
        }
    }

    public void addListener(Long l, MilogConfigListener milogConfigListener) {
        this.listeners.put(l, milogConfigListener);
    }

    public void listenLogStreamConfig() {
        this.nacosConfig.addListener(this.spaceDataId, "DEFAULT_GROUP", new Listener() { // from class: com.xiaomi.mone.log.stream.config.ConfigManager.1
            @Override // com.alibaba.nacos.api.config.listener.Listener
            public Executor getExecutor() {
                return ConfigManager.THREAD_POOL;
            }

            @Override // com.alibaba.nacos.api.config.listener.Listener
            public void receiveConfigInfo(String str) {
                try {
                    ConfigManager.this.handleMiLogStreamConfig((MiLogStreamConfig) Constant.GSON.fromJson(str, MiLogStreamConfig.class));
                } catch (Exception e) {
                    ConfigManager.log.error("Error deserializing MiLogStreamConfig,spaceStr:{}", str, e);
                }
            }
        });
    }

    private void handleMiLogStreamConfig(MiLogStreamConfig miLogStreamConfig) {
        String currentMachineMark = StreamUtils.getCurrentMachineMark();
        log.info("Listening namespace received a configuration request,{},uniqueMark:{}", this.gson.toJson(miLogStreamConfig), currentMachineMark);
        if (miLogStreamConfig == null) {
            log.warn("listen dataID:{},groupId:{},but receive info is null", this.spaceDataId, "DEFAULT_GROUP");
            return;
        }
        Map<String, Map<Long, String>> config = miLogStreamConfig.getConfig();
        if (config != null) {
            processConfigForUniqueMark(currentMachineMark, config);
        } else {
            log.warn("listen dataID:{},groupId:{},but receive config is empty", this.spaceDataId, "DEFAULT_GROUP");
        }
    }

    private void processConfigForUniqueMark(String str, Map<String, Map<Long, String>> map) {
        if (map.containsKey(str)) {
            Map<Long, String> map2 = map.get(str);
            stopUnusefulListenerAndJob(map2);
            startNewListenerAndJob(map2);
        }
    }

    public boolean existListener(Long l) {
        return this.milogSpaceDataMap.get(l) != null;
    }

    public List<Long> unUseFilter(Map<Long, String> map) {
        ArrayList arrayList = new ArrayList(this.milogSpaceDataMap.keySet());
        Objects.requireNonNull(map);
        arrayList.removeIf((v1) -> {
            return r1.containsKey(v1);
        });
        return arrayList;
    }

    public void stopUnusefulListenerAndJob(Map<Long, String> map) {
        List<Long> unUseFilter = unUseFilter(map);
        if (CollectionUtils.isEmpty(unUseFilter)) {
            return;
        }
        logUnusefulSpaceIds(unUseFilter);
        unUseFilter.forEach(this::stopAndRemoveListenerAndJob);
    }

    private void logUnusefulSpaceIds(List<Long> list) {
        log.info("【Listening namespace】The space ID that needs to be stopped: {}", this.gson.toJson(list));
        logExistingListeners();
    }

    private void logExistingListeners() {
        log.info("[Listening namespace] All listeners already exist: {}", this.gson.toJson(new ArrayList(this.listeners.keySet())));
    }

    private void stopAndRemoveListenerAndJob(Long l) {
        MilogConfigListener milogConfigListener = this.listeners.get(l);
        if (milogConfigListener != null) {
            log.info("Stopping the space ID: {}", l);
            milogConfigListener.shutdown();
            log.info("Removing unUseSpaceId: {} log tail config listener", l);
            this.listeners.remove(l);
        } else {
            log.warn("No space ID in the current listener is ready to be stopped: {}", l);
        }
        stopJob(l, milogConfigListener);
    }

    private void stopJob(Long l, MilogConfigListener milogConfigListener) {
        MilogSpaceData milogSpaceData = this.milogSpaceDataMap.get(l);
        if (milogSpaceData != null) {
            milogConfigListener.getJobManager().closeJobs(milogSpaceData);
            log.info("Closing unUseSpaceId: {} logTail consumer job", l);
        } else {
            log.warn("Milog space config cache for spaceId: {}, unuseful job, needed to be closed, but is null", l);
        }
        this.milogSpaceDataMap.remove(l);
    }

    public void startNewListenerAndJob(Map<Long, String> map) {
        map.forEach((l, str) -> {
            if (existListener(l)) {
                return;
            }
            log.info("startNewListenerAndJob for spaceId:{}", l);
            MilogSpaceData milogSpaceData = getMilogSpaceData(str);
            this.milogSpaceDataMap.put(l, milogSpaceData);
            addListener(l, new MilogConfigListener(l, str, "DEFAULT_GROUP", milogSpaceData, this.nacosConfig));
        });
    }

    private MilogSpaceData getMilogSpaceData(String str) {
        try {
            String configStr = this.nacosConfig.getConfigStr(str, "DEFAULT_GROUP", Constant.DEFAULT_TIME_OUT_MS.longValue());
            return StringUtils.isNotEmpty(configStr) ? (MilogSpaceData) Constant.GSON.fromJson(configStr, MilogSpaceData.class) : new MilogSpaceData();
        } catch (Exception e) {
            log.error("Failed to get MilogSpaceData for dataId: {}", str, e);
            return new MilogSpaceData();
        }
    }

    public ConcurrentHashMap<Long, MilogSpaceData> getMilogSpaceDataMap() {
        return this.milogSpaceDataMap;
    }

    public ConcurrentHashMap<Long, MilogConfigListener> getListeners() {
        return this.listeners;
    }
}
