package org.apache.inlong.tubemq.manager.service;

import com.google.gson.Gson;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.inlong.tubemq.manager.controller.TubeMQResult;
import org.apache.inlong.tubemq.manager.controller.group.request.DeleteOffsetReq;
import org.apache.inlong.tubemq.manager.controller.group.request.QueryConsumerGroupReq;
import org.apache.inlong.tubemq.manager.controller.group.request.QueryOffsetReq;
import org.apache.inlong.tubemq.manager.controller.group.result.AllBrokersOffsetRes;
import org.apache.inlong.tubemq.manager.controller.group.result.GroupOffsetRes;
import org.apache.inlong.tubemq.manager.controller.group.result.OffsetQueryRes;
import org.apache.inlong.tubemq.manager.controller.group.result.TopicOffsetRes;
import org.apache.inlong.tubemq.manager.controller.node.request.CloneOffsetReq;
import org.apache.inlong.tubemq.manager.controller.topic.request.RebalanceGroupReq;
import org.apache.inlong.tubemq.manager.entry.MasterEntry;
import org.apache.inlong.tubemq.manager.enums.ErrorCode;
import org.apache.inlong.tubemq.manager.service.interfaces.BrokerService;
import org.apache.inlong.tubemq.manager.service.interfaces.MasterService;
import org.apache.inlong.tubemq.manager.service.interfaces.TopicService;
import org.apache.inlong.tubemq.manager.service.tube.CleanOffsetResult;
import org.apache.inlong.tubemq.manager.service.tube.RebalanceGroupResult;
import org.apache.inlong.tubemq.manager.service.tube.TopicView;
import org.apache.inlong.tubemq.manager.service.tube.TubeHttpGroupDetailInfo;
import org.apache.inlong.tubemq.manager.service.tube.TubeHttpTopicInfoList;
import org.apache.inlong.tubemq.manager.utils.ConvertUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/inlong/tubemq/manager/service/TopicServiceImpl.class */
public class TopicServiceImpl implements TopicService {
    public static final int FIRST_TOPIC_INDEX = 0;
    public static final int MINIMUN_TOPIC_RUN_PART = 1;
    private final CloseableHttpClient httpclient = HttpClients.createDefault();
    private final Gson gson = new Gson();
    private static final int MAX_TOPIC_NAME_LENGTH = 255;

    @Value("${manager.broker.webPort:8081}")
    private int brokerWebPort;

    @Autowired
    private MasterService masterService;

    @Autowired
    private BrokerService brokerService;
    private static final Logger log = LoggerFactory.getLogger(TopicServiceImpl.class);
    private static final Pattern SPECIAL_CHAR_PATTERN = Pattern.compile("[%\\x00-\\x1F\\x7F-\\uFFFF]");
    private static final String[] DANGEROUS_KEYWORDS = {"exec", "system", "cmd", "shell", "php", "perl", "python", "ruby", "javascript", "java"};

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeHttpGroupDetailInfo requestGroupRunInfo(MasterEntry masterEntry, String str) {
        try {
            CloseableHttpResponse execute = this.httpclient.execute(new HttpGet(TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort() + TubeConst.QUERY_GROUP_DETAIL_INFO + TubeConst.CONSUME_GROUP + str));
            Throwable th = null;
            try {
                try {
                    TubeHttpGroupDetailInfo tubeHttpGroupDetailInfo = (TubeHttpGroupDetailInfo) this.gson.fromJson(new InputStreamReader(execute.getEntity().getContent(), StandardCharsets.UTF_8), TubeHttpGroupDetailInfo.class);
                    if (tubeHttpGroupDetailInfo.getErrCode() == 0) {
                        if (execute != null) {
                            if (0 != 0) {
                                try {
                                    execute.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                execute.close();
                            }
                        }
                        return tubeHttpGroupDetailInfo;
                    }
                    if (execute != null) {
                        if (0 != 0) {
                            try {
                                execute.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            execute.close();
                        }
                    }
                    return null;
                } catch (Throwable th4) {
                    th = th4;
                    throw th4;
                }
            } finally {
            }
        } catch (Exception e) {
            log.error("exception caught while requesting group status", e);
            return null;
        }
        log.error("exception caught while requesting group status", e);
        return null;
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeMQResult queryGroupExist(QueryConsumerGroupReq queryConsumerGroupReq) {
        return requestGroupRunInfo(this.masterService.getMasterNode(queryConsumerGroupReq), queryConsumerGroupReq.getConsumerGroup()).getTopicSet().stream().anyMatch(str -> {
            return str.equals(queryConsumerGroupReq.getTopicName());
        }) ? TubeMQResult.successResult() : TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_GROUP);
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TopicView requestTopicViewInfo(Long l, String str) {
        MasterEntry masterNode = this.masterService.getMasterNode(l);
        validateMasterEntry(masterNode, l);
        validateTopicName(str, l);
        try {
            CloseableHttpResponse execute = this.httpclient.execute(new HttpGet(buildTopicViewURL(masterNode, str, l)));
            Throwable th = null;
            try {
                try {
                    TopicView parseTopicViewResponse = parseTopicViewResponse(execute);
                    if (execute != null) {
                        if (0 != 0) {
                            try {
                                execute.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            execute.close();
                        }
                    }
                    return parseTopicViewResponse;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            handleRequestException(l, str, e);
            throw new RuntimeException(e.getMessage());
        }
    }

    private void validateMasterEntry(MasterEntry masterEntry, Long l) {
        if (masterEntry == null || StringUtils.isBlank(masterEntry.getIp()) || masterEntry.getWebPort() <= 0) {
            log.error("Invalid MasterEntry: ClusterId = {}", l);
            throw new IllegalArgumentException("Invalid MasterEntry.");
        }
    }

    private void validateTopicName(String str, Long l) {
        if (StringUtils.isBlank(str) || containsDangerousChars(str) || str.length() > MAX_TOPIC_NAME_LENGTH) {
            log.error("Invalid topicName: ClusterId = {}, TopicName = {}", l, str);
            throw new IllegalArgumentException("Invalid topicName.");
        }
    }

    private String buildTopicViewURL(MasterEntry masterEntry, String str, Long l) {
        String str2 = TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort() + TubeConst.TOPIC_VIEW;
        if (isValidURL(str2)) {
            return str2 + TubeConst.TOPIC_NAME + str;
        }
        log.error("Invalid URL: ClusterId = {}, URL = {}", l, str2);
        throw new IllegalArgumentException("Invalid URL.");
    }

    private TopicView parseTopicViewResponse(CloseableHttpResponse closeableHttpResponse) throws Exception {
        return (TopicView) this.gson.fromJson(new InputStreamReader(closeableHttpResponse.getEntity().getContent(), StandardCharsets.UTF_8), TopicView.class);
    }

    private void handleRequestException(Long l, String str, Exception exc) {
        log.error("Exception caught while requesting topic view: ClusterId = {}, TopicName = {}", new Object[]{l, str, exc});
    }

    private boolean containsDangerousChars(String str) {
        String lowerCase = str.toLowerCase();
        if (lowerCase.contains("://") || StringUtils.containsAny(lowerCase, DANGEROUS_KEYWORDS)) {
            return true;
        }
        return SPECIAL_CHAR_PATTERN.matcher(lowerCase).find();
    }

    private boolean isValidURL(String str) {
        try {
            new URL(str);
            return true;
        } catch (MalformedURLException e) {
            log.warn("URL validation failed with exception: {}", e.getMessage());
            return false;
        }
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq cloneOffsetReq) {
        MasterEntry masterNode = this.masterService.getMasterNode(Long.valueOf(cloneOffsetReq.getClusterId().intValue()));
        if (masterNode == null) {
            return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_CLUSTER);
        }
        TubeHttpTopicInfoList requestTopicConfigInfo = requestTopicConfigInfo(masterNode, cloneOffsetReq.getTopicName());
        TubeMQResult tubeMQResult = new TubeMQResult();
        if (requestTopicConfigInfo == null) {
            return tubeMQResult;
        }
        Iterator<TubeHttpTopicInfoList.TopicInfoList.TopicInfo> it = requestTopicConfigInfo.getTopicInfo().iterator();
        while (it.hasNext()) {
            tubeMQResult = this.brokerService.cloneOffset(it.next().getBrokerIp(), this.brokerWebPort, cloneOffsetReq);
            if (tubeMQResult.getErrCode() != TubeConst.SUCCESS_CODE.intValue()) {
                return tubeMQResult;
            }
        }
        return tubeMQResult;
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeHttpTopicInfoList requestTopicConfigInfo(MasterEntry masterEntry, String str) {
        try {
            CloseableHttpResponse execute = this.httpclient.execute(new HttpGet(TubeConst.SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort() + TubeConst.TOPIC_CONFIG_INFO + TubeConst.TOPIC_NAME + str));
            Throwable th = null;
            try {
                try {
                    TubeHttpTopicInfoList tubeHttpTopicInfoList = (TubeHttpTopicInfoList) this.gson.fromJson(new InputStreamReader(execute.getEntity().getContent(), StandardCharsets.UTF_8), TubeHttpTopicInfoList.class);
                    if (tubeHttpTopicInfoList.getErrCode() == TubeConst.SUCCESS_CODE.intValue()) {
                        if (execute != null) {
                            if (0 != 0) {
                                try {
                                    execute.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                execute.close();
                            }
                        }
                        return tubeHttpTopicInfoList;
                    }
                    log.error("exception caught while requesting topic config info {}", tubeHttpTopicInfoList.getErrMsg());
                    if (execute != null) {
                        if (0 != 0) {
                            try {
                                execute.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            execute.close();
                        }
                    }
                    return null;
                } finally {
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Exception e) {
            log.error("exception caught while requesting broker status", e);
            return null;
        }
        log.error("exception caught while requesting broker status", e);
        return null;
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeMQResult rebalanceGroup(RebalanceGroupReq rebalanceGroupReq) {
        MasterEntry masterNode = this.masterService.getMasterNode(Long.valueOf(rebalanceGroupReq.getClusterId().intValue()));
        if (masterNode == null) {
            return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_CLUSTER);
        }
        List<String> consumerIds = ((TubeHttpGroupDetailInfo) Objects.requireNonNull(requestGroupRunInfo(masterNode, rebalanceGroupReq.getGroupName()))).getConsumerIds();
        RebalanceGroupResult rebalanceGroupResult = new RebalanceGroupResult();
        consumerIds.forEach(str -> {
            if (this.masterService.requestMaster(TubeConst.SCHEMA + masterNode.getIp() + ":" + masterNode.getWebPort() + "/" + TubeConst.TUBE_REQUEST_PATH + "?" + ConvertUtils.convertReqToQueryStr(ConvertUtils.convertToRebalanceConsumerReq(rebalanceGroupReq, str))).getErrCode() != 0) {
                rebalanceGroupResult.getFailConsumers().add(str);
            }
            rebalanceGroupResult.getSuccessConsumers().add(str);
        });
        TubeMQResult tubeMQResult = new TubeMQResult();
        tubeMQResult.setData(this.gson.toJson(rebalanceGroupResult));
        return tubeMQResult;
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeMQResult deleteOffset(DeleteOffsetReq deleteOffsetReq) {
        MasterEntry masterNode = this.masterService.getMasterNode(Long.valueOf(deleteOffsetReq.getClusterId().intValue()));
        if (masterNode == null) {
            return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_CLUSTER);
        }
        TubeHttpTopicInfoList requestTopicConfigInfo = requestTopicConfigInfo(masterNode, deleteOffsetReq.getTopicName());
        TubeMQResult tubeMQResult = new TubeMQResult();
        CleanOffsetResult cleanOffsetResult = new CleanOffsetResult();
        if (requestTopicConfigInfo == null) {
            return TubeMQResult.errorResult("no such topic");
        }
        Iterator<TubeHttpTopicInfoList.TopicInfoList.TopicInfo> it = requestTopicConfigInfo.getTopicInfo().iterator();
        while (it.hasNext()) {
            String brokerIp = it.next().getBrokerIp();
            tubeMQResult = this.brokerService.deleteOffset(brokerIp, this.brokerWebPort, deleteOffsetReq);
            if (tubeMQResult.getErrCode() != TubeConst.SUCCESS_CODE.intValue()) {
                cleanOffsetResult.getFailBrokers().add(brokerIp);
            } else {
                cleanOffsetResult.getSuccessBrokers().add(brokerIp);
            }
        }
        tubeMQResult.setData(this.gson.toJson(cleanOffsetResult));
        return tubeMQResult;
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeMQResult queryOffset(QueryOffsetReq queryOffsetReq) {
        MasterEntry masterNode = this.masterService.getMasterNode(Long.valueOf(queryOffsetReq.getClusterId().intValue()));
        if (masterNode == null) {
            return TubeMQResult.errorResult(TubeMQErrorConst.NO_SUCH_CLUSTER);
        }
        TubeHttpTopicInfoList requestTopicConfigInfo = requestTopicConfigInfo(masterNode, queryOffsetReq.getTopicName());
        TubeMQResult tubeMQResult = new TubeMQResult();
        if (requestTopicConfigInfo == null) {
            return TubeMQResult.errorResult("no such topic");
        }
        List<TubeHttpTopicInfoList.TopicInfoList.TopicInfo> topicInfo = requestTopicConfigInfo.getTopicInfo();
        AllBrokersOffsetRes allBrokersOffsetRes = new AllBrokersOffsetRes();
        List<AllBrokersOffsetRes.OffsetInfo> offsetPerBroker = allBrokersOffsetRes.getOffsetPerBroker();
        for (TubeHttpTopicInfoList.TopicInfoList.TopicInfo topicInfo2 : topicInfo) {
            OffsetQueryRes queryOffset = this.brokerService.queryOffset(topicInfo2.getBrokerIp(), this.brokerWebPort, queryOffsetReq);
            if (queryOffset.getErrCode() != TubeConst.SUCCESS_CODE.intValue()) {
                return TubeMQResult.errorResult("query broker id" + topicInfo2.getBrokerId() + " fail");
            }
            generateOffsetInfo(offsetPerBroker, topicInfo2, queryOffset);
        }
        tubeMQResult.setData(allBrokersOffsetRes);
        return tubeMQResult;
    }

    @Override // org.apache.inlong.tubemq.manager.service.interfaces.TopicService
    public TubeMQResult queryCanWrite(String str, Long l) {
        List<TopicView.TopicViewInfo> data = requestTopicViewInfo(l, str).getData();
        return CollectionUtils.isEmpty(data) ? TubeMQResult.errorResult(ErrorCode.NO_SUCH_TOPIC) : data.get(0).getTotalRunNumPartCount() >= 1 ? TubeMQResult.successResult() : TubeMQResult.errorResult(ErrorCode.TOPIC_NOT_WRITABLE);
    }

    private void generateOffsetInfo(List<AllBrokersOffsetRes.OffsetInfo> list, TubeHttpTopicInfoList.TopicInfoList.TopicInfo topicInfo, OffsetQueryRes offsetQueryRes) {
        AllBrokersOffsetRes.OffsetInfo offsetInfo = new AllBrokersOffsetRes.OffsetInfo();
        offsetInfo.setBrokerId(topicInfo.getBrokerId());
        offsetInfo.setBrokerIp(topicInfo.getBrokerIp());
        if (TubeConst.SUCCESS_CODE.intValue() == offsetQueryRes.getErrCode()) {
            Iterator<GroupOffsetRes> it = offsetQueryRes.getDataSet().iterator();
            while (it.hasNext()) {
                Iterator<TopicOffsetRes> it2 = it.next().getSubInfo().iterator();
                while (it2.hasNext()) {
                    offsetInfo.setOffsets(it2.next().getOffsets());
                }
            }
            list.add(offsetInfo);
        }
    }
}
