package org.apache.rocketmq.tools.admin;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.api.TrackType;

/* loaded from: input_file:org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.class */
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
    // Client-side logger; assigned from ClientLogger.getLog() in the constructor.
    private final InternalLogger log;
    // Owning facade; supplies the admin group name and the create-topic key used below.
    private final DefaultMQAdminExt defaultMQAdminExt;
    // Lifecycle state that drives start()/shutdown(); set to CREATE_JUST by the constructor.
    private ServiceState serviceState;
    // Client instance obtained from MQClientManager in start(); null until start() has run.
    private MQClientInstance mqClientInstance;
    // Hook handed to MQClientManager when the client instance is created; may be null.
    private RPCHook rpcHook;
    // Timeout value forwarded to the remote calls issued by this class.
    private long timeoutMillis;
    // Used to pick a random broker when examining consumer/producer connections.
    private Random random;
    // NOTE(review): not populated in the visible part of the file — presumably filled by a
    // static initializer further down; confirm.
    private static final Set<String> SYSTEM_GROUP_SET = new HashSet();

    /**
     * Creates an admin implementation that uses no RPC hook.
     *
     * @param defaultMQAdminExt owning facade
     * @param j timeout forwarded to remote calls
     */
    public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long j) {
        this(defaultMQAdminExt, (RPCHook) null, j);
    }

    /**
     * Creates an admin implementation in the {@code CREATE_JUST} state.
     *
     * @param defaultMQAdminExt owning facade
     * @param rPCHook hook passed to the client instance when it is created; may be {@code null}
     * @param j timeout forwarded to remote calls
     */
    public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, RPCHook rPCHook, long j) {
        this.log = ClientLogger.getLog();
        this.defaultMQAdminExt = defaultMQAdminExt;
        this.rpcHook = rPCHook;
        this.timeoutMillis = j;
        this.random = new Random();
        this.serviceState = ServiceState.CREATE_JUST;
    }

    /**
     * Starts the admin client.
     *
     * <p>Only a service in the {@code CREATE_JUST} state is started. The state is set to
     * {@code START_FAILED} before any work is done, reset to {@code CREATE_JUST} when the admin
     * group cannot be registered, and set to {@code RUNNING} once the client instance has started.
     *
     * @throws MQClientException if the service is in the RUNNING, START_FAILED or
     *         SHUTDOWN_ALREADY state, or if the admin group is already registered
     */
    @Override
    public void start() throws MQClientException {
        switch (serviceState) {
            case CREATE_JUST:
                // The only state from which start-up proceeds; continue below the switch.
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The AdminExt service state not OK, maybe started once, " + serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable) null);
            default:
                return;
        }

        // Flag the failure up front: an exception thrown below leaves START_FAILED behind.
        serviceState = ServiceState.START_FAILED;
        defaultMQAdminExt.changeInstanceNameToPID();
        mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(defaultMQAdminExt, rpcHook);

        boolean registered = mqClientInstance.registerAdminExt(defaultMQAdminExt.getAdminExtGroup(), this);
        if (!registered) {
            serviceState = ServiceState.CREATE_JUST;
            throw new MQClientException("The adminExt group[" + defaultMQAdminExt.getAdminExtGroup() + "] has created already, specifed another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable) null);
        }

        mqClientInstance.start();
        log.info("the adminExt [{}] start OK", defaultMQAdminExt.getAdminExtGroup());
        serviceState = ServiceState.RUNNING;
    }

    /**
     * Shuts the admin client down.
     *
     * <p>Does nothing unless the service is {@code RUNNING}. Otherwise it unregisters the admin
     * group, shuts down the client instance and moves to {@code SHUTDOWN_ALREADY}.
     */
    @Override
    public void shutdown() {
        if (serviceState != ServiceState.RUNNING) {
            // Never started, failed to start, or already shut down: nothing to release.
            return;
        }
        mqClientInstance.unregisterAdminExt(defaultMQAdminExt.getAdminExtGroup());
        mqClientInstance.shutdown();
        log.info("the adminExt [{}] shutdown OK", defaultMQAdminExt.getAdminExtGroup());
        serviceState = ServiceState.SHUTDOWN_ALREADY;
    }

    /**
     * Sends the given properties through {@code MQClientAPIImpl#updateBrokerConfig}.
     *
     * @param str request target (presumably the broker address — confirm against MQClientAPIImpl)
     * @param properties configuration entries to send
     */
    @Override
    public void updateBrokerConfig(String str, Properties properties)
        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
        mqClientInstance.getMQClientAPIImpl()
            .updateBrokerConfig(str, properties, timeoutMillis);
    }

    /**
     * Fetches configuration through {@code MQClientAPIImpl#getBrokerConfig}.
     *
     * @param str request target (presumably the broker address — confirm against MQClientAPIImpl)
     * @return the properties returned by the client API
     */
    @Override
    public Properties getBrokerConfig(String str)
        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
        return mqClientInstance.getMQClientAPIImpl()
            .getBrokerConfig(str, timeoutMillis);
    }

    /**
     * Creates or updates a topic through {@code MQClientAPIImpl#createTopic}, using the
     * create-topic key configured on the owning {@code DefaultMQAdminExt}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param topicConfig topic configuration to apply
     */
    @Override
    public void createAndUpdateTopicConfig(String str, TopicConfig topicConfig)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .createTopic(str, defaultMQAdminExt.getCreateTopicKey(), topicConfig, timeoutMillis);
    }

    /**
     * Creates or updates an access configuration through
     * {@code MQClientAPIImpl#createPlainAccessConfig}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param plainAccessConfig access configuration to apply
     */
    @Override
    public void createAndUpdatePlainAccessConfig(String str, PlainAccessConfig plainAccessConfig)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .createPlainAccessConfig(str, plainAccessConfig, timeoutMillis);
    }

    /**
     * Deletes an access configuration through {@code MQClientAPIImpl#deleteAccessConfig}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 identifies the configuration to delete (presumably the access key — confirm)
     */
    @Override
    public void deletePlainAccessConfig(String str, String str2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .deleteAccessConfig(str, str2, timeoutMillis);
    }

    /**
     * Updates the global white address configuration, passing {@code null} as the third
     * argument of {@code MQClientAPIImpl#updateGlobalWhiteAddrsConfig}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 new white address value (format not visible here — confirm)
     */
    @Override
    public void updateGlobalWhiteAddrConfig(String str, String str2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .updateGlobalWhiteAddrsConfig(str, str2, null, timeoutMillis);
    }

    /**
     * Updates the global white address configuration through
     * {@code MQClientAPIImpl#updateGlobalWhiteAddrsConfig}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 new white address value (format not visible here — confirm)
     * @param str3 forwarded unchanged as the third argument (meaning not visible here — confirm)
     */
    @Override
    public void updateGlobalWhiteAddrConfig(String str, String str2, String str3)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .updateGlobalWhiteAddrsConfig(str, str2, str3, timeoutMillis);
    }

    /**
     * Reads ACL version information through {@code MQClientAPIImpl#getBrokerClusterAclInfo}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @return the version info returned by the client API
     */
    @Override
    public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(String str)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        return mqClientInstance.getMQClientAPIImpl()
            .getBrokerClusterAclInfo(str, timeoutMillis);
    }

    /**
     * Reads the ACL configuration through {@code MQClientAPIImpl#getBrokerClusterConfig}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @return the ACL configuration returned by the client API
     */
    @Override
    public AclConfig examineBrokerClusterAclConfig(String str)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        return mqClientInstance.getMQClientAPIImpl()
            .getBrokerClusterConfig(str, timeoutMillis);
    }

    /**
     * Creates or updates a subscription group through
     * {@code MQClientAPIImpl#createSubscriptionGroup}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param subscriptionGroupConfig group configuration to apply
     */
    @Override
    public void createAndUpdateSubscriptionGroupConfig(String str, SubscriptionGroupConfig subscriptionGroupConfig)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .createSubscriptionGroup(str, subscriptionGroupConfig, timeoutMillis);
    }

    /**
     * Looks up one subscription group in the full subscription-group table fetched through
     * {@code MQClientAPIImpl#getAllSubscriptionGroup}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 key looked up in the subscription-group table
     * @return the table entry for {@code str2}; whatever {@code get} yields when there is none
     */
    @Override
    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String str, String str2)
        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        SubscriptionGroupWrapper allGroups =
            mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(str, timeoutMillis);
        return allGroups.getSubscriptionGroupTable().get(str2);
    }

    /**
     * Looks up one topic in the full topic-config table fetched through
     * {@code MQClientAPIImpl#getAllTopicConfig}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 key looked up in the topic-config table
     * @return the table entry for {@code str2}; whatever {@code get} yields when there is none
     */
    @Override
    public TopicConfig examineTopicConfig(String str, String str2)
        throws RemotingException, InterruptedException, MQBrokerException {
        TopicConfigSerializeWrapper allTopics =
            mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(str, timeoutMillis);
        return allTopics.getTopicConfigTable().get(str2);
    }

    /**
     * Collects the offset table of a topic from every broker in its route.
     *
     * <p>Brokers for which {@code selectBrokerAddr()} returns {@code null} are skipped.
     *
     * @param str topic whose route is resolved and whose stats are queried
     * @return a table holding the merged offset tables of all queried brokers
     * @throws MQClientException if the merged offset table is empty
     */
    @Override
    public TopicStatsTable examineTopicStats(String str)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        TopicRouteData route = examineTopicRouteInfo(str);
        TopicStatsTable merged = new TopicStatsTable();
        for (BrokerData broker : route.getBrokerDatas()) {
            String brokerAddr = broker.selectBrokerAddr();
            if (brokerAddr == null) {
                continue;
            }
            TopicStatsTable perBroker =
                mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, str, timeoutMillis);
            merged.getOffsetTable().putAll(perBroker.getOffsetTable());
        }
        if (!merged.getOffsetTable().isEmpty()) {
            return merged;
        }
        throw new MQClientException("Not found the topic stats info", (Throwable) null);
    }

    /**
     * Fetches the topic list through {@code MQClientAPIImpl#getTopicListFromNameServer}.
     *
     * @return the topic list returned by the client API
     */
    @Override
    public TopicList fetchAllTopicList()
        throws RemotingException, MQClientException, InterruptedException {
        return mqClientInstance.getMQClientAPIImpl()
            .getTopicListFromNameServer(timeoutMillis);
    }

    /**
     * Fetches the topics of one cluster through {@code MQClientAPIImpl#getTopicsByCluster}.
     *
     * @param str cluster to query (presumably the cluster name — confirm)
     * @return the topic list returned by the client API
     */
    @Override
    public TopicList fetchTopicsByCLuster(String str)
        throws RemotingException, MQClientException, InterruptedException {
        return mqClientInstance.getMQClientAPIImpl()
            .getTopicsByCluster(str, timeoutMillis);
    }

    /**
     * Fetches runtime information through {@code MQClientAPIImpl#getBrokerRuntimeInfo}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @return the key/value table returned by the client API
     */
    @Override
    public KVTable fetchBrokerRuntimeStats(String str)
        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        return mqClientInstance.getMQClientAPIImpl()
            .getBrokerRuntimeInfo(str, timeoutMillis);
    }

    /**
     * Collects consume statistics for a consumer group, passing {@code null} as the second
     * argument of the two-argument overload.
     *
     * @param str consumer group
     * @return the merged consume statistics
     */
    @Override
    public ConsumeStats examineConsumeStats(String str)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        return examineConsumeStats(str, (String) null);
    }

    /**
     * Collects consume statistics of a consumer group from every broker in the route of the
     * group's retry topic.
     *
     * <p>Offset tables are merged and consume TPS values are summed. Each broker request uses
     * three times the configured timeout.
     *
     * @param str consumer group; its retry topic determines which brokers are queried
     * @param str2 forwarded to the broker request (presumably a topic filter — confirm); may be
     *        {@code null}
     * @return the merged consume statistics
     * @throws MQClientException with code 206 if the merged offset table is empty
     */
    @Override
    public ConsumeStats examineConsumeStats(String str, String str2)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        TopicRouteData retryRoute = examineTopicRouteInfo(MixAll.getRetryTopic(str));
        ConsumeStats merged = new ConsumeStats();
        for (BrokerData broker : retryRoute.getBrokerDatas()) {
            String brokerAddr = broker.selectBrokerAddr();
            if (brokerAddr == null) {
                continue;
            }
            ConsumeStats perBroker =
                mqClientInstance.getMQClientAPIImpl().getConsumeStats(brokerAddr, str, str2, timeoutMillis * 3);
            merged.getOffsetTable().putAll(perBroker.getOffsetTable());
            merged.setConsumeTps(merged.getConsumeTps() + perBroker.getConsumeTps());
        }
        if (!merged.getOffsetTable().isEmpty()) {
            return merged;
        }
        throw new MQClientException(206, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
    }

    /**
     * Fetches cluster information through {@code MQClientAPIImpl#getBrokerClusterInfo}.
     *
     * @return the cluster info returned by the client API
     */
    @Override
    public ClusterInfo examineBrokerClusterInfo()
        throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        return mqClientInstance.getMQClientAPIImpl()
            .getBrokerClusterInfo(timeoutMillis);
    }

    /**
     * Resolves the route of a topic through
     * {@code MQClientAPIImpl#getTopicRouteInfoFromNameServer}.
     *
     * @param str topic to resolve
     * @return the route data returned by the client API
     */
    @Override
    public TopicRouteData examineTopicRouteInfo(String str)
        throws RemotingException, MQClientException, InterruptedException {
        return mqClientInstance.getMQClientAPIImpl()
            .getTopicRouteInfoFromNameServer(str, timeoutMillis);
    }

    /**
     * Looks a message up by id, falling back to a unique-key query.
     *
     * <p>The id is first decoded with {@code MessageDecoder.decodeMessageId} and looked up via
     * the single-argument {@code viewMessage}. Any exception from either step is logged as a
     * warning and the lookup is retried with {@code queryMessageByUniqKey}.
     *
     * @param str topic used for the unique-key fallback
     * @param str2 message id, or unique key when the id cannot be decoded
     * @return the message that was found
     */
    @Override
    public MessageExt viewMessage(String str, String str2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            MessageDecoder.decodeMessageId(str2);
            return viewMessage(str2);
        } catch (Exception e) {
            log.warn("the msgId maybe created by new client. msgId={}", str2, e);
        }
        // Reached only when the id-based lookup above failed.
        return mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(str, str2);
    }

    /**
     * Fetches the connections of a consumer group from one randomly chosen broker in the route
     * of the group's retry topic.
     *
     * <p>An empty or missing broker list is treated like an offline group. Previously it made
     * {@code Random.nextInt(0)} throw {@code IllegalArgumentException} (or caused a
     * {@code NullPointerException}).
     *
     * @param str consumer group
     * @return the non-empty connection info reported by the chosen broker
     * @throws MQClientException with code 206 if no connection was found
     */
    @Override
    public ConsumerConnection examineConsumerConnectionInfo(String str) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        ConsumerConnection consumerConnection = new ConsumerConnection();
        List<BrokerData> brokerDatas = examineTopicRouteInfo(MixAll.getRetryTopic(str)).getBrokerDatas();
        String brokerAddr = null;
        // Random.nextInt(bound) rejects bound <= 0, so only pick when there is something to pick.
        if (brokerDatas != null && !brokerDatas.isEmpty()) {
            BrokerData brokerData = brokerDatas.get(this.random.nextInt(brokerDatas.size()));
            if (brokerData != null) {
                brokerAddr = brokerData.selectBrokerAddr();
                if (StringUtils.isNotBlank(brokerAddr)) {
                    consumerConnection = this.mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(brokerAddr, str, this.timeoutMillis);
                }
            }
        }
        if (!consumerConnection.getConnectionSet().isEmpty()) {
            return consumerConnection;
        }
        this.log.warn("the consumer group not online. brokerAddr={}, group={}", brokerAddr, str);
        throw new MQClientException(206, "Not found the consumer group connection");
    }

    /**
     * Fetches the connections of a consumer group from a specific broker.
     *
     * @param str consumer group
     * @param str2 broker address to query
     * @return the non-empty connection info reported by the broker
     * @throws MQClientException with code 206 if the broker reports no connection
     */
    @Override
    public ConsumerConnection examineConsumerConnectionInfo(String str, String str2)
        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        ConsumerConnection connections =
            mqClientInstance.getMQClientAPIImpl().getConsumerConnectionList(str2, str, timeoutMillis);
        if (connections.getConnectionSet().isEmpty()) {
            log.warn("the consumer group not online. brokerAddr={}, group={}", str2, str);
            throw new MQClientException(206, "Not found the consumer group connection");
        }
        return connections;
    }

    /**
     * Fetches the connections of a producer group from one randomly chosen broker in the route
     * of the given topic.
     *
     * <p>An empty or missing broker list is treated like an offline group. Previously it made
     * {@code Random.nextInt(0)} throw {@code IllegalArgumentException} (or caused a
     * {@code NullPointerException}).
     *
     * @param str producer group
     * @param str2 topic whose route supplies the candidate brokers
     * @return the non-empty connection info reported by the chosen broker
     * @throws MQClientException if no connection was found
     */
    @Override
    public ProducerConnection examineProducerConnectionInfo(String str, String str2) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        ProducerConnection producerConnection = new ProducerConnection();
        List<BrokerData> brokerDatas = examineTopicRouteInfo(str2).getBrokerDatas();
        String brokerAddr = null;
        // Random.nextInt(bound) rejects bound <= 0, so only pick when there is something to pick.
        if (brokerDatas != null && !brokerDatas.isEmpty()) {
            BrokerData brokerData = brokerDatas.get(this.random.nextInt(brokerDatas.size()));
            if (brokerData != null) {
                brokerAddr = brokerData.selectBrokerAddr();
                if (StringUtils.isNotBlank(brokerAddr)) {
                    producerConnection = this.mqClientInstance.getMQClientAPIImpl().getProducerConnectionList(brokerAddr, str, this.timeoutMillis);
                }
            }
        }
        if (!producerConnection.getConnectionSet().isEmpty()) {
            return producerConnection;
        }
        this.log.warn("the producer group not online. brokerAddr={}, group={}", brokerAddr, str);
        throw new MQClientException("Not found the producer group connection", (Throwable) null);
    }

    /**
     * Fetches producer information through {@code MQClientAPIImpl#getAllProducerInfo}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @return the producer table returned by the client API
     */
    @Override
    public ProducerTableInfo getAllProducerInfo(String str)
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        return mqClientInstance.getMQClientAPIImpl()
            .getAllProducerInfo(str, timeoutMillis);
    }

    /**
     * Returns the name server address list held by the client API.
     *
     * @return the list returned by {@code MQClientAPIImpl#getNameServerAddressList}
     */
    @Override
    public List<String> getNameServerAddressList() {
        return mqClientInstance.getMQClientAPIImpl()
            .getNameServerAddressList();
    }

    /**
     * Delegates to {@code MQClientAPIImpl#wipeWritePermOfBroker}.
     *
     * @param str first request argument (presumably the name server address — confirm)
     * @param str2 second request argument (presumably the broker name — confirm)
     * @return the count returned by the client API
     */
    @Override
    public int wipeWritePermOfBroker(String str, String str2)
        throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
        return mqClientInstance.getMQClientAPIImpl()
            .wipeWritePermOfBroker(str, str2, timeoutMillis);
    }

    /**
     * Delegates to {@code MQClientAPIImpl#addWritePermOfBroker}.
     *
     * @param str first request argument (presumably the name server address — confirm)
     * @param str2 second request argument (presumably the broker name — confirm)
     * @return the count returned by the client API
     */
    @Override
    public int addWritePermOfBroker(String str, String str2)
        throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
        return mqClientInstance.getMQClientAPIImpl()
            .addWritePermOfBroker(str, str2, timeoutMillis);
    }

    /**
     * Intentionally empty: this implementation performs no action and ignores all three
     * arguments. The method that actually writes a KV entry in this class is
     * {@code createAndUpdateKvConfig}.
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public void putKVConfig(String str, String str2, String str3) {
    }

    /**
     * Reads one KV config value through {@code MQClientAPIImpl#getKVConfigValue}.
     *
     * @param str namespace of the entry
     * @param str2 key of the entry
     * @return the value returned by the client API
     */
    @Override
    public String getKVConfig(String str, String str2)
        throws RemotingException, MQClientException, InterruptedException {
        return mqClientInstance.getMQClientAPIImpl()
            .getKVConfigValue(str, str2, timeoutMillis);
    }

    /**
     * Lists the KV config entries of a namespace through
     * {@code MQClientAPIImpl#getKVListByNamespace}.
     *
     * @param str namespace to list
     * @return the key/value table returned by the client API
     */
    @Override
    public KVTable getKVListByNamespace(String str)
        throws RemotingException, MQClientException, InterruptedException {
        return mqClientInstance.getMQClientAPIImpl()
            .getKVListByNamespace(str, timeoutMillis);
    }

    /**
     * Deletes a topic by calling {@code MQClientAPIImpl#deleteTopicInBroker} once per element
     * of the given set.
     *
     * @param set request targets, one call each (presumably broker addresses — confirm)
     * @param str topic to delete
     */
    @Override
    public void deleteTopicInBroker(Set<String> set, String str)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        for (String target : set) {
            mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(target, str, timeoutMillis);
        }
    }

    /**
     * Deletes a topic by calling {@code MQClientAPIImpl#deleteTopicInNameServer} once per name
     * server address.
     *
     * <p>When {@code set} is {@code null}, the addresses come from
     * {@code MQClientAPIImpl#fetchNameServerAddr()}, split on {@code ';'}.
     *
     * @param set name server addresses, or {@code null} to use the fetched ones
     * @param str second request argument (presumably the cluster name — confirm)
     * @param str2 third request argument (presumably the topic — confirm)
     */
    @Override
    public void deleteTopicInNameServer(Set<String> set, String str, String str2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        Set<String> nameServers = set;
        if (nameServers == null) {
            String joinedAddrs = mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
            nameServers = new HashSet<String>(Arrays.asList(joinedAddrs.split(";")));
        }
        for (String nameServer : nameServers) {
            mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(nameServer, str, str2, timeoutMillis);
        }
    }

    /**
     * Deletes a subscription group, passing {@code false} as the flag of
     * {@code MQClientAPIImpl#deleteSubscriptionGroup}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 subscription group to delete
     */
    @Override
    public void deleteSubscriptionGroup(String str, String str2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .deleteSubscriptionGroup(str, str2, false, timeoutMillis);
    }

    /**
     * Deletes a subscription group through {@code MQClientAPIImpl#deleteSubscriptionGroup}.
     *
     * @param str request target (presumably the broker address — confirm)
     * @param str2 subscription group to delete
     * @param z flag forwarded unchanged (presumably whether offsets are removed too — confirm)
     */
    @Override
    public void deleteSubscriptionGroup(String str, String str2, boolean z)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .deleteSubscriptionGroup(str, str2, z, timeoutMillis);
    }

    /**
     * Writes one KV config value through {@code MQClientAPIImpl#putKVConfigValue}.
     *
     * @param str namespace of the entry
     * @param str2 key of the entry
     * @param str3 value to store
     */
    @Override
    public void createAndUpdateKvConfig(String str, String str2, String str3)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .putKVConfigValue(str, str2, str3, timeoutMillis);
    }

    /**
     * Deletes one KV config value through {@code MQClientAPIImpl#deleteKVConfigValue}.
     *
     * @param str namespace of the entry
     * @param str2 key of the entry
     */
    @Override
    public void deleteKvConfig(String str, String str2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        mqClientInstance.getMQClientAPIImpl()
            .deleteKVConfigValue(str, str2, timeoutMillis);
    }

    /**
     * Resets the consume offsets of a group on a topic, broker by broker, by writing the new
     * offsets directly with {@code updateConsumerOffset}.
     *
     * <p>For each broker in the topic route with a non-null address:
     * <ul>
     *   <li>if the broker's consume stats contain queues of the topic, each of them is reset;</li>
     *   <li>otherwise one queue per read-queue index is synthesized from the broker's topic
     *       stats (max offset as broker offset, min offset as consumer offset) and reset.</li>
     * </ul>
     *
     * <p>Unlike the previous version, a broker without a known read-queue count and a queue
     * missing from the topic stats table are skipped with a warning instead of raising a
     * {@code NullPointerException}.
     *
     * @param str consumer group
     * @param str2 topic
     * @param j timestamp to reset to; {@code -1} selects the queue's max offset
     * @param z when {@code true}, the offset is written even if it is ahead of the current one
     * @return one {@link RollbackStats} per queue that was processed
     */
    @Override
    public List<RollbackStats> resetOffsetByTimestampOld(String str, String str2, long j, boolean z) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        TopicRouteData topicRouteData = examineTopicRouteInfo(str2);
        List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();

        // Broker address -> read queue count, taken from the queue data with the same broker name.
        Map<String, Integer> readQueueNumsByAddr = new HashMap<String, Integer>();
        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            for (QueueData queueData : topicRouteData.getQueueDatas()) {
                if (StringUtils.equals(queueData.getBrokerName(), brokerData.getBrokerName())) {
                    readQueueNumsByAddr.put(brokerData.selectBrokerAddr(), Integer.valueOf(queueData.getReadQueueNums()));
                }
            }
        }

        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            String brokerAddr = brokerData.selectBrokerAddr();
            if (brokerAddr == null) {
                continue;
            }

            boolean hasConsumed = false;
            ConsumeStats consumeStats = this.mqClientInstance.getMQClientAPIImpl().getConsumeStats(brokerAddr, str, this.timeoutMillis);
            for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
                MessageQueue messageQueue = entry.getKey();
                if (str2.equals(messageQueue.getTopic())) {
                    hasConsumed = true;
                    rollbackStatsList.add(resetOffsetConsumeOffset(brokerAddr, str, messageQueue, entry.getValue(), j, z));
                }
            }
            if (hasConsumed) {
                continue;
            }

            // The group never consumed this topic on this broker: fall back to the topic stats.
            Integer readQueueNums = readQueueNumsByAddr.get(brokerAddr);
            if (readQueueNums == null) {
                this.log.warn("no queue data found for broker, skip reset. brokerAddr={}, topic={}", brokerAddr, str2);
                continue;
            }
            Map<MessageQueue, TopicOffset> topicOffsets = this.mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(brokerAddr, str2, this.timeoutMillis).getOffsetTable();
            for (int queueId = 0; queueId < readQueueNums.intValue(); queueId++) {
                MessageQueue messageQueue = new MessageQueue(str2, brokerData.getBrokerName(), queueId);
                TopicOffset topicOffset = topicOffsets.get(messageQueue);
                if (topicOffset == null) {
                    // The stats table may hold fewer queues than the read queue count.
                    this.log.warn("no topic offset found for queue, skip reset. brokerAddr={}, queue={}", brokerAddr, messageQueue);
                    continue;
                }
                OffsetWrapper offsetWrapper = new OffsetWrapper();
                offsetWrapper.setBrokerOffset(topicOffset.getMaxOffset());
                offsetWrapper.setConsumerOffset(topicOffset.getMinOffset());
                rollbackStatsList.add(resetOffsetConsumeOffset(brokerAddr, str, messageQueue, offsetWrapper, j, z));
            }
        }
        return rollbackStatsList;
    }

    /**
     * Resets offsets by timestamp, passing {@code false} as the last flag of the five-argument
     * overload.
     *
     * @param str topic
     * @param str2 second request argument (presumably the consumer group — confirm)
     * @param j timestamp to reset to
     * @param z flag forwarded unchanged (presumably "force" — confirm)
     * @return the merged per-queue offsets reported by the brokers
     */
    @Override
    public Map<MessageQueue, Long> resetOffsetByTimestamp(String str, String str2, long j, boolean z)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        final boolean lastFlag = false;
        return resetOffsetByTimestamp(str, str2, j, z, lastFlag);
    }

    /**
     * Resets offsets through the brokers and falls back to the direct-write variant.
     *
     * <p>The fallback {@code resetOffsetByTimestampOld} runs only when the broker-side reset
     * fails with response code 206, the code this class uses for a consumer group that is not
     * online. Any other {@link MQClientException} is rethrown. Both calls use the forced mode.
     *
     * @param str consumer group
     * @param str2 topic
     * @param j timestamp to reset to
     */
    @Override
    public void resetOffsetNew(String str, String str2, long j)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            // Note the argument order: this overload takes the topic first.
            resetOffsetByTimestamp(str2, str, j, true);
        } catch (MQClientException e) {
            if (e.getResponseCode() == 206) {
                resetOffsetByTimestampOld(str, str2, j, true);
                return;
            }
            throw e;
        }
    }

    /**
     * Asks every broker in the topic route to reset offsets and merges the answers.
     *
     * <p>Brokers without an address and {@code null} answers are skipped. A {@code null} broker
     * list yields an empty map.
     *
     * @param str topic whose route supplies the brokers
     * @param str2 forwarded to the broker request (presumably the consumer group — confirm)
     * @param j timestamp to reset to
     * @param z flag forwarded unchanged (presumably "force" — confirm)
     * @param z2 flag forwarded unchanged as the last request argument (meaning not visible
     *        here — confirm)
     * @return the merged per-queue offsets reported by the brokers
     */
    public Map<MessageQueue, Long> resetOffsetByTimestamp(String str, String str2, long j, boolean z, boolean z2)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        List<BrokerData> brokers = examineTopicRouteInfo(str).getBrokerDatas();
        Map<MessageQueue, Long> merged = new HashMap<MessageQueue, Long>();
        if (brokers == null) {
            return merged;
        }
        for (BrokerData broker : brokers) {
            String brokerAddr = broker.selectBrokerAddr();
            if (brokerAddr == null) {
                continue;
            }
            Map<MessageQueue, Long> perBroker = mqClientInstance.getMQClientAPIImpl()
                .invokeBrokerToResetOffset(brokerAddr, str, str2, j, z, timeoutMillis, z2);
            if (perBroker != null) {
                merged.putAll(perBroker);
            }
        }
        return merged;
    }

    /**
     * Computes the target offset for one queue and, when applicable, writes it to the broker.
     *
     * <p>The target is the queue's max offset when {@code j == -1}, otherwise the result of
     * {@code searchOffset} for the timestamp. It is written only when {@code z} is set or the
     * target is not ahead of the current consumer offset. Otherwise the returned stats report
     * the unchanged consumer offset as rollback offset.
     *
     * @param str broker address
     * @param str2 consumer group
     * @param messageQueue queue to reset
     * @param offsetWrapper current broker/consumer offsets of the queue
     * @param j timestamp to reset to, or {@code -1} for the max offset
     * @param z force flag
     * @return statistics describing the offsets before and after the reset
     */
    private RollbackStats resetOffsetConsumeOffset(String str, String str2, MessageQueue messageQueue, OffsetWrapper offsetWrapper, long j, boolean z)
        throws RemotingException, InterruptedException, MQBrokerException {
        final long targetOffset;
        if (j == -1) {
            targetOffset = mqClientInstance.getMQClientAPIImpl()
                .getMaxOffset(str, messageQueue.getTopic(), messageQueue.getQueueId(), timeoutMillis);
        } else {
            targetOffset = mqClientInstance.getMQClientAPIImpl()
                .searchOffset(str, messageQueue.getTopic(), messageQueue.getQueueId(), j, timeoutMillis);
        }

        RollbackStats stats = new RollbackStats();
        stats.setBrokerName(messageQueue.getBrokerName());
        stats.setQueueId(messageQueue.getQueueId());
        stats.setBrokerOffset(offsetWrapper.getBrokerOffset());
        stats.setConsumerOffset(offsetWrapper.getConsumerOffset());
        stats.setTimestampOffset(targetOffset);

        boolean apply = z || targetOffset <= offsetWrapper.getConsumerOffset();
        if (!apply) {
            stats.setRollbackOffset(offsetWrapper.getConsumerOffset());
            return stats;
        }

        stats.setRollbackOffset(targetOffset);
        UpdateConsumerOffsetRequestHeader header = new UpdateConsumerOffsetRequestHeader();
        header.setConsumerGroup(str2);
        header.setTopic(messageQueue.getTopic());
        header.setQueueId(Integer.valueOf(messageQueue.getQueueId()));
        header.setCommitOffset(Long.valueOf(targetOffset));
        mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(str, header, timeoutMillis);
        return stats;
    }

    /**
     * Asks the first broker in the topic route for the consumer status.
     *
     * @param str topic whose route supplies the broker
     * @param str2 forwarded to the broker request (presumably the consumer group — confirm)
     * @param str3 forwarded to the broker request (presumably the client address — confirm)
     * @return the broker's answer, or an empty map when the route has no broker or the first
     *         broker has no address
     */
    @Override
    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String str, String str2, String str3)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        List<BrokerData> brokers = examineTopicRouteInfo(str).getBrokerDatas();
        if (brokers == null || brokers.isEmpty()) {
            return Collections.emptyMap();
        }
        String brokerAddr = brokers.get(0).selectBrokerAddr();
        if (brokerAddr == null) {
            return Collections.emptyMap();
        }
        return mqClientInstance.getMQClientAPIImpl()
            .invokeBrokerToGetConsumerStatus(brokerAddr, str, str2, str3, timeoutMillis);
    }

    /**
     * Creates or updates the order configuration of a topic in the
     * {@code NAMESPACE_ORDER_TOPIC_CONFIG} KV namespace.
     *
     * <p>When {@code z} is set, {@code str2} replaces the stored value as a whole. Otherwise the
     * stored value is read, split on {@code ';'} into entries keyed by the text before the first
     * {@code ':'}, the entry from {@code str2} is added or replaced, and the entries are written
     * back joined with {@code ';'}.
     *
     * <p>Reading the stored value is best effort: a failure is logged and the update proceeds
     * with only the new entry. An {@link InterruptedException} is propagated instead of being
     * swallowed.
     *
     * @param str key of the KV entry (the topic)
     * @param str2 order configuration, either the full value or a single {@code key:value} entry
     * @param z {@code true} to overwrite the stored value with {@code str2}
     */
    @Override
    public void createOrUpdateOrderConf(String str, String str2, boolean z) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        if (z) {
            this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, str, str2, this.timeoutMillis);
            return;
        }

        String oldOrderConf = null;
        try {
            oldOrderConf = this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, str, this.timeoutMillis);
        } catch (InterruptedException e) {
            // Never swallow an interrupt; the method already declares this exception.
            throw e;
        } catch (Exception e) {
            this.log.warn("failed to read the existing order conf, treat it as absent. key={}", str, e);
        }

        Map<String, String> entriesByKey = new HashMap<String, String>();
        if (!UtilAll.isBlank(oldOrderConf)) {
            for (String entry : oldOrderConf.split(";")) {
                entriesByKey.put(extractOrderConfKey(entry), entry);
            }
        }
        entriesByKey.put(extractOrderConfKey(str2), str2);

        StringBuilder joined = new StringBuilder();
        String separator = "";
        for (String entry : entriesByKey.values()) {
            joined.append(separator).append(entry);
            separator = ";";
        }
        this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, str, joined.toString(), this.timeoutMillis);
    }

    /**
     * Returns the text before the first {@code ':'} of an order-conf entry, or the whole entry
     * when it contains no {@code ':'}. Unlike {@code split(":")[0]} this cannot fail on
     * entries such as {@code ":"}.
     */
    private static String extractOrderConfKey(String entry) {
        int colon = entry.indexOf(':');
        return colon < 0 ? entry : entry.substring(0, colon);
    }

    /**
     * Asks the first broker in the topic route which groups consume the topic.
     *
     * @param str topic
     * @return the group list reported by the broker, or {@code null} when the route has no
     *         broker or the first broker has no address
     */
    @Override
    public GroupList queryTopicConsumeByWho(String str)
        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        List<BrokerData> brokers = examineTopicRouteInfo(str).getBrokerDatas();
        if (brokers.isEmpty()) {
            return null;
        }
        // Only the first broker is consulted.
        String brokerAddr = brokers.get(0).selectBrokerAddr();
        if (brokerAddr == null) {
            return null;
        }
        return mqClientInstance.getMQClientAPIImpl().queryTopicConsumeByWho(brokerAddr, str, timeoutMillis);
    }

    /**
     * Collects the consume time spans of a group on a topic from every broker in the route.
     *
     * <p>Brokers for which {@code selectBrokerAddr()} returns {@code null} are skipped.
     *
     * @param str topic whose route supplies the brokers
     * @param str2 forwarded to the broker request (presumably the consumer group — confirm)
     * @return the concatenated time spans of all queried brokers; possibly empty
     */
    @Override
    public List<QueueTimeSpan> queryConsumeTimeSpan(String str, String str2)
        throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        List<QueueTimeSpan> timeSpans = new ArrayList<QueueTimeSpan>();
        for (BrokerData broker : examineTopicRouteInfo(str).getBrokerDatas()) {
            String brokerAddr = broker.selectBrokerAddr();
            if (brokerAddr == null) {
                continue;
            }
            timeSpans.addAll(
                mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(brokerAddr, str, str2, timeoutMillis));
        }
        return timeSpans;
    }

    /**
     * Cleans expired consume queues on one cluster, or on every cluster when {@code str}
     * is {@code null} or empty.
     *
     * <p>A {@code MQBrokerException} is logged and not rethrown.
     *
     * @param str cluster name, or {@code null}/{@code ""} for all clusters
     * @return the result of the last per-cluster call that completed; {@code false} when
     *         none completed. Earlier results are overwritten, not combined.
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean cleanExpiredConsumerQueue(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean lastResult = false;
        try {
            ClusterInfo clusterInfo = examineBrokerClusterInfo();
            if (str != null && !"".equals(str)) {
                lastResult = cleanExpiredConsumerQueueByCluster(clusterInfo, str);
            } else {
                for (String clusterName : clusterInfo.retrieveAllClusterNames()) {
                    lastResult = cleanExpiredConsumerQueueByCluster(clusterInfo, clusterName);
                }
            }
        } catch (MQBrokerException e) {
            this.log.error("cleanExpiredConsumerQueue error.", (Throwable) e);
        }
        return lastResult;
    }

    /**
     * Cleans expired consume queues on every address that {@code clusterInfo} lists for
     * cluster {@code str}.
     *
     * @param clusterInfo cluster information used to look up the addresses
     * @param str         cluster name
     * @return the result for the last address processed, or {@code false} when the cluster
     *         has no address
     */
    public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean lastResult = false;
        for (String brokerAddr : clusterInfo.retrieveAllAddrByCluster(str)) {
            lastResult = cleanExpiredConsumerQueueByAddr(brokerAddr);
        }
        return lastResult;
    }

    /**
     * Cleans expired consume queues on the target {@code str} and logs the outcome at
     * warn level.
     *
     * @param str target address forwarded to {@code cleanExpiredConsumeQueue}
     * @return the value reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean cleanExpiredConsumerQueueByAddr(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        final boolean cleaned = this.mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(str, this.timeoutMillis);
        final String message = "clean expired ConsumeQueue on target " + str + " broker " + cleaned;
        this.log.warn(message);
        return cleaned;
    }

    /**
     * Deletes expired commit logs on one cluster, or on every cluster when {@code str} is
     * {@code null} or empty.
     *
     * <p>A {@code MQBrokerException} is logged and not rethrown.
     *
     * @param str cluster name, or {@code null}/{@code ""} for all clusters
     * @return the result of the last per-cluster call that completed; {@code false} when
     *         none completed. Earlier results are overwritten, not combined.
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean deleteExpiredCommitLog(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean lastResult = false;
        try {
            ClusterInfo clusterInfo = examineBrokerClusterInfo();
            if (str != null && !"".equals(str)) {
                lastResult = deleteExpiredCommitLogByCluster(clusterInfo, str);
            } else {
                for (String clusterName : clusterInfo.retrieveAllClusterNames()) {
                    lastResult = deleteExpiredCommitLogByCluster(clusterInfo, clusterName);
                }
            }
        } catch (MQBrokerException e) {
            this.log.error("deleteExpiredCommitLog error.", (Throwable) e);
        }
        return lastResult;
    }

    /**
     * Deletes expired commit logs on every address that {@code clusterInfo} lists for
     * cluster {@code str}.
     *
     * @param clusterInfo cluster information used to look up the addresses
     * @param str         cluster name
     * @return the result for the last address processed, or {@code false} when the cluster
     *         has no address
     */
    public boolean deleteExpiredCommitLogByCluster(ClusterInfo clusterInfo, String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean lastResult = false;
        for (String brokerAddr : clusterInfo.retrieveAllAddrByCluster(str)) {
            lastResult = deleteExpiredCommitLogByAddr(brokerAddr);
        }
        return lastResult;
    }

    /**
     * Deletes expired commit logs on the target {@code str} and logs the outcome at warn
     * level.
     *
     * @param str target address forwarded to {@code deleteExpiredCommitLog}
     * @return the value reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean deleteExpiredCommitLogByAddr(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        final boolean deleted = this.mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(str, this.timeoutMillis);
        final String message = "Delete expired CommitLog on target " + str + " broker " + deleted;
        this.log.warn(message);
        return deleted;
    }

    /**
     * Cleans unused topics on one cluster, or on every cluster when {@code str} is
     * {@code null} or empty.
     *
     * <p>A {@code MQBrokerException} is logged and not rethrown.
     *
     * @param str cluster name, or {@code null}/{@code ""} for all clusters
     * @return the result of the last per-cluster call that completed; {@code false} when
     *         none completed. Earlier results are overwritten, not combined.
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean cleanUnusedTopic(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean lastResult = false;
        try {
            ClusterInfo clusterInfo = examineBrokerClusterInfo();
            if (null == str || "".equals(str)) {
                for (String clusterName : clusterInfo.retrieveAllClusterNames()) {
                    lastResult = cleanUnusedTopicByCluster(clusterInfo, clusterName);
                }
            } else {
                lastResult = cleanUnusedTopicByCluster(clusterInfo, str);
            }
        } catch (MQBrokerException e) {
            // The message previously named cleanExpiredConsumerQueue (copy-paste), which sent
            // anyone reading the log to the wrong operation.
            this.log.error("cleanUnusedTopic error.", (Throwable) e);
        }
        return lastResult;
    }

    /**
     * Cleans unused topics on every address that {@code clusterInfo} lists for cluster
     * {@code str}.
     *
     * @param clusterInfo cluster information used to look up the addresses
     * @param str         cluster name
     * @return the result for the last address processed, or {@code false} when the cluster
     *         has no address
     */
    public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo, String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean lastResult = false;
        for (String brokerAddr : clusterInfo.retrieveAllAddrByCluster(str)) {
            lastResult = cleanUnusedTopicByAddr(brokerAddr);
        }
        return lastResult;
    }

    /**
     * Cleans unused topics on the target {@code str} and logs the outcome at warn level.
     *
     * @param str target address forwarded to {@code cleanUnusedTopicByAddr} of the client API
     * @return the value reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean cleanUnusedTopicByAddr(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        boolean cleaned = this.mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(str, this.timeoutMillis);
        // The message previously read "clean expired ConsumeQueue" (copy-paste), making this
        // operation indistinguishable from cleanExpiredConsumerQueueByAddr in the log.
        this.log.warn("clean unused topic on target " + str + " broker " + cleaned);
        return cleaned;
    }

    /**
     * Fetches consumer running information from the first broker with a selectable address
     * in the route of the topic {@code MixAll.RETRY_GROUP_TOPIC_PREFIX + str}.
     *
     * <p>The request uses three times the configured timeout.
     *
     * @param str  value appended to the retry-topic prefix and forwarded to the broker
     *             (presumably the consumer group - confirm)
     * @param str2 forwarded unchanged to the broker request
     * @param z    forwarded unchanged to the broker request
     * @return the broker's answer, or {@code null} when the route has no broker list or no
     *         broker has a selectable address
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public ConsumerRunningInfo getConsumerRunningInfo(String str, String str2, boolean z) throws RemotingException, MQClientException, InterruptedException {
        List<BrokerData> brokers = examineTopicRouteInfo(MixAll.RETRY_GROUP_TOPIC_PREFIX + str).getBrokerDatas();
        if (brokers != null) {
            for (BrokerData broker : brokers) {
                String brokerAddr = broker.selectBrokerAddr();
                if (brokerAddr == null) {
                    continue;
                }
                return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(brokerAddr, str, str2, z, this.timeoutMillis * 3);
            }
        }
        return null;
    }

    /**
     * Requests direct consumption of message {@code str3} from the broker that stores it.
     *
     * <p>The message is looked up with {@code viewMessage(str3)} and its store host is used
     * as the broker address. The request uses three times the configured timeout.
     *
     * @param str  forwarded unchanged to the broker request
     * @param str2 forwarded unchanged to the broker request
     * @param str3 message id used for the lookup and forwarded to the broker request
     * @return the result reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public ConsumeMessageDirectlyResult consumeMessageDirectly(String str, String str2, String str3) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        MessageExt message = viewMessage(str3);
        String brokerAddr = RemotingUtil.socketAddress2String(message.getStoreHost());
        return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(brokerAddr, str, str2, str3, this.timeoutMillis * 3);
    }

    /**
     * Requests direct consumption of a message that is looked up with
     * {@code viewMessage(str3, str4)}, from the broker that stores it.
     *
     * <p>When the message carries the property
     * {@link MessageConst#PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX}, the offset message id of
     * the message is sent to the broker; otherwise {@code str4} is sent. The request uses
     * three times the configured timeout.
     *
     * @param str  forwarded unchanged to the broker request
     * @param str2 forwarded unchanged to the broker request
     * @param str3 first argument of the message lookup
     * @param str4 second argument of the message lookup; also the fallback message id
     * @return the result reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public ConsumeMessageDirectlyResult consumeMessageDirectly(String str, String str2, String str3, String str4) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        MessageExt message = viewMessage(str3, str4);
        boolean hasUniqKey = message.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) != null;
        String brokerAddr = RemotingUtil.socketAddress2String(message.getStoreHost());
        String messageId = hasUniqKey ? ((MessageClientExt) message).getOffsetMsgId() : str4;
        return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(brokerAddr, str, str2, messageId, this.timeoutMillis * 3);
    }

    /**
     * Builds one {@link MessageTrack} per consumer group that consumes the topic of
     * {@code messageExt}, describing how far that group has got with the message.
     *
     * <p>Each track starts as {@link TrackType#UNKNOWN}. Groups whose consume type is
     * {@code CONSUME_ACTIVELY} are marked {@link TrackType#PULL}. For
     * {@code CONSUME_PASSIVELY} groups the track is {@link TrackType#CONSUMED} or
     * {@link TrackType#NOT_CONSUME_YET} depending on {@link #consumed}, downgraded to
     * {@link TrackType#CONSUMED_BUT_FILTERED} when the group's subscription for the topic
     * has a non-empty tag set that contains neither the message's tags nor {@code "*"}.
     * Failures for a group are recorded in the track's exception description instead of
     * being thrown.
     *
     * <p>Every group contributes exactly one track. Previously a failure inside the
     * {@code CONSUME_PASSIVELY} branch added the same track twice.
     *
     * @param messageExt message to track
     * @return one track per consuming group; empty when no group list could be obtained
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public List<MessageTrack> messageTrackDetail(MessageExt messageExt) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        List<MessageTrack> tracks = new ArrayList<MessageTrack>();
        GroupList groupList = queryTopicConsumeByWho(messageExt.getTopic());
        // queryTopicConsumeByWho returns null when the route has no usable broker.
        if (groupList == null || groupList.getGroupList() == null) {
            return tracks;
        }
        for (String group : groupList.getGroupList()) {
            MessageTrack track = new MessageTrack();
            track.setConsumerGroup(group);
            track.setTrackType(TrackType.UNKNOWN);
            try {
                ConsumerConnection connection = examineConsumerConnectionInfo(group);
                switch (connection.getConsumeType()) {
                    case CONSUME_ACTIVELY:
                        track.setTrackType(TrackType.PULL);
                        break;
                    case CONSUME_PASSIVELY:
                        try {
                            if (consumed(messageExt, group)) {
                                track.setTrackType(TrackType.CONSUMED);
                                for (Map.Entry<String, SubscriptionData> entry : connection.getSubscriptionTable().entrySet()) {
                                    if (!entry.getKey().equals(messageExt.getTopic())) {
                                        continue;
                                    }
                                    Set<String> tags = entry.getValue().getTagsSet();
                                    if (!tags.contains(messageExt.getTags()) && !tags.contains("*") && !tags.isEmpty()) {
                                        track.setTrackType(TrackType.CONSUMED_BUT_FILTERED);
                                    }
                                }
                            } else {
                                track.setTrackType(TrackType.NOT_CONSUME_YET);
                            }
                        } catch (MQBrokerException e) {
                            recordTrackFailure(track, e.getResponseCode(), e.getErrorMessage());
                        } catch (MQClientException e) {
                            recordTrackFailure(track, e.getResponseCode(), e.getErrorMessage());
                        } catch (Exception e) {
                            track.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
                        }
                        break;
                    default:
                        break;
                }
            } catch (MQBrokerException e) {
                recordTrackFailure(track, e.getResponseCode(), e.getErrorMessage());
            } catch (Exception e) {
                track.setExceptionDesc(RemotingHelper.exceptionSimpleDesc(e));
            }
            // Single add for every outcome, so no path can list a group twice.
            tracks.add(track);
        }
        return tracks;
    }

    /**
     * Stores a response code and error message on {@code track} as its exception
     * description, and marks the track {@link TrackType#NOT_ONLINE} for response code 206.
     */
    private static void recordTrackFailure(MessageTrack track, int responseCode, String errorMessage) {
        // NOTE(review): 206 is presumably ResponseCode.CONSUMER_NOT_ONLINE - confirm and use the constant.
        if (206 == responseCode) {
            track.setTrackType(TrackType.NOT_ONLINE);
        }
        track.setExceptionDesc("CODE:" + responseCode + " DESC:" + errorMessage);
    }

    /**
     * Tells whether consumer group {@code str} has already moved past {@code messageExt}.
     *
     * <p>The group's consume stats are scanned for a queue with the message's topic and
     * queue id whose broker is known to the cluster info. The message counts as consumed
     * when its store host equals the address registered under broker id {@code 0} for that
     * broker and the group's consumer offset is greater than the message's queue offset.
     *
     * @param messageExt message to check
     * @param str        consumer group whose consume stats are examined
     * @return {@code true} when a matching queue satisfies both conditions above
     */
    public boolean consumed(MessageExt messageExt, String str) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        ConsumeStats consumeStats = examineConsumeStats(str);
        ClusterInfo clusterInfo = examineBrokerClusterInfo();
        for (Map.Entry<MessageQueue, OffsetWrapper> entry : consumeStats.getOffsetTable().entrySet()) {
            MessageQueue queue = entry.getKey();
            boolean sameQueue = queue.getTopic().equals(messageExt.getTopic()) && queue.getQueueId() == messageExt.getQueueId();
            if (!sameQueue) {
                continue;
            }
            BrokerData broker = clusterInfo.getBrokerAddrTable().get(queue.getBrokerName());
            if (broker == null) {
                continue;
            }
            String storeAddr = RemotingUtil.socketAddress2String(messageExt.getStoreHost());
            String registeredAddr = RemotingUtil.convert2IpString(broker.getBrokerAddrs().get(0L));
            if (!storeAddr.equals(registeredAddr)) {
                continue;
            }
            if (entry.getValue().getConsumerOffset() > messageExt.getQueueOffset()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sends a clone-group-offset request to every broker with a selectable address in the
     * route of the retry topic derived from {@code str} via {@code MixAll.getRetryTopic}.
     *
     * <p>Each request uses three times the configured timeout.
     *
     * @param str  value the retry topic is derived from; also forwarded to each request
     * @param str2 forwarded unchanged to each request
     * @param str3 forwarded unchanged to each request
     * @param z    forwarded unchanged to each request
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public void cloneGroupOffset(String str, String str2, String str3, boolean z) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        for (BrokerData broker : examineTopicRouteInfo(MixAll.getRetryTopic(str)).getBrokerDatas()) {
            String brokerAddr = broker.selectBrokerAddr();
            if (brokerAddr == null) {
                continue;
            }
            this.mqClientInstance.getMQClientAPIImpl().cloneGroupOffset(brokerAddr, str, str2, str3, z, this.timeoutMillis * 3);
        }
    }

    /**
     * Delegates to {@code viewBrokerStatsData} of the client API, forwarding the three
     * arguments unchanged together with the configured timeout.
     *
     * @return the stats data reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public BrokerStatsData viewBrokerStatsData(String str, String str2, String str3) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        BrokerStatsData statsData = this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(str, str2, str3, this.timeoutMillis);
        return statsData;
    }

    /**
     * Delegates to {@code getClusterList} of the client API, forwarding {@code str}
     * unchanged together with the configured timeout.
     *
     * @return the set reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public Set<String> getClusterList(String str) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        Set<String> clusters = this.mqClientInstance.getMQClientAPIImpl().getClusterList(str, this.timeoutMillis);
        return clusters;
    }

    /**
     * Delegates to {@code fetchConsumeStatsInBroker} of the client API, forwarding all
     * arguments unchanged.
     *
     * <p>Unlike most methods of this class, the caller-supplied {@code j} is passed as the
     * last argument instead of the configured timeout.
     *
     * @return the stats list reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public ConsumeStatsList fetchConsumeStatsInBroker(String str, boolean z, long j) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
        ConsumeStatsList statsList = this.mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker(str, z, j);
        return statsList;
    }

    /**
     * Returns the names of the clusters whose address-table entry contains the name of the
     * first broker in the route of topic {@code str}.
     *
     * <p>Only the first broker of the route is considered. When the route lists no broker
     * an empty set is returned; previously this case failed with an
     * {@code IndexOutOfBoundsException} or a {@code NullPointerException}.
     *
     * @param str topic whose route is examined
     * @return matching cluster names, possibly empty, never {@code null}
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public Set<String> getTopicClusterList(String str) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
        Set<String> clusters = new HashSet<String>();
        ClusterInfo clusterInfo = examineBrokerClusterInfo();
        List<BrokerData> brokers = examineTopicRouteInfo(str).getBrokerDatas();
        if (brokers == null || brokers.isEmpty()) {
            return clusters;
        }
        String brokerName = brokers.get(0).getBrokerName();
        for (Map.Entry<String, Set<String>> entry : clusterInfo.getClusterAddrTable().entrySet()) {
            if (entry.getValue().contains(brokerName)) {
                clusters.add(entry.getKey());
            }
        }
        return clusters;
    }

    /**
     * Delegates to {@code getAllSubscriptionGroup} of the client API, forwarding both
     * arguments unchanged.
     *
     * @param str first argument of the client API call
     * @param j   second argument of the client API call, used in place of the configured
     *            timeout
     * @return the wrapper reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public SubscriptionGroupWrapper getAllSubscriptionGroup(String str, long j) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(str, j);
        return wrapper;
    }

    /**
     * Fetches all subscription groups through the client API and removes the system ones.
     *
     * <p>A group is removed when {@code MixAll.isSysConsumerGroup} accepts its name or the
     * name is contained in {@code SYSTEM_GROUP_SET}. Entries are removed in place from the
     * table of the returned wrapper.
     *
     * @param str first argument of the client API call
     * @param j   second argument of the client API call
     * @return the wrapper reported by the client API, with system groups removed
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public SubscriptionGroupWrapper getUserSubscriptionGroup(String str, long j) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        SubscriptionGroupWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(str, j);
        for (Iterator<Map.Entry<String, SubscriptionGroupConfig>> entries = wrapper.getSubscriptionGroupTable().entrySet().iterator(); entries.hasNext();) {
            String groupName = entries.next().getKey();
            if (!MixAll.isSysConsumerGroup(groupName) && !SYSTEM_GROUP_SET.contains(groupName)) {
                continue;
            }
            entries.remove();
        }
        return wrapper;
    }

    /**
     * Delegates to {@code getAllTopicConfig} of the client API, forwarding both arguments
     * unchanged.
     *
     * @param str first argument of the client API call
     * @param j   second argument of the client API call, used in place of the configured
     *            timeout
     * @return the wrapper reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public TopicConfigSerializeWrapper getAllTopicConfig(String str, long j) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
        TopicConfigSerializeWrapper wrapper = this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(str, j);
        return wrapper;
    }

    /**
     * Fetches all topic configurations and removes the non-user topics.
     *
     * <p>A topic is removed when it is in the system topic list obtained from the broker,
     * or, when {@code z} is {@code false}, when its name starts with
     * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} or {@code MixAll.DLQ_GROUP_TOPIC_PREFIX}.
     * Entries are removed in place from the table of the returned wrapper.
     *
     * @param str first argument of both underlying calls
     * @param z   {@code true} to keep retry and DLQ prefixed topics
     * @param j   second argument of both underlying calls
     * @return the topic configuration wrapper with the non-user topics removed
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public TopicConfigSerializeWrapper getUserTopicConfig(String str, boolean z, long j) throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
        TopicConfigSerializeWrapper wrapper = getAllTopicConfig(str, j);
        TopicList systemTopics = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(str, j);
        for (Iterator<Map.Entry<String, TopicConfig>> entries = wrapper.getTopicConfigTable().entrySet().iterator(); entries.hasNext();) {
            String topic = entries.next().getKey();
            boolean systemTopic = systemTopics.getTopicList().contains(topic);
            boolean hiddenSpecialTopic = !z && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX));
            if (systemTopic || hiddenSpecialTopic) {
                entries.remove();
            }
        }
        return wrapper;
    }

    /**
     * Convenience overload that calls {@link #createTopic(String, String, int, int)} with
     * {@code 0} as the last argument.
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public void createTopic(String str, String str2, int i) throws MQClientException {
        this.createTopic(str, str2, i, 0);
    }

    /**
     * Delegates to {@code createTopic} of the client instance's {@code MQAdminImpl},
     * forwarding all four arguments unchanged.
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public void createTopic(String str, String str2, int i, int i2) throws MQClientException {
        // Pure pass-through: no validation or defaulting happens at this level.
        this.mqClientInstance
            .getMQAdminImpl()
            .createTopic(str, str2, i, i2);
    }

    /**
     * Delegates to {@code searchOffset} of the client instance's {@code MQAdminImpl}.
     *
     * @param messageQueue queue forwarded to the admin implementation
     * @param j            value forwarded to the admin implementation
     * @return the offset reported by the admin implementation
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public long searchOffset(MessageQueue messageQueue, long j) throws MQClientException {
        long offset = this.mqClientInstance.getMQAdminImpl().searchOffset(messageQueue, j);
        return offset;
    }

    /**
     * Delegates to {@code maxOffset} of the client instance's {@code MQAdminImpl}.
     *
     * @param messageQueue queue forwarded to the admin implementation
     * @return the offset reported by the admin implementation
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public long maxOffset(MessageQueue messageQueue) throws MQClientException {
        long offset = this.mqClientInstance.getMQAdminImpl().maxOffset(messageQueue);
        return offset;
    }

    /**
     * Delegates to {@code minOffset} of the client instance's {@code MQAdminImpl}.
     *
     * @param messageQueue queue forwarded to the admin implementation
     * @return the offset reported by the admin implementation
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public long minOffset(MessageQueue messageQueue) throws MQClientException {
        long offset = this.mqClientInstance.getMQAdminImpl().minOffset(messageQueue);
        return offset;
    }

    /**
     * Delegates to {@code earliestMsgStoreTime} of the client instance's
     * {@code MQAdminImpl}.
     *
     * @param messageQueue queue forwarded to the admin implementation
     * @return the value reported by the admin implementation
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public long earliestMsgStoreTime(MessageQueue messageQueue) throws MQClientException {
        long storeTime = this.mqClientInstance.getMQAdminImpl().earliestMsgStoreTime(messageQueue);
        return storeTime;
    }

    /**
     * Delegates to {@code viewMessage} of the client instance's {@code MQAdminImpl}.
     *
     * @param str message identifier forwarded to the admin implementation
     * @return the message reported by the admin implementation
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public MessageExt viewMessage(String str) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        MessageExt message = this.mqClientInstance.getMQAdminImpl().viewMessage(str);
        return message;
    }

    /**
     * Delegates to {@code queryMessage} of the client instance's {@code MQAdminImpl},
     * forwarding all five arguments unchanged.
     *
     * @return the query result reported by the admin implementation
     */
    @Override // org.apache.rocketmq.client.MQAdmin
    public QueryResult queryMessage(String str, String str2, int i, long j, long j2) throws MQClientException, InterruptedException {
        QueryResult result = this.mqClientInstance.getMQAdminImpl().queryMessage(str, str2, i, j, j2);
        return result;
    }

    /**
     * Delegates to {@code queryMessageByUniqKey} of the client instance's
     * {@code MQAdminImpl}, forwarding all five arguments unchanged.
     *
     * @return the query result reported by the admin implementation
     */
    public QueryResult queryMessageByUniqKey(String str, String str2, int i, long j, long j2) throws MQClientException, InterruptedException {
        QueryResult result = this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(str, str2, i, j, j2);
        return result;
    }

    /**
     * Builds an {@code UpdateConsumerOffsetRequestHeader} and sends it through the client
     * API with the configured timeout.
     *
     * @param str          first argument of {@code updateConsumerOffset} of the client API
     * @param str2         consumer group set on the request header
     * @param messageQueue queue whose topic and queue id are set on the request header
     * @param j            commit offset set on the request header
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public void updateConsumeOffset(String str, String str2, MessageQueue messageQueue, long j) throws RemotingException, InterruptedException, MQBrokerException {
        UpdateConsumerOffsetRequestHeader header = new UpdateConsumerOffsetRequestHeader();
        // Identify the queue first, then the group and the offset to commit for it.
        header.setTopic(messageQueue.getTopic());
        header.setQueueId(Integer.valueOf(messageQueue.getQueueId()));
        header.setConsumerGroup(str2);
        header.setCommitOffset(Long.valueOf(j));
        this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(str, header, this.timeoutMillis);
    }

    /**
     * Delegates to {@code updateNameServerConfig} of the client API, forwarding both
     * arguments unchanged together with the configured timeout.
     *
     * @param properties properties forwarded to the client API
     * @param list       list forwarded to the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public void updateNameServerConfig(Properties properties, List<String> list) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException {
        // Pure pass-through: the properties and the list are not inspected here.
        this.mqClientInstance
            .getMQClientAPIImpl()
            .updateNameServerConfig(properties, list, this.timeoutMillis);
    }

    /**
     * Delegates to {@code getNameServerConfig} of the client API, forwarding {@code list}
     * unchanged together with the configured timeout.
     *
     * @param list list forwarded to the client API
     * @return the map reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public Map<String, Properties> getNameServerConfig(List<String> list) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException {
        Map<String, Properties> configs = this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(list, this.timeoutMillis);
        return configs;
    }

    /**
     * Delegates to {@code queryConsumeQueue} of the client API, forwarding all six
     * arguments unchanged together with the configured timeout.
     *
     * @return the response body reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public QueryConsumeQueueResponseBody queryConsumeQueue(String str, String str2, int i, long j, int i2, String str3) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
        QueryConsumeQueueResponseBody body = this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(str, str2, i, j, i2, str3, this.timeoutMillis);
        return body;
    }

    /**
     * Asks the broker that stores message {@code str} to resume checking it as a half
     * message.
     *
     * <p>The message is looked up with {@code viewMessage(str)} and its store host is used
     * as the broker address.
     *
     * @param str message id used for the lookup and forwarded to the broker request
     * @return the value reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean resumeCheckHalfMessage(String str) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        MessageExt message = viewMessage(str);
        String brokerAddr = RemotingUtil.socketAddress2String(message.getStoreHost());
        return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(brokerAddr, str, this.timeoutMillis);
    }

    /**
     * Asks the broker that stores a message, looked up with {@code viewMessage(str, str2)},
     * to resume checking it as a half message.
     *
     * <p>When the message carries the property
     * {@link MessageConst#PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX}, the offset message id of
     * the message is sent to the broker; otherwise {@code str2} is sent.
     *
     * @param str  first argument of the message lookup
     * @param str2 second argument of the message lookup; also the fallback message id
     * @return the value reported by the client API
     */
    @Override // org.apache.rocketmq.tools.admin.MQAdminExt
    public boolean resumeCheckHalfMessage(String str, String str2) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        MessageExt message = viewMessage(str, str2);
        boolean hasUniqKey = message.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) != null;
        String brokerAddr = RemotingUtil.socketAddress2String(message.getStoreHost());
        String messageId = hasUniqKey ? ((MessageClientExt) message).getOffsetMsgId() : str2;
        return this.mqClientInstance.getMQClientAPIImpl().resumeCheckHalfMessage(brokerAddr, messageId, this.timeoutMillis);
    }

    // Registers the group names held in SYSTEM_GROUP_SET. getUserSubscriptionGroup removes
    // any subscription group whose name is in this set.
    static {
        SYSTEM_GROUP_SET.addAll(Arrays.asList(
            MixAll.DEFAULT_CONSUMER_GROUP,
            MixAll.DEFAULT_PRODUCER_GROUP,
            MixAll.TOOLS_CONSUMER_GROUP,
            MixAll.FILTERSRV_CONSUMER_GROUP,
            MixAll.MONITOR_CONSUMER_GROUP,
            MixAll.CLIENT_INNER_PRODUCER_GROUP,
            MixAll.SELF_TEST_PRODUCER_GROUP,
            MixAll.SELF_TEST_CONSUMER_GROUP,
            MixAll.ONS_HTTP_PROXY_GROUP,
            MixAll.CID_ONSAPI_PERMISSION_GROUP,
            MixAll.CID_ONSAPI_OWNER_GROUP,
            MixAll.CID_ONSAPI_PULL_GROUP,
            MixAll.CID_SYS_RMQ_TRANS));
    }
}
