package org.apache.inlong.dataproxy.sink.mq;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.class */
public class MessageQueueZoneProducer {
    public static final Logger LOG = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
    public static final int MAX_INDEX = 1073741823;
    private final String workerName;
    private final MessageQueueZoneSinkContext context;
    private Timer reloadTimer;
    private List<MessageQueueClusterProducer> clusterList = new ArrayList();
    private List<MessageQueueClusterProducer> deletingClusterList = new ArrayList();
    private AtomicInteger clusterIndex = new AtomicInteger(0);

    public MessageQueueZoneProducer(String str, MessageQueueZoneSinkContext messageQueueZoneSinkContext) {
        this.workerName = str;
        this.context = messageQueueZoneSinkContext;
    }

    public void start() {
        try {
            LOG.info("start MessageQueueZoneProducer:{}", this.workerName);
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void close() {
        try {
            this.reloadTimer.cancel();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        Iterator<MessageQueueClusterProducer> it = this.clusterList.iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
    }

    private void setReloadTimer() {
        this.reloadTimer = new Timer(true);
        this.reloadTimer.schedule(new TimerTask() { // from class: org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneProducer.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                MessageQueueZoneProducer.this.reload();
            }
        }, new Date(System.currentTimeMillis() + this.context.getReloadInterval()), this.context.getReloadInterval());
    }

    public void reload() {
        try {
            this.deletingClusterList.forEach((v0) -> {
                v0.stop();
            });
            this.deletingClusterList.clear();
            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
            ArrayList arrayList = new ArrayList(configList.size());
            HashSet hashSet = new HashSet();
            configList.forEach(cacheClusterConfig -> {
                hashSet.add(cacheClusterConfig.getClusterName());
            });
            HashSet hashSet2 = new HashSet();
            this.clusterList.forEach(messageQueueClusterProducer -> {
                hashSet2.add(messageQueueClusterProducer.getCacheClusterName());
            });
            for (CacheClusterConfig cacheClusterConfig2 : configList) {
                if (!hashSet2.contains(cacheClusterConfig2.getClusterName())) {
                    MessageQueueClusterProducer messageQueueClusterProducer2 = new MessageQueueClusterProducer(this.workerName, cacheClusterConfig2, this.context);
                    messageQueueClusterProducer2.start();
                    arrayList.add(messageQueueClusterProducer2);
                }
            }
            for (MessageQueueClusterProducer messageQueueClusterProducer3 : this.clusterList) {
                if (hashSet.contains(messageQueueClusterProducer3.getCacheClusterName())) {
                    arrayList.add(messageQueueClusterProducer3);
                } else {
                    this.deletingClusterList.add(messageQueueClusterProducer3);
                }
            }
            this.clusterList = arrayList;
            if (!ConfigManager.getInstance().isMqClusterReady()) {
                LOG.info("set mq cluster status ready");
                ConfigManager.getInstance().updMqClusterStatus(true);
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    public boolean send(BatchPackProfile batchPackProfile) {
        int andIncrement = this.clusterIndex.getAndIncrement();
        if (andIncrement > 1073741823) {
            this.clusterIndex.set(0);
        }
        List<MessageQueueClusterProducer> list = this.clusterList;
        return list.get(andIncrement % list.size()).send(batchPackProfile);
    }
}
