package org.apache.inlong.tubemq.manager.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.transaction.Transactional;
import org.apache.inlong.tubemq.manager.controller.TubeMQResult;
import org.apache.inlong.tubemq.manager.entry.ClusterEntry;
import org.apache.inlong.tubemq.manager.entry.MasterEntry;
import org.apache.inlong.tubemq.manager.entry.TopicStatus;
import org.apache.inlong.tubemq.manager.entry.TopicTaskEntry;
import org.apache.inlong.tubemq.manager.enums.ErrorCode;
import org.apache.inlong.tubemq.manager.enums.TaskStatusEnum;
import org.apache.inlong.tubemq.manager.executors.AddTopicExecutor;
import org.apache.inlong.tubemq.manager.repository.MasterRepository;
import org.apache.inlong.tubemq.manager.repository.TopicTaskRepository;
import org.apache.inlong.tubemq.manager.service.interfaces.ClusterService;
import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
import org.apache.inlong.tubemq.manager.service.interfaces.NodeService;
import org.apache.inlong.tubemq.manager.service.interfaces.TaskService;
import org.apache.inlong.tubemq.manager.service.interfaces.TopicService;
import org.apache.inlong.tubemq.manager.service.tube.TopicView;
import org.apache.inlong.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
import org.apache.inlong.tubemq.manager.utils.ValidateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * {@link TaskService} implementation that records topic-creation tasks and advances them from
 * scheduled jobs.
 *
 * <p>Tasks are stored through {@link TopicTaskRepository}. Two cron-driven jobs operate on tasks in
 * {@link TaskStatusEnum#ADDING} status: {@link #executeTopicConfigTasks()} hands them to the
 * {@link AddTopicExecutor}, and {@link #reloadBrokers()} triggers broker reloads and then
 * re-evaluates the status of every pending task.
 */
@Component
public class TaskServiceImpl implements TaskService {
    private static final Logger log = LoggerFactory.getLogger(TaskServiceImpl.class);

    /** Retry-counter threshold at which a task that is still not complete is marked as failed. */
    public static final Integer MAX_RELOAD_TIMES = 500;

    @Autowired
    TopicTaskRepository topicTaskRepository;

    @Autowired
    ClusterService clusterService;

    @Autowired
    AddTopicExecutor addTopicExecutor;

    @Autowired
    NodeService nodeService;

    @Autowired
    MasterRepository masterRepository;

    @Autowired
    TopicService topicService;

    @Autowired
    MasterService masterService;

    /**
     * Persists one topic-creation task per topic name for the given cluster.
     *
     * <p>Nothing is saved when at least one of the topics already has a task in ADDING status for
     * that cluster; the conflicting topic names are reported in the error result instead.
     *
     * @param clusterId  id of the cluster the topics belong to
     * @param topicNames names of the topics to create
     * @return a success result once the tasks are saved, a {@link ErrorCode#TASK_EXIST} error when a
     *         pending task already exists, or the {@code MYSQL_ERROR} result when any exception is
     *         raised while checking or saving
     */
    @Override
    @Transactional
    public TubeMQResult addTopicCreateTask(Long clusterId, Set<String> topicNames) {
        try {
            Set<String> pendingTopics = findExistTopics(clusterId, topicNames);
            if (!pendingTopics.isEmpty()) {
                return TubeMQResult.errorResult("There are topic tasks " + pendingTopics + " already in adding status", ErrorCode.TASK_EXIST.getCode());
            }
            List<TopicTaskEntry> newTasks = new ArrayList<>(topicNames.size());
            for (String topicName : topicNames) {
                TopicTaskEntry task = new TopicTaskEntry();
                task.setClusterId(clusterId);
                task.setTopicName(topicName);
                newTasks.add(task);
            }
            this.topicTaskRepository.saveAll(newTasks);
            return TubeMQResult.successResult();
        } catch (Exception e) {
            // Deliberately broad: every failure on this path is reported to the caller as a result
            // object rather than propagated.
            log.error("save topic tasks to db fail, topics : {}", topicNames, e);
            return TubeMQResult.errorResult(TubeMQErrorConst.MYSQL_ERROR);
        }
    }

    /**
     * Returns the subset of {@code topicNames} that already has a task in ADDING status for the
     * cluster. Issues one repository lookup per topic name.
     */
    private Set<String> findExistTopics(Long clusterId, Set<String> topicNames) {
        return topicNames.stream()
                .filter(topicName -> hasAlreadyExistTopicTask(clusterId, topicName, TaskStatusEnum.ADDING.getCode()))
                .collect(Collectors.toSet());
    }

    /**
     * Tells whether a task exists for the given cluster, topic and status.
     *
     * @param clusterId id of the cluster
     * @param topicName name of the topic
     * @param status    task status code to match
     * @return {@code true} when the repository finds a matching task entry
     */
    public boolean hasAlreadyExistTopicTask(Long clusterId, String topicName, Integer status) {
        return this.topicTaskRepository.findTopicTaskEntryByClusterIdAndStatusAndTopicName(clusterId, status, topicName) != null;
    }

    /**
     * Scheduled job: for every known cluster, passes the tasks currently in ADDING status to the
     * {@link AddTopicExecutor}.
     */
    @Scheduled(cron = "${topic.config.schedule}")
    public void executeTopicConfigTasks() {
        for (ClusterEntry cluster : this.clusterService.getAllClusters()) {
            long clusterId = cluster.getClusterId();
            List<TopicTaskEntry> addingTasks =
                    this.topicTaskRepository.findTopicTaskEntriesByClusterIdAndStatus(clusterId, TaskStatusEnum.ADDING.getCode());
            this.addTopicExecutor.addTopicConfig(clusterId, addingTasks);
        }
    }

    /**
     * Scheduled job: for every known cluster, looks up the master node and the broker status and
     * then reloads the brokers. Clusters without a master node or without a broker status are
     * skipped.
     */
    @Scheduled(cron = "${broker.reload.schedule}")
    public void reloadBrokers() {
        for (ClusterEntry cluster : this.clusterService.getAllClusters()) {
            long clusterId = cluster.getClusterId();
            MasterEntry masterNode = this.masterService.getMasterNode(clusterId);
            if (ValidateUtils.isNull(masterNode)) {
                continue;
            }
            TubeHttpBrokerInfoList brokerStatus = this.nodeService.requestBrokerStatus(masterNode);
            if (ValidateUtils.isNull(brokerStatus)) {
                continue;
            }
            // NOTE(review): this is a call through `this`. With Spring's default proxy-based AOP a
            // self-invocation does not pass through the proxy, so @Async on doReloadBrokers would
            // not take effect here - confirm whether asynchronous execution is expected.
            doReloadBrokers(clusterId, masterNode, brokerStatus, cluster);
        }
    }

    /**
     * Asks the node service to reload the brokers listed as needing a reload, then re-evaluates the
     * pending topic tasks of the cluster.
     *
     * @param clusterId    id of the cluster being processed
     * @param masterEntry  master node of the cluster
     * @param brokerStatus broker status whose need-reload list is handed to the node service
     * @param clusterEntry the cluster being processed
     */
    @Async("asyncExecutor")
    public void doReloadBrokers(long clusterId, MasterEntry masterEntry, TubeHttpBrokerInfoList brokerStatus, ClusterEntry clusterEntry) {
        this.nodeService.handleReloadBroker(masterEntry, brokerStatus.getNeedReloadList(), clusterEntry);
        // NOTE(review): self-invocation again; @Transactional on the callee is presumably not
        // applied on this path under proxy-based AOP - confirm.
        updateCreateTopicTaskStatus(clusterId);
    }

    /**
     * Re-evaluates every task in ADDING status of the cluster against the current topic view and
     * broker status, and saves the result.
     *
     * <p>Returns without touching the repository when there are no pending tasks, or when the topic
     * view, the master node or the broker status is unavailable.
     *
     * @param clusterId id of the cluster whose tasks are re-evaluated
     */
    @Transactional(rollbackOn = {Exception.class})
    public void updateCreateTopicTaskStatus(long clusterId) {
        List<TopicTaskEntry> pendingTasks =
                this.topicTaskRepository.findTopicTaskEntriesByClusterIdAndStatus(clusterId, TaskStatusEnum.ADDING.getCode());
        if (pendingTasks == null || pendingTasks.isEmpty()) {
            // Nothing to update, so skip the remote topic-view and broker-status requests.
            return;
        }
        TopicView topicView = this.topicService.requestTopicViewInfo(clusterId, null);
        if (topicView == null || topicView.getData() == null) {
            return;
        }
        // NOTE(review): toMap without a merge function throws IllegalStateException on duplicate
        // keys; this assumes topic names in the view are unique - confirm.
        Map<String, TopicView.TopicViewInfo> viewByTopic = topicView.getData().stream()
                .collect(Collectors.toMap(TopicView.TopicViewInfo::getTopicName, info -> info));
        // Same guard as reloadBrokers(): a cluster may currently have no master node.
        MasterEntry masterNode = this.masterService.getMasterNode(clusterId);
        if (ValidateUtils.isNull(masterNode)) {
            return;
        }
        TubeHttpBrokerInfoList brokerStatus = this.nodeService.requestBrokerStatus(masterNode);
        if (ValidateUtils.isNull(brokerStatus)) {
            return;
        }
        updateTaskRepo(pendingTasks, viewByTopic, brokerStatus);
    }

    /**
     * Updates the status or retry counter of each task that has an entry in the topic view, then
     * saves all tasks. Tasks whose topic is absent from the view are saved unchanged.
     *
     * <p>A task becomes SUCCESS when the topic is configured on as many brokers as the broker
     * status lists and its configured partition count equals its running partition count.
     * Otherwise its retry counter is incremented, and it is marked failed once the counter (before
     * the increment) has reached {@link #MAX_RELOAD_TIMES}.
     */
    private void updateTaskRepo(List<TopicTaskEntry> tasks, Map<String, TopicView.TopicViewInfo> viewByTopic, TubeHttpBrokerInfoList brokerStatus) {
        int brokerCount = brokerStatus.getAllBrokerIdList().size();
        for (TopicTaskEntry task : tasks) {
            TopicView.TopicViewInfo view = viewByTopic.get(task.getTopicName());
            if (view == null) {
                continue;
            }
            boolean configuredOnAllBrokers = brokerCount == view.getTotalCfgBrokerCnt();
            boolean allPartitionsRunning = view.getTotalCfgNumPart() == view.getTotalRunNumPartCount();
            if (configuredOnAllBrokers && allPartitionsRunning) {
                task.setStatus(TaskStatusEnum.SUCCESS.getCode());
                continue;
            }
            // A missing counter is treated as "never retried" instead of failing on unboxing,
            // which would abort the whole batch before saveAll.
            Integer storedRetries = task.getReloadRetryTimes();
            int retries = storedRetries == null ? 0 : storedRetries;
            if (retries >= MAX_RELOAD_TIMES) {
                // NOTE(review): the failure code is taken from TopicStatus while the other statuses
                // use TaskStatusEnum - confirm the two enums share the same code space.
                task.setStatus(TopicStatus.FAILED.value());
            }
            task.setReloadRetryTimes(retries + 1);
        }
        this.topicTaskRepository.saveAll(tasks);
    }
}
