package org.apache.inlong.tubemq.manager.executors;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.inlong.tubemq.manager.entry.MasterEntry;
import org.apache.inlong.tubemq.manager.entry.TopicTaskEntry;
import org.apache.inlong.tubemq.manager.repository.MasterRepository;
import org.apache.inlong.tubemq.manager.repository.TopicTaskRepository;
import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
import org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
import org.apache.inlong.tubemq.manager.service.interfaces.TopicService;
import org.apache.inlong.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
import org.apache.inlong.tubemq.manager.service.tube.TubeHttpTopicInfoList;
import org.apache.inlong.tubemq.manager.utils.ValidateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/tubemq/manager/executors/AddTopicExecutor.class */
public class AddTopicExecutor {
    private static final Logger log = LoggerFactory.getLogger(AddTopicExecutor.class);
    public static final int MAX_CONFIG_TOPIC_NUM = 100;

    @Autowired
    NodeService nodeService;

    @Autowired
    MasterRepository masterRepository;

    @Value("${manager.max.configurable.broker.size:50}")
    private int maxConfigurableBrokerSize;

    @Value("${manager.max.config.topic.retry.time:50}")
    private int maxRetryTimes = 5000;

    @Autowired
    TopicTaskRepository topicTaskRepository;

    @Autowired
    TopicService topicService;

    @Autowired
    MasterService masterService;

    @Async("asyncExecutor")
    public void addTopicConfig(Long l, List<TopicTaskEntry> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        List partition = ListUtils.partition(list, 100);
        MasterEntry masterNode = this.masterService.getMasterNode(l);
        TubeHttpBrokerInfoList requestBrokerStatus = this.nodeService.requestBrokerStatus(masterNode);
        if (ValidateUtils.isNull(requestBrokerStatus)) {
            return;
        }
        Iterator it = partition.iterator();
        while (it.hasNext()) {
            doConfigTopics((Map) ((List) it.next()).stream().collect(Collectors.toMap((v0) -> {
                return v0.getTopicName();
            }, topicTaskEntry -> {
                return topicTaskEntry;
            }, (topicTaskEntry2, topicTaskEntry3) -> {
                return topicTaskEntry3;
            })), masterNode, requestBrokerStatus);
        }
    }

    private void doConfigTopics(Map<String, TopicTaskEntry> map, MasterEntry masterEntry, TubeHttpBrokerInfoList tubeHttpBrokerInfoList) {
        handleAddingTopic(masterEntry, tubeHttpBrokerInfoList, map);
        updateConfigResult(masterEntry, map);
    }

    private void updateConfigResult(MasterEntry masterEntry, Map<String, TopicTaskEntry> map) {
        TubeHttpBrokerInfoList requestBrokerStatus = this.nodeService.requestBrokerStatus(masterEntry);
        if (ValidateUtils.isNull(requestBrokerStatus)) {
            return;
        }
        for (String str : map.keySet()) {
            TubeHttpTopicInfoList requestTopicConfigInfo = this.topicService.requestTopicConfigInfo(masterEntry, str);
            if (!ValidateUtils.isNull(requestTopicConfigInfo)) {
                updateTopicRepo(new HashSet(requestTopicConfigInfo.getTopicBrokerIdList()), requestBrokerStatus.getAllBrokerIdList(), map.get(str));
            }
        }
    }

    private void updateTopicRepo(Set<Integer> set, List<Integer> list, TopicTaskEntry topicTaskEntry) {
        if (set.containsAll(list)) {
            return;
        }
        topicTaskEntry.setConfigRetryTimes(Integer.valueOf(topicTaskEntry.getConfigRetryTimes().intValue() + 1));
        this.topicTaskRepository.save(topicTaskEntry);
    }

    private void handleAddingTopic(MasterEntry masterEntry, TubeHttpBrokerInfoList tubeHttpBrokerInfoList, Map<String, TopicTaskEntry> map) {
        HashSet hashSet = new HashSet();
        for (String str : map.keySet()) {
            TubeHttpTopicInfoList requestTopicConfigInfo = this.topicService.requestTopicConfigInfo(masterEntry, str);
            if (ValidateUtils.isNull(requestTopicConfigInfo)) {
                return;
            }
            List<Integer> topicBrokerIdList = requestTopicConfigInfo.getTopicBrokerIdList();
            if (topicBrokerIdList.isEmpty()) {
                hashSet.add(str);
            } else {
                handleAddingExistTopics(masterEntry, tubeHttpBrokerInfoList, str, topicBrokerIdList);
            }
        }
        handleAddingNewTopics(masterEntry, tubeHttpBrokerInfoList, hashSet);
    }

    private void handleAddingExistTopics(MasterEntry masterEntry, TubeHttpBrokerInfoList tubeHttpBrokerInfoList, String str, List<Integer> list) {
        List<Integer> configurableBrokerIdList = tubeHttpBrokerInfoList.getConfigurableBrokerIdList();
        configurableBrokerIdList.removeAll(list);
        HashSet hashSet = new HashSet();
        hashSet.add(str);
        this.nodeService.configBrokersForTopics(masterEntry, hashSet, configurableBrokerIdList, Math.min(this.maxConfigurableBrokerSize, configurableBrokerIdList.size()));
    }

    private void handleAddingNewTopics(MasterEntry masterEntry, TubeHttpBrokerInfoList tubeHttpBrokerInfoList, Set<String> set) {
        if (CollectionUtils.isEmpty(set)) {
            return;
        }
        List<Integer> configurableBrokerIdList = tubeHttpBrokerInfoList.getConfigurableBrokerIdList();
        this.nodeService.configBrokersForTopics(masterEntry, set, configurableBrokerIdList, Math.min(this.maxConfigurableBrokerSize, configurableBrokerIdList.size()));
    }
}
