package org.apache.inlong.dataproxy.sink.mq;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.class */
public class MessageQueueZoneProducer {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
    private static final long MAX_RESERVED_TIME = 60000;
    private final MessageQueueZoneSink zoneSink;
    private final MessageQueueZoneSinkContext context;
    private final CacheClusterSelector cacheClusterSelector;
    private final AtomicInteger clusterIndex = new AtomicInteger(0);
    private List<String> currentClusterNames = new ArrayList();
    private final ConcurrentHashMap<String, Long> usingTimeMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, MessageQueueClusterProducer> usingClusterMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, MessageQueueClusterProducer> deletingClusterMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Long> deletingTimeMap = new ConcurrentHashMap<>();
    private final Set<String> lastRefreshTopics = new HashSet();

    public MessageQueueZoneProducer(MessageQueueZoneSink messageQueueZoneSink, MessageQueueZoneSinkContext messageQueueZoneSinkContext) {
        this.zoneSink = messageQueueZoneSink;
        this.context = messageQueueZoneSinkContext;
        this.cacheClusterSelector = messageQueueZoneSinkContext.createCacheClusterSelector();
    }

    public void start() {
        try {
            logger.info("{} start MessageQueueZoneProducer", this.zoneSink.getName());
            reloadMetaConfig();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void close() {
        for (MessageQueueClusterProducer messageQueueClusterProducer : this.deletingClusterMap.values()) {
            if (messageQueueClusterProducer != null) {
                messageQueueClusterProducer.stop();
            }
        }
        for (MessageQueueClusterProducer messageQueueClusterProducer2 : this.usingClusterMap.values()) {
            if (messageQueueClusterProducer2 != null) {
                messageQueueClusterProducer2.stop();
            }
        }
        this.deletingClusterMap.clear();
        this.deletingTimeMap.clear();
        this.usingClusterMap.clear();
        this.usingTimeMap.clear();
    }

    public void reloadMetaConfig() {
        checkAndReloadClusterInfo();
        checkAndPublishTopics();
    }

    public void clearExpiredProducers() {
        if (this.deletingClusterMap.isEmpty()) {
            return;
        }
        HashSet<String> hashSet = new HashSet();
        synchronized (this.deletingClusterMap) {
            long currentTimeMillis = System.currentTimeMillis();
            for (Map.Entry<String, Long> entry : this.deletingTimeMap.entrySet()) {
                if (entry != null && entry.getKey() != null && entry.getValue() != null && currentTimeMillis - entry.getValue().longValue() >= 60000) {
                    hashSet.add(entry.getKey());
                }
            }
            if (hashSet.isEmpty()) {
                return;
            }
            for (String str : hashSet) {
                this.deletingTimeMap.remove(str);
                MessageQueueClusterProducer remove = this.deletingClusterMap.remove(str);
                if (remove != null) {
                    remove.stop();
                }
            }
            logger.info("{} cleared expired cluster producer {}", this.zoneSink.getName(), hashSet);
        }
    }

    public boolean send(PackProfile packProfile) {
        while (true) {
            List<String> list = this.currentClusterNames;
            if (list == null || list.isEmpty()) {
                this.context.fileMetricIncSumStats(StatConstants.EVENT_SINK_CLUSTER_EMPTY);
                sleepSomeTime(100L);
            } else {
                String str = list.get(Math.abs(this.clusterIndex.getAndIncrement()) % list.size());
                if (str == null) {
                    this.context.fileMetricIncSumStats(StatConstants.EVENT_SINK_CLUSTER_UNMATCHED);
                    sleepSomeTime(100L);
                } else {
                    MessageQueueClusterProducer messageQueueClusterProducer = this.usingClusterMap.get(str);
                    if (messageQueueClusterProducer != null) {
                        return messageQueueClusterProducer.send(packProfile);
                    }
                    this.context.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_CPRODUCER_NULL, str);
                    sleepSomeTime(100L);
                }
            }
        }
    }

    private void checkAndReloadClusterInfo() {
        try {
            List<CacheClusterConfig> select = this.cacheClusterSelector.select(ConfigManager.getInstance().getCachedCLusterConfig());
            if (select == null || select.size() == 0) {
                return;
            }
            boolean z = false;
            ArrayList arrayList = new ArrayList();
            ArrayList<CacheClusterConfig> arrayList2 = new ArrayList();
            synchronized (this.deletingClusterMap) {
                for (CacheClusterConfig cacheClusterConfig : select) {
                    if (cacheClusterConfig != null) {
                        if (this.usingTimeMap.containsKey(cacheClusterConfig.getClusterName())) {
                            arrayList.add(cacheClusterConfig.getClusterName());
                        } else if (this.deletingTimeMap.containsKey(cacheClusterConfig.getClusterName())) {
                            this.deletingTimeMap.remove(cacheClusterConfig.getClusterName());
                            MessageQueueClusterProducer remove = this.deletingClusterMap.remove(cacheClusterConfig.getClusterName());
                            if (remove == null) {
                                arrayList2.add(cacheClusterConfig);
                            } else {
                                this.usingClusterMap.put(cacheClusterConfig.getClusterName(), remove);
                                this.usingTimeMap.put(cacheClusterConfig.getClusterName(), Long.valueOf(System.currentTimeMillis()));
                                arrayList.add(cacheClusterConfig.getClusterName());
                            }
                        } else {
                            arrayList2.add(cacheClusterConfig);
                        }
                    }
                }
            }
            if (!arrayList2.isEmpty()) {
                z = true;
                long currentTimeMillis = System.currentTimeMillis();
                for (CacheClusterConfig cacheClusterConfig2 : arrayList2) {
                    if (cacheClusterConfig2 != null) {
                        MessageQueueClusterProducer messageQueueClusterProducer = new MessageQueueClusterProducer(this.zoneSink.getName(), cacheClusterConfig2, this.context);
                        messageQueueClusterProducer.start();
                        this.usingClusterMap.put(cacheClusterConfig2.getClusterName(), messageQueueClusterProducer);
                        this.usingTimeMap.put(cacheClusterConfig2.getClusterName(), Long.valueOf(currentTimeMillis));
                        arrayList.add(cacheClusterConfig2.getClusterName());
                    }
                }
            }
            if (!arrayList.equals(this.currentClusterNames)) {
                this.currentClusterNames = arrayList;
                z = true;
            }
            HashSet<String> hashSet = new HashSet();
            synchronized (this.deletingClusterMap) {
                for (Map.Entry<String, MessageQueueClusterProducer> entry : this.usingClusterMap.entrySet()) {
                    if (entry != null && entry.getKey() != null && entry.getValue() != null && !arrayList.contains(entry.getKey())) {
                        hashSet.add(entry.getKey());
                    }
                }
                if (!hashSet.isEmpty()) {
                    z = true;
                    long currentTimeMillis2 = System.currentTimeMillis();
                    for (String str : hashSet) {
                        MessageQueueClusterProducer remove2 = this.usingClusterMap.remove(str);
                        this.usingTimeMap.remove(str);
                        if (remove2 != null) {
                            this.deletingClusterMap.put(str, remove2);
                            this.deletingTimeMap.put(str, Long.valueOf(currentTimeMillis2));
                        }
                    }
                }
            }
            if (z) {
                if (this.zoneSink.isMqClusterStarted()) {
                    logger.info("{} reload cluster info, current cluster are {}, removed {}, created {}", new Object[]{this.zoneSink.getName(), arrayList, hashSet, arrayList2});
                } else {
                    this.zoneSink.setMQClusterStarted();
                    ConfigManager.getInstance().setMqClusterReady();
                    logger.info("{} reload cluster info, and updated sink status, current cluster are {}, removed {}, created {}", new Object[]{this.zoneSink.getName(), arrayList, hashSet, arrayList2});
                }
            }
        } catch (Throwable th) {
            logger.error("{} reload cluster info failure", this.zoneSink.getName(), th);
        }
    }

    private void checkAndPublishTopics() {
        Set<String> allTopicNames = ConfigManager.getInstance().getAllTopicNames();
        if (allTopicNames.isEmpty() || this.lastRefreshTopics.equals(allTopicNames)) {
            return;
        }
        logger.info("{} reload topics changed, current topics are {}, last topics are {}", new Object[]{this.zoneSink.getName(), allTopicNames, this.lastRefreshTopics});
        this.lastRefreshTopics.addAll(allTopicNames);
        for (MessageQueueClusterProducer messageQueueClusterProducer : this.usingClusterMap.values()) {
            if (messageQueueClusterProducer != null) {
                messageQueueClusterProducer.publishTopic(allTopicNames);
            }
        }
    }

    private void sleepSomeTime(long j) {
        try {
            Thread.sleep(j);
        } catch (Throwable th) {
        }
    }
}
