package org.apache.rocketmq.tools.command.logicalqueue;

import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/tools/command/logicalqueue/UpdateTopicLogicalQueueNumCommand.class */
public class UpdateTopicLogicalQueueNumCommand implements SubCommand {
    private static final Logger log = LoggerFactory.getLogger("STDOUT");

    @Override // org.apache.rocketmq.tools.command.SubCommand
    public String commandName() {
        return "updateTopicLogicalQueueNum";
    }

    @Override // org.apache.rocketmq.tools.command.SubCommand
    public String commandDesc() {
        return "change logical queue num (increase or decrease) of a topic.";
    }

    @Override // org.apache.rocketmq.tools.command.SubCommand
    public Options buildCommandlineOptions(Options options) {
        Option option = new Option("t", "topic", true, "topic name.");
        option.setRequired(true);
        options.addOption(option);
        Option option2 = new Option("c", "clusterName", true, "cluster name.");
        option2.setRequired(true);
        options.addOption(option2);
        Option option3 = new Option("n", "num", true, "logical queue num.");
        option3.setRequired(true);
        options.addOption(option3);
        return options;
    }

    @Override // org.apache.rocketmq.tools.command.SubCommand
    public void execute(CommandLine commandLine, Options options, RPCHook rPCHook) throws SubCommandException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rPCHook);
        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        try {
            try {
                defaultMQAdminExt.start();
                execute(defaultMQAdminExt, commandLine.getOptionValue("c").trim(), commandLine.getOptionValue("t").trim(), Integer.parseUnsignedInt(commandLine.getOptionValue("n")));
                defaultMQAdminExt.shutdown();
            } catch (Exception e) {
                throw new SubCommandException(getClass().getSimpleName() + " command failed", e);
            }
        } catch (Throwable th) {
            defaultMQAdminExt.shutdown();
            throw th;
        }
    }

    public void execute(DefaultMQAdminExt defaultMQAdminExt, String str, String str2, int i) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException, SubCommandException {
        List<String> list = (List) CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, str).stream().sorted().collect(Collectors.toList());
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(list.size());
        TreeMap newTreeMap = Maps.newTreeMap();
        HashMap newHashMap = Maps.newHashMap();
        for (String str3 : list) {
            TopicConfig examineTopicConfig = defaultMQAdminExt.examineTopicConfig(str3, str2);
            if (examineTopicConfig == null) {
                log.info("examineTopicConfig brokerAddr={} topic={} not exist, skip!", str3, str2);
            } else {
                newHashMapWithExpectedSize.put(str3, examineTopicConfig);
                LogicalQueuesInfo queryTopicLogicalQueueMapping = defaultMQAdminExt.queryTopicLogicalQueueMapping(str3, str2);
                if (queryTopicLogicalQueueMapping == null) {
                    throw new SubCommandException(String.format(Locale.ENGLISH, "broker=%s topic=%s logical queue not enabled", str3, str2));
                }
                newHashMap.put(str3, queryTopicLogicalQueueMapping);
                queryTopicLogicalQueueMapping.values().stream().flatMap((v0) -> {
                    return v0.stream();
                }).forEach(logicalQueueRouteData -> {
                    List list2 = (List) newTreeMap.computeIfAbsent(Integer.valueOf(logicalQueueRouteData.getLogicalQueueIndex()), num -> {
                        return Lists.newArrayListWithExpectedSize(1);
                    });
                    int binarySearch = Collections.binarySearch(list2, logicalQueueRouteData, Comparator.comparingLong((v0) -> {
                        return v0.getLogicalQueueDelta();
                    }).thenComparing((v0) -> {
                        return v0.getMessageQueue();
                    }).thenComparingInt((v0) -> {
                        return v0.getStateOrdinal();
                    }));
                    if (binarySearch < 0) {
                        binarySearch = (-binarySearch) - 1;
                    }
                    list2.add(binarySearch, logicalQueueRouteData);
                });
            }
        }
        int count = (int) newTreeMap.values().stream().filter(list2 -> {
            return list2.stream().anyMatch((v0) -> {
                return v0.isWritable();
            });
        }).count();
        if (count == i) {
            log.info("logical queue num not changed!");
        } else if (count < i) {
            increaseLogicalQueueNum(defaultMQAdminExt, newHashMap, newTreeMap, newHashMapWithExpectedSize, count, i);
        } else {
            decreaseLogicalQueueNum(defaultMQAdminExt, newTreeMap, count, i);
        }
    }

    private void increaseLogicalQueueNum(DefaultMQAdminExt defaultMQAdminExt, Map<String, LogicalQueuesInfo> map, NavigableMap<Integer, List<LogicalQueueRouteData>> navigableMap, Map<String, TopicConfig> map2, int i, int i2) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        int i3 = i;
        String str = (String) map2.values().stream().findAny().map((v0) -> {
            return v0.getTopicName();
        }).get();
        for (Map.Entry<String, TopicConfig> entry : map2.entrySet()) {
            String key = entry.getKey();
            TopicConfig value = entry.getValue();
            LogicalQueuesInfo orDefault = map.getOrDefault(key, new LogicalQueuesInfo());
            ImmutableListMultimap index = Multimaps.index(orDefault.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).iterator(), (v0) -> {
                return v0.getQueueId();
            });
            int writeQueueNums = value.getWriteQueueNums();
            for (int i4 = 0; i4 < writeQueueNums; i4++) {
                if (!index.get(Integer.valueOf(i4)).stream().anyMatch((v0) -> {
                    return v0.isWritable();
                })) {
                    int i5 = i3;
                    try {
                        LogicalQueueRouteData reuseTopicLogicalQueue = defaultMQAdminExt.reuseTopicLogicalQueue(key, str, i4, i5, MessageQueueRouteState.Normal);
                        log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{key, Integer.valueOf(i4), Integer.valueOf(i5)});
                        i3++;
                        if (i3 >= i2) {
                            return;
                        }
                        ((List) navigableMap.computeIfAbsent(Integer.valueOf(i5), num -> {
                            return Lists.newArrayListWithExpectedSize(1);
                        })).add(reuseTopicLogicalQueue);
                        ((List) orDefault.computeIfAbsent(Integer.valueOf(i5), num2 -> {
                            return Lists.newArrayListWithExpectedSize(1);
                        })).add(reuseTopicLogicalQueue);
                    } finally {
                        log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{key, Integer.valueOf(i4), Integer.valueOf(i5)});
                    }
                }
            }
        }
        for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry2 : navigableMap.entrySet()) {
            List<LogicalQueueRouteData> value2 = entry2.getValue();
            if (value2.size() != 0 && !value2.stream().anyMatch((v0) -> {
                return v0.isWritable();
            })) {
                int intValue = entry2.getKey().intValue();
                LogicalQueueRouteData logicalQueueRouteData = value2.get(value2.size() - 1);
                String brokerAddr = logicalQueueRouteData.getBrokerAddr();
                List list = (List) map.get(brokerAddr).get(Integer.valueOf(intValue));
                if (logicalQueueRouteData.isExpired()) {
                    int queueId = logicalQueueRouteData.getQueueId();
                    try {
                        LogicalQueueRouteData reuseTopicLogicalQueue2 = defaultMQAdminExt.reuseTopicLogicalQueue(brokerAddr, str, queueId, intValue, MessageQueueRouteState.Normal);
                        log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{brokerAddr, Integer.valueOf(queueId), Integer.valueOf(intValue)});
                        value2.add(reuseTopicLogicalQueue2);
                        list.add(reuseTopicLogicalQueue2);
                    } finally {
                        log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{brokerAddr, Integer.valueOf(queueId), Integer.valueOf(intValue)});
                    }
                } else {
                    int i6 = -1;
                    try {
                        LogicalQueueRouteData createMessageQueueForLogicalQueue = defaultMQAdminExt.createMessageQueueForLogicalQueue(brokerAddr, str, intValue, MessageQueueRouteState.Normal);
                        i6 = createMessageQueueForLogicalQueue.getQueueId();
                        log.info("updateTopicLogicalQueueMapping create message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{brokerAddr, Integer.valueOf(i6), Integer.valueOf(intValue)});
                        value2.add(createMessageQueueForLogicalQueue);
                        list.add(createMessageQueueForLogicalQueue);
                        map2.get(brokerAddr).setWriteQueueNums(i6 + 1);
                    } finally {
                        log.info("updateTopicLogicalQueueMapping create message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{brokerAddr, Integer.valueOf(i6), Integer.valueOf(intValue)});
                    }
                }
                i3++;
                if (i3 >= i2) {
                    return;
                }
            }
        }
        for (Map.Entry<String, LogicalQueuesInfo> entry3 : map.entrySet()) {
            String key2 = entry3.getKey();
            for (LogicalQueueRouteData logicalQueueRouteData2 : entry3.getValue().values().stream().flatMap((v0) -> {
                return v0.stream();
            }).filter((v0) -> {
                return v0.isExpired();
            }).sorted(Comparator.comparingInt((v0) -> {
                return v0.getLogicalQueueIndex();
            }).thenComparingInt((v0) -> {
                return v0.getQueueId();
            })).limit(i2 - i3)) {
                try {
                    logicalQueueRouteData2.copyFrom(defaultMQAdminExt.reuseTopicLogicalQueue(key2, str, logicalQueueRouteData2.getQueueId(), logicalQueueRouteData2.getLogicalQueueIndex(), MessageQueueRouteState.Normal));
                    log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{key2, Integer.valueOf(logicalQueueRouteData2.getQueueId()), Integer.valueOf(logicalQueueRouteData2.getLogicalQueueIndex())});
                    ((List) navigableMap.get(Integer.valueOf(logicalQueueRouteData2.getLogicalQueueIndex()))).stream().filter((v0) -> {
                        return v0.isExpired();
                    }).filter(logicalQueueRouteData3 -> {
                        return Objects.equals(key2, logicalQueueRouteData3.getBrokerAddr()) && logicalQueueRouteData2.getQueueId() == logicalQueueRouteData3.getQueueId() && logicalQueueRouteData2.getLogicalQueueIndex() == logicalQueueRouteData3.getLogicalQueueIndex();
                    }).forEach(logicalQueueRouteData4 -> {
                        logicalQueueRouteData4.copyFrom(logicalQueueRouteData2);
                    });
                    i3++;
                    if (i3 >= i2) {
                        return;
                    }
                } finally {
                    log.info("updateTopicLogicalQueueMapping reuse expired message queue from sealing logical queue brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{key2, Integer.valueOf(logicalQueueRouteData2.getQueueId()), Integer.valueOf(logicalQueueRouteData2.getLogicalQueueIndex())});
                }
            }
        }
        while (i3 < i2) {
            Map.Entry<String, LogicalQueuesInfo> entry4 = map.entrySet().stream().min(Comparator.comparingInt(entry5 -> {
                return ((LogicalQueuesInfo) entry5.getValue()).values().stream().flatMapToInt(list2 -> {
                    return IntStream.of(list2.size());
                }).sum();
            })).get();
            String key3 = entry4.getKey();
            int i7 = i3;
            int i8 = -1;
            try {
                LogicalQueueRouteData createMessageQueueForLogicalQueue2 = defaultMQAdminExt.createMessageQueueForLogicalQueue(key3, str, i7, MessageQueueRouteState.Normal);
                i8 = createMessageQueueForLogicalQueue2.getQueueId();
                log.info("updateTopicLogicalQueueMapping create message queue from fresh brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{key3, Integer.valueOf(i8), Integer.valueOf(i7)});
                entry4.getValue().put(Integer.valueOf(i7), Lists.newArrayList(new LogicalQueueRouteData[]{createMessageQueueForLogicalQueue2}));
                i3++;
            } finally {
                log.info("updateTopicLogicalQueueMapping create message queue from fresh brokerAddr={} queueId={} logicalQueueIdx={}", new Object[]{key3, Integer.valueOf(i8), Integer.valueOf(i7)});
            }
        }
    }

    private void decreaseLogicalQueueNum(DefaultMQAdminExt defaultMQAdminExt, NavigableMap<Integer, List<LogicalQueueRouteData>> navigableMap, int i, int i2) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException, SubCommandException {
        Map.Entry<Integer, List<LogicalQueueRouteData>> lastEntry = navigableMap.lastEntry();
        int i3 = i;
        while (i3 > i2) {
            boolean z = false;
            for (LogicalQueueRouteData logicalQueueRouteData : lastEntry.getValue()) {
                if (logicalQueueRouteData.isWritable()) {
                    z = true;
                    LogicalQueueRouteData logicalQueueRouteData2 = logicalQueueRouteData;
                    try {
                        logicalQueueRouteData2 = defaultMQAdminExt.sealTopicLogicalQueue(logicalQueueRouteData.getBrokerAddr(), logicalQueueRouteData);
                        log.info("seal message queue: {}", logicalQueueRouteData2);
                    } catch (Throwable th) {
                        log.info("seal message queue: {}", logicalQueueRouteData2);
                        throw th;
                    }
                }
            }
            if (z) {
                i3--;
            }
            lastEntry = navigableMap.lowerEntry(lastEntry.getKey());
            if (lastEntry == null) {
                throw new SubCommandException(String.format(Locale.ENGLISH, "oldLogicalQueueNum=%d newLogicalQueueNum=%d curLogicalQueueNum=%d but can not find lowerEntry, unexpected situation", Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)));
            }
        }
    }
}
