package org.apache.rocketmq.test.smoke;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendResultForLogicalQueue;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
/* loaded from: input_file:org/apache/rocketmq/test/smoke/LogicalQueueIT.class */
public class LogicalQueueIT {
    /** Name server address ({@code 127.0.0.1:<listen port>}); assigned in {@link #beforeClass()}. */
    public static String nsAddr;
    /** Broker name read from the configuration of {@link #brokerController1}. */
    private static String broker1Name;
    /** Broker name read from the configuration of {@link #brokerController2}. */
    private static String broker2Name;
    /** Cluster name read from the configuration of {@link #brokerController1}. */
    private static String clusterName;
    /** Number of brokers started by {@link #beforeClass()}. */
    private static int brokerNum;
    /** Queue count passed to {@code createTopic} for every per-test topic. */
    private static final int QUEUE_NUMBERS = 8;
    private static NamesrvController namesrvController;
    private static BrokerController brokerController1;
    private static BrokerController brokerController2;
    /** Started brokers keyed by broker name; built in {@link #beforeClass()}. */
    private static Map<String, BrokerController> brokerControllerMap;
    private static DefaultMQProducer producer;
    private static DefaultMQPullConsumer consumer;
    private static DefaultMQAdminExt mqAdminExt;
    /** Single-queue topic created once in {@link #beforeClass()}. */
    private static final String placeholderTopic = "placeholder";
    /** Number of messages each test sends per queue and per send mode. */
    private static final int MSG_SENT_TIMES = 3;
    /** Commit log file size (512 KiB) applied while the brokers are being created. */
    private static final int COMMIT_LOG_FILE_SIZE = 524288;
    private static final Logger logger = LoggerFactory.getLogger(LogicalQueueIT.class);
    /** Clients (producer, consumer, admin) handed to {@code BaseConf.shutdown} in {@link #afterClass()}. */
    private static final List<Object> mqClients = new ArrayList<>();
    /** Topic under test; replaced with a fresh random name before every test by {@link #setUp()}. */
    private static volatile String topic = null;

    /* renamed from: org.apache.rocketmq.test.smoke.LogicalQueueIT$7, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/test/smoke/LogicalQueueIT$7.class */
    // Synthetic lookup table, apparently emitted by the compiler for a `switch` over PullStatus
    // (the switch itself is not in this part of the file). The array is indexed by
    // PullStatus.ordinal() and holds a dense case index: NO_NEW_MSG=1, OFFSET_ILLEGAL=2, FOUND=3,
    // NO_MATCHED_MSG=4. Slots that are never assigned keep the int default 0.
    static /* synthetic */ class AnonymousClass7 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus = new int[PullStatus.values().length];

        static {
            // Each constant is guarded separately: if one is missing at runtime (NoSuchFieldError),
            // only that slot is left at 0 and the remaining assignments still run.
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.NO_NEW_MSG.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.OFFSET_ILLEGAL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                // NOTE(review): the value here is simply case index 3; MSG_SENT_TIMES presumably
                // appears only because the decompiler matched the literal to a constant of equal value.
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.FOUND.ordinal()] = LogicalQueueIT.MSG_SENT_TIMES;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.NO_MATCHED_MSG.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /**
     * Boots the shared fixture for all tests in this class: one name server, two brokers with a
     * reduced commit log file size, plus a started producer, pull consumer and admin client that
     * all point at the name server.
     *
     * @throws Exception if any component fails to start or the placeholder topic cannot be created
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        System.setProperty("rocketmq.remoting.version", Integer.toString(MQVersion.CURRENT_VERSION));
        namesrvController = IntegrationTestBase.createAndStartNamesrv();
        nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();

        // COMMIT_LOG_SIZE is a global shared with other integration tests. Override it only while
        // our brokers are created, and restore it even when broker start-up throws.
        final int savedCommitLogSize = IntegrationTestBase.COMMIT_LOG_SIZE;
        IntegrationTestBase.COMMIT_LOG_SIZE = COMMIT_LOG_FILE_SIZE;
        try {
            brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
            brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr);
        } finally {
            IntegrationTestBase.COMMIT_LOG_SIZE = savedCommitLogSize;
        }

        clusterName = brokerController1.getBrokerConfig().getBrokerClusterName();
        broker1Name = brokerController1.getBrokerConfig().getBrokerName();
        broker2Name = brokerController2.getBrokerConfig().getBrokerName();
        brokerNum = 2;
        brokerControllerMap = ImmutableList.of(brokerController1, brokerController2).stream()
            .collect(Collectors.toMap(controller -> controller.getBrokerConfig().getBrokerName(), Function.identity()));
        BaseConf.waitBrokerRegistered(nsAddr, clusterName);

        // Every client is added to mqClients before start() so afterClass() shuts it down
        // even when start() itself fails.
        producer = new DefaultMQProducer(MQRandomUtils.getRandomConsumerGroup());
        mqClients.add(producer);
        producer.setNamesrvAddr(nsAddr);
        producer.setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
        producer.setSendMsgTimeout(1000);
        producer.start();

        consumer = new DefaultMQPullConsumer(BaseConf.initConsumerGroup());
        mqClients.add(consumer);
        consumer.setNamesrvAddr(nsAddr);
        consumer.setConsumerPullTimeoutMillis(1000L);
        consumer.start();

        mqAdminExt = new DefaultMQAdminExt(1000L);
        mqClients.add(mqAdminExt);
        mqAdminExt.setNamesrvAddr(nsAddr);
        mqAdminExt.start();
        mqAdminExt.createTopic(clusterName, placeholderTopic, 1);
    }

    /**
     * Tears down the shared fixture: the MQ clients first, then every broker, and finally the
     * name server, whose shutdown is handed to the common fork-join pool.
     *
     * <p>JUnit runs this method even when {@link #beforeClass()} failed part-way, so every
     * component is null-checked so that whatever did start is still released.
     */
    @AfterClass
    public static void afterClass() {
        BaseConf.shutdown(mqClients);

        // Prefer the map; fall back to the individual fields when set-up failed before the
        // map was built, so an already-started broker is not leaked.
        final Collection<BrokerController> brokers;
        if (brokerControllerMap != null) {
            brokers = brokerControllerMap.values();
        } else {
            brokers = Arrays.asList(brokerController1, brokerController2);
        }
        for (BrokerController broker : brokers) {
            if (broker != null) {
                broker.shutdown();
            }
        }

        final NamesrvController namesrv = namesrvController;
        if (namesrv != null) {
            // Shut the name server down in the background so it does not block class teardown.
            ForkJoinPool.commonPool().execute(namesrv::shutdown);
        }
    }

    /**
     * Creates a fresh, randomly named topic for the upcoming test, waits until the brokers report
     * offsets for it, registers it with the pull consumer and then runs the logical-queue mapping
     * command for the topic against the addresses of all started brokers.
     *
     * @throws Exception if topic creation, route lookup or the mapping command fails
     */
    @Before
    public void setUp() throws Exception {
        topic = "tt-" + MQRandomUtils.getRandomTopic();
        logger.info("use topic: {}", topic);
        mqAdminExt.createTopic(clusterName, topic, QUEUE_NUMBERS);

        // The topic must be visible on every broker before anything else is attempted.
        Assertions.assertThat(mqAdminExt.examineTopicRouteInfo(topic).getBrokerDatas()).hasSize(brokerNum);
        Awaitility.await()
            .atMost(5L, TimeUnit.SECONDS)
            .until(() -> !mqAdminExt.examineTopicStats(topic).getOffsetTable().isEmpty());

        consumer.setRegisterTopics(Collections.singleton(topic));

        final UpdateTopicLogicalQueueMappingCommand mappingCommand = new UpdateTopicLogicalQueueMappingCommand();
        final Set<String> brokerAddrs = brokerControllerMap.values().stream()
            .map(BrokerController::getBrokerAddr)
            .collect(Collectors.toSet());
        mappingCommand.execute(mqAdminExt, topic, brokerAddrs);
    }

    /**
     * Returns the name of the method that called this helper.
     *
     * <p>The result is read from a fixed position in the current thread's stack trace, so this
     * method must be called directly from the method whose name is wanted; wrapping the call in
     * another helper or lambda would shift the frames and yield a different name.
     *
     * @return the method name recorded in stack frame index 2
     */
    private static String getCurrentMethodName() {
        // Index 2 is used on the assumption that 0 is getStackTrace, 1 is this helper, 2 is the caller.
        return Thread.currentThread().getStackTrace()[2].getMethodName();
    }

    /**
     * Sends {@code MSG_SENT_TIMES} messages synchronously to every logical queue of the topic and
     * pulls them back synchronously, checking queue identity, message bodies, consecutive queue
     * offsets and the resulting uncommitted max offset.
     *
     * @throws Exception if sending, pulling or an admin query fails
     */
    @Test
    public void test001_SendPullSync() throws Exception {
        final String methodName = getCurrentMethodName();

        // ---- produce ----
        final List<MessageQueue> publishQueues = producer.fetchPublishMessageQueues(topic);
        Assertions.assertThat(publishQueues).hasSize(brokerNum * QUEUE_NUMBERS);
        // Every logical queue id must show up exactly once; ids are ticked off as they are seen.
        final Set<Integer> pendingQueueIds = IntStream.range(0, brokerNum * QUEUE_NUMBERS).boxed().collect(Collectors.toSet());
        for (MessageQueue publishQueue : publishQueues) {
            Assertions.assertThat(publishQueue.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(pendingQueueIds.remove(Integer.valueOf(publishQueue.getQueueId()))).isTrue();
            for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
                final String payload = String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, publishQueue.getQueueId(), seq);
                final Message message = new Message(topic, payload.getBytes(StandardCharsets.UTF_8));
                final SendResult sendResult = producer.send(message, publishQueue);
                Assertions.assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(publishQueue.getBrokerName());
                Assertions.assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(publishQueue.getQueueId());
            }
        }
        Assertions.assertThat(pendingQueueIds).isEmpty();

        // ---- consume ----
        final List<MessageQueue> subscribeQueues = consumer.fetchSubscribeMessageQueues(topic).stream()
            .sorted()
            .collect(Collectors.toList());
        Assertions.assertThat(subscribeQueues).hasSize(brokerNum * QUEUE_NUMBERS);
        subscribeQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId));
        pendingQueueIds.addAll(IntStream.range(0, brokerNum * QUEUE_NUMBERS).boxed().collect(Collectors.toSet()));
        for (MessageQueue subscribeQueue : subscribeQueues) {
            Assertions.assertThat(subscribeQueue.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(pendingQueueIds.remove(Integer.valueOf(subscribeQueue.getQueueId()))).isTrue();

            final long minOffset = mqAdminExt.minOffset(subscribeQueue);
            final PullResult pullResult = consumer.pull(subscribeQueue, "*", minOffset, 10);
            Assertions.assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
            final List<MessageExt> received = pullResult.getMsgFoundList();
            Assertions.assertThat(received).hasSize(MSG_SENT_TIMES);

            // Offset of the first message; later messages must follow it consecutively.
            long firstOffset = -1;
            for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
                final MessageExt msg = received.get(seq);
                Assertions.assertThat(msg.getBrokerName()).isEqualTo("__logical_queue_broker__");
                Assertions.assertThat(msg.getQueueId()).isEqualTo(subscribeQueue.getQueueId());
                final String expectedBody = String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, subscribeQueue.getQueueId(), seq);
                Assertions.assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(expectedBody);
                if (seq == 0) {
                    firstOffset = msg.getQueueOffset();
                } else {
                    Assertions.assertThat(msg.getQueueOffset()).isEqualTo(firstOffset + seq);
                }
            }
            Assertions.assertThat(maxOffsetUncommitted(subscribeQueue)).isEqualTo(firstOffset + MSG_SENT_TIMES);
        }
        Assertions.assertThat(pendingQueueIds).isEmpty();
    }

    /**
     * Same round trip as the synchronous test, but through the callback-based send and pull APIs.
     * Each callback is bridged to a {@link CompletableFuture} that the test then waits on.
     *
     * @throws Exception if sending, pulling or an admin query fails
     */
    @Test
    public void test002_SendPullAsync() throws Exception {
        final String methodName = getCurrentMethodName();

        // ---- produce ----
        for (MessageQueue publishQueue : producer.fetchPublishMessageQueues(topic)) {
            for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
                final String payload = String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, publishQueue.getQueueId(), seq);
                final Message message = new Message(topic, payload.getBytes(StandardCharsets.UTF_8));
                final CompletableFuture<SendResult> sendFuture = new CompletableFuture<>();
                producer.send(message, publishQueue, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        sendFuture.complete(sendResult);
                    }

                    @Override
                    public void onException(Throwable th) {
                        sendFuture.completeExceptionally(th);
                    }
                });
                final SendResult sent = sendFuture.get();
                Assertions.assertThat(sent.getMessageQueue().getBrokerName()).isEqualTo(publishQueue.getBrokerName());
                Assertions.assertThat(sent.getMessageQueue().getQueueId()).isEqualTo(publishQueue.getQueueId());
            }
        }

        // ---- consume ----
        final List<MessageQueue> subscribeQueues = consumer.fetchSubscribeMessageQueues(topic).stream()
            .sorted()
            .collect(Collectors.toList());
        for (MessageQueue subscribeQueue : subscribeQueues) {
            final long minOffset = mqAdminExt.minOffset(subscribeQueue);
            final CompletableFuture<PullResult> pullFuture = new CompletableFuture<>();
            consumer.pull(subscribeQueue, "*", minOffset, 10, new PullCallback() {
                @Override
                public void onSuccess(PullResult pullResult) {
                    pullFuture.complete(pullResult);
                }

                @Override
                public void onException(Throwable th) {
                    pullFuture.completeExceptionally(th);
                }
            });
            final PullResult pulled = pullFuture.get();
            Assertions.assertThat(pulled.getPullStatus()).isEqualTo(PullStatus.FOUND);
            final List<MessageExt> received = pulled.getMsgFoundList();
            Assertions.assertThat(received).hasSize(MSG_SENT_TIMES);

            // Offset of the first message; later messages must follow it consecutively.
            long firstOffset = -1;
            for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
                final MessageExt msg = received.get(seq);
                Assertions.assertThat(msg.getBrokerName()).isEqualTo("__logical_queue_broker__");
                Assertions.assertThat(msg.getQueueId()).isEqualTo(subscribeQueue.getQueueId());
                final String expectedBody = String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, subscribeQueue.getQueueId(), seq);
                Assertions.assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(expectedBody);
                if (seq == 0) {
                    firstOffset = msg.getQueueOffset();
                } else {
                    Assertions.assertThat(msg.getQueueOffset()).isEqualTo(firstOffset + seq);
                }
            }
        }
    }

    /**
     * Migrates logical queue 1 to the other broker while it is still empty, then verifies:
     * <ol>
     *   <li>the route of queue 1 gains one entry: the previous entry becomes non-writable,
     *       non-readable and expired, and the new entry on the target broker is writable and
     *       readable; all other logical queues keep their single route entry;</li>
     *   <li>sync and async sends to queue 1 report the target broker and the expected original
     *       queue id in their {@link SendResultForLogicalQueue};</li>
     *   <li>a pull from offset 0 returns all messages in send order with offsets starting at 0,
     *       and a follow-up async pull reports {@link PullStatus#NO_NEW_MSG}.</li>
     * </ol>
     *
     * @throws Exception if a command, send, pull or admin query fails
     */
    @Test
    public void test003_MigrateOnceWithoutData() throws Exception {
        final String methodName = getCurrentMethodName();
        final int logicalQueueIdx = 1;
        // MSG_SENT_TIMES sync sends followed by MSG_SENT_TIMES async sends.
        final int totalSent = 2 * MSG_SENT_TIMES;
        final long expectedMaxOffset = totalSent;

        final List<LogicalQueueRouteData> routesBefore =
            mqAdminExt.examineTopicRouteInfo(topic).getLogicalQueuesInfo().get(logicalQueueIdx);
        final LogicalQueueRouteData lastRouteBefore = routesBefore.get(routesBefore.size() - 1);
        // Migrate to whichever broker does not currently host the queue.
        final String targetBrokerName = lastRouteBefore.getBrokerName().equals(broker1Name) ? broker2Name : broker1Name;
        final MessageQueue messageQueue = new MessageQueue(topic, "__logical_queue_broker__", logicalQueueIdx);

        new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, logicalQueueIdx, targetBrokerName, (Long) null);

        // ---- route data after migration ----
        final TopicRouteData routeAfter = mqAdminExt.examineTopicRouteInfo(topic);
        Assertions.assertThat(routeAfter.getLogicalQueuesInfo()).isNotNull();
        for (Map.Entry<Integer, List<LogicalQueueRouteData>> entry : routeAfter.getLogicalQueuesInfo().entrySet()) {
            final List<LogicalQueueRouteData> routes = entry.getValue();
            if (entry.getKey().intValue() == logicalQueueIdx) {
                Assertions.assertThat(routes).hasSize(routesBefore.size() + 1);

                final LogicalQueueRouteData migratedFrom = routes.get(routes.size() - 2);
                Assertions.assertThat(migratedFrom.getMessageQueue()).isEqualTo(lastRouteBefore.getMessageQueue());
                Assertions.assertThat(migratedFrom.getOffsetMax()).isGreaterThanOrEqualTo(0L);
                Assertions.assertThat(migratedFrom.getMessagesCount()).isEqualTo(0L);
                Assertions.assertThat(migratedFrom.isWritable()).isFalse();
                Assertions.assertThat(migratedFrom.isReadable()).isFalse();
                Assertions.assertThat(migratedFrom.isExpired()).isTrue();
                Assertions.assertThat(migratedFrom.getLogicalQueueDelta()).isEqualTo(0L);

                final LogicalQueueRouteData migratedTo = routes.get(routes.size() - 1);
                Assertions.assertThat(migratedTo.getBrokerName()).isEqualTo(targetBrokerName);
                Assertions.assertThat(migratedTo.getOffsetMax()).isLessThan(0L);
                Assertions.assertThat(migratedTo.isWritable()).isTrue();
                Assertions.assertThat(migratedTo.isReadable()).isTrue();
                Assertions.assertThat(migratedTo.isExpired()).isFalse();
                Assertions.assertThat(migratedTo.getLogicalQueueDelta()).isEqualTo(0L);
            } else {
                Assertions.assertThat(routes).hasSize(1);
                final LogicalQueueRouteData untouched = routes.get(0);
                Assertions.assertThat(untouched.getOffsetMax()).isLessThan(0L);
                Assertions.assertThat(untouched.isWritable()).isTrue();
                Assertions.assertThat(untouched.isReadable()).isTrue();
                Assertions.assertThat(untouched.isExpired()).isFalse();
                Assertions.assertThat(untouched.getLogicalQueueDelta()).isEqualTo(0L);
            }
        }

        final List<MessageQueue> subscribeQueues = consumer.fetchSubscribeMessageQueues(topic).stream()
            .sorted()
            .collect(Collectors.toList());
        Assertions.assertThat(subscribeQueues).hasSize(brokerNum * QUEUE_NUMBERS);
        for (MessageQueue subscribeQueue : subscribeQueues) {
            Assertions.assertThat(mqAdminExt.minOffset(subscribeQueue)).isEqualTo(0L);
        }

        // ---- sync sends to the migrated queue ----
        for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
            final String payload = String.format(Locale.ENGLISH, "%s-sync-%d-%d", methodName, messageQueue.getQueueId(), seq);
            final SendResult sendResult = producer.send(new Message(topic, payload.getBytes(StandardCharsets.UTF_8)), messageQueue);
            Assertions.assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
            Assertions.assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
            // Explicit downcast: send(...) is declared to return SendResult, but the routing
            // details asserted below are only exposed by SendResultForLogicalQueue.
            final SendResultForLogicalQueue logicalResult = (SendResultForLogicalQueue) sendResult;
            Assertions.assertThat(logicalResult.getOrigBrokerName()).isEqualTo(targetBrokerName);
            Assertions.assertThat(logicalResult.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
        }

        // ---- async sends to the migrated queue ----
        for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
            final String payload = String.format(Locale.ENGLISH, "%s-async-%d-%d", methodName, messageQueue.getQueueId(), seq);
            final CompletableFuture<SendResult> sendFuture = new CompletableFuture<>();
            producer.send(new Message(topic, payload.getBytes(StandardCharsets.UTF_8)), messageQueue, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    sendFuture.complete(sendResult);
                }

                @Override
                public void onException(Throwable th) {
                    sendFuture.completeExceptionally(th);
                }
            });
            final SendResult sendResult = sendFuture.get();
            Assertions.assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
            Assertions.assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
            final SendResultForLogicalQueue logicalResult = (SendResultForLogicalQueue) sendResult;
            Assertions.assertThat(logicalResult.getOrigBrokerName()).isEqualTo(targetBrokerName);
            Assertions.assertThat(logicalResult.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
        }

        Assertions.assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(expectedMaxOffset);
        // Wait until the admin-visible max offset reaches the expected value before pulling.
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS)
            .until(() -> mqAdminExt.maxOffset(messageQueue) == expectedMaxOffset);

        // ---- sync pull: everything sent above, in send order ----
        final PullResult syncPull = consumer.pull(messageQueue, "*", 0L, totalSent);
        Assertions.assertThat(syncPull.getPullStatus()).isEqualTo(PullStatus.FOUND);
        Assertions.assertThat(syncPull.getMinOffset()).isEqualTo(0L);
        Assertions.assertThat(syncPull.getMaxOffset()).isEqualTo(expectedMaxOffset);
        Assertions.assertThat(syncPull.getNextBeginOffset()).isEqualTo(expectedMaxOffset);
        Assertions.assertThat(syncPull.getMsgFoundList()).hasSize(totalSent);
        final Iterator<MessageExt> received = syncPull.getMsgFoundList().iterator();
        long expectedOffset = 0;
        for (String sendMode : new String[]{"sync", "async"}) {
            for (int seq = 0; seq < MSG_SENT_TIMES; seq++) {
                final MessageExt msg = received.next();
                Assertions.assertThat(msg.getBrokerName()).isEqualTo("__logical_queue_broker__");
                Assertions.assertThat(msg.getQueueId()).isEqualTo(messageQueue.getQueueId());
                final String expectedBody = String.format(Locale.ENGLISH, "%s-%s-%d-%d", methodName, sendMode, messageQueue.getQueueId(), seq);
                Assertions.assertThat(new String(msg.getBody(), StandardCharsets.UTF_8)).isEqualTo(expectedBody);
                Assertions.assertThat(msg.getQueueOffset()).isEqualTo(expectedOffset);
                expectedOffset++;
            }
        }

        // ---- async pull past the end: nothing new ----
        final long nextBeginOffset = syncPull.getNextBeginOffset();
        final CompletableFuture<PullResult> pullFuture = new CompletableFuture<>();
        consumer.pull(messageQueue, "*", nextBeginOffset, 10, new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                pullFuture.complete(pullResult);
            }

            @Override
            public void onException(Throwable th) {
                pullFuture.completeExceptionally(th);
            }
        });
        final PullResult asyncPull = pullFuture.get();
        Assertions.assertThat(asyncPull.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
        Assertions.assertThat(asyncPull.getMinOffset()).isEqualTo(0L);
        Assertions.assertThat(asyncPull.getMaxOffset()).isEqualTo(expectedMaxOffset);
        Assertions.assertThat(asyncPull.getNextBeginOffset()).isEqualTo(expectedMaxOffset);
        Assertions.assertThat(asyncPull.getMsgFoundList()).isNull();
    }

    @Test
    public void test004_MigrateOnceWithData() throws Exception {
        String currentMethodName = getCurrentMethodName();
        List list = (List) mqAdminExt.examineTopicRouteInfo(topic).getLogicalQueuesInfo().get(1);
        LogicalQueueRouteData logicalQueueRouteData = (LogicalQueueRouteData) list.get(list.size() - 1);
        String str = logicalQueueRouteData.getBrokerName().equals(broker1Name) ? broker2Name : broker1Name;
        MessageQueue messageQueue = new MessageQueue(topic, "__logical_queue_broker__", 1);
        for (int i = 0; i < MSG_SENT_TIMES; i++) {
            SendResult send = producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-sync-%d-%d", currentMethodName, Integer.valueOf(messageQueue.getQueueId()), Integer.valueOf(i)).getBytes(StandardCharsets.UTF_8)), messageQueue);
            Assertions.assertThat(send.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
            Assertions.assertThat(send.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
        }
        Assertions.assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(3L);
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(mqAdminExt.maxOffset(messageQueue) == 3);
        });
        long j = 0;
        PullResult pull = consumer.pull(messageQueue, "*", 0L, 6);
        Assertions.assertThat(pull.getPullStatus()).isEqualTo(PullStatus.FOUND);
        Assertions.assertThat(pull.getMinOffset()).isEqualTo(0L);
        Assertions.assertThat(pull.getMaxOffset()).isEqualTo(3L);
        Assertions.assertThat(pull.getNextBeginOffset()).isEqualTo(3L);
        Assertions.assertThat(pull.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
        Iterator it = pull.getMsgFoundList().iterator();
        for (int i2 = 0; i2 < MSG_SENT_TIMES; i2++) {
            MessageExt messageExt = (MessageExt) it.next();
            Assertions.assertThat(messageExt.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(messageExt.getQueueId()).isEqualTo(messageQueue.getQueueId());
            Assertions.assertThat(new String(messageExt.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", currentMethodName, Integer.valueOf(messageQueue.getQueueId()), Integer.valueOf(i2)));
            Assertions.assertThat(messageExt.getQueueOffset()).isEqualTo(j);
            j++;
        }
        new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, 1, str, (Long) null);
        TopicRouteData examineTopicRouteInfo = mqAdminExt.examineTopicRouteInfo(topic);
        Assertions.assertThat(examineTopicRouteInfo.getLogicalQueuesInfo()).isNotNull();
        for (Map.Entry entry : examineTopicRouteInfo.getLogicalQueuesInfo().entrySet()) {
            List list2 = (List) entry.getValue();
            if (((Integer) entry.getKey()).intValue() == 1) {
                Assertions.assertThat(list2).hasSize(list.size() + 1);
                LogicalQueueRouteData logicalQueueRouteData2 = (LogicalQueueRouteData) list2.get(list2.size() - 2);
                Assertions.assertThat(logicalQueueRouteData2.getMessageQueue()).isEqualTo(logicalQueueRouteData.getMessageQueue());
                Assertions.assertThat(logicalQueueRouteData2.getOffsetMax()).isGreaterThanOrEqualTo(0L);
                Assertions.assertThat(logicalQueueRouteData2.getMessagesCount()).isEqualTo(3L);
                Assertions.assertThat(logicalQueueRouteData2.isWritable()).isFalse();
                Assertions.assertThat(logicalQueueRouteData2.isReadable()).isTrue();
                Assertions.assertThat(logicalQueueRouteData2.isExpired()).isFalse();
                Assertions.assertThat(logicalQueueRouteData2.getLogicalQueueDelta()).isEqualTo(0L);
                LogicalQueueRouteData logicalQueueRouteData3 = (LogicalQueueRouteData) list2.get(list2.size() - 1);
                Assertions.assertThat(logicalQueueRouteData3.getBrokerName()).isEqualTo(str);
                Assertions.assertThat(logicalQueueRouteData3.getOffsetMax()).isLessThan(0L);
                Assertions.assertThat(logicalQueueRouteData3.isWritable()).isTrue();
                Assertions.assertThat(logicalQueueRouteData3.isReadable()).isTrue();
                Assertions.assertThat(logicalQueueRouteData3.isExpired()).isFalse();
                Assertions.assertThat(logicalQueueRouteData3.getLogicalQueueDelta()).isEqualTo(3L);
            } else {
                Assertions.assertThat(list2).hasSize(1);
                LogicalQueueRouteData logicalQueueRouteData4 = (LogicalQueueRouteData) list2.get(0);
                Assertions.assertThat(logicalQueueRouteData4.getOffsetMax()).isLessThan(0L);
                Assertions.assertThat(logicalQueueRouteData4.isWritable()).isTrue();
                Assertions.assertThat(logicalQueueRouteData4.isReadable()).isTrue();
                Assertions.assertThat(logicalQueueRouteData4.isExpired()).isFalse();
                Assertions.assertThat(logicalQueueRouteData4.getLogicalQueueDelta()).isEqualTo(0L);
            }
        }
        Assertions.assertThat(messageQueue).isNotNull();
        List list3 = (List) consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
        Assertions.assertThat(list3).hasSize(brokerNum * QUEUE_NUMBERS);
        Iterator it2 = list3.iterator();
        while (it2.hasNext()) {
            Assertions.assertThat(mqAdminExt.minOffset((MessageQueue) it2.next())).isEqualTo(0L);
        }
        for (int i3 = 0; i3 < MSG_SENT_TIMES; i3++) {
            final CompletableFuture completableFuture = new CompletableFuture();
            producer.send(new Message(topic, String.format(Locale.ENGLISH, "%s-async-%d-%d", currentMethodName, Integer.valueOf(messageQueue.getQueueId()), Integer.valueOf(i3)).getBytes(StandardCharsets.UTF_8)), messageQueue, new SendCallback() { // from class: org.apache.rocketmq.test.smoke.LogicalQueueIT.5
                public void onSuccess(SendResult sendResult) {
                    completableFuture.complete(sendResult);
                }

                public void onException(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }
            });
            SendResultForLogicalQueue sendResultForLogicalQueue = (SendResult) completableFuture.get();
            Assertions.assertThat(sendResultForLogicalQueue.getMessageQueue().getBrokerName()).isEqualTo(messageQueue.getBrokerName());
            Assertions.assertThat(sendResultForLogicalQueue.getMessageQueue().getQueueId()).isEqualTo(messageQueue.getQueueId());
            SendResultForLogicalQueue sendResultForLogicalQueue2 = sendResultForLogicalQueue;
            Assertions.assertThat(sendResultForLogicalQueue2.getOrigBrokerName()).isEqualTo(str);
            Assertions.assertThat(sendResultForLogicalQueue2.getOrigQueueId()).isEqualTo(QUEUE_NUMBERS);
        }
        Assertions.assertThat(maxOffsetUncommitted(messageQueue)).isEqualTo(6L);
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(mqAdminExt.maxOffset(messageQueue) == 6);
        });
        long j2 = 0;
        PullResult pull2 = consumer.pull(messageQueue, "*", 0L, 6);
        Assertions.assertThat(pull2.getPullStatus()).isEqualTo(PullStatus.FOUND);
        Assertions.assertThat(pull2.getMinOffset()).isEqualTo(0L);
        Assertions.assertThat(pull2.getMaxOffset()).isEqualTo(3L);
        Assertions.assertThat(pull2.getNextBeginOffset()).isEqualTo(3L);
        Assertions.assertThat(pull2.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
        Iterator it3 = pull2.getMsgFoundList().iterator();
        for (int i4 = 0; i4 < MSG_SENT_TIMES; i4++) {
            MessageExt messageExt2 = (MessageExt) it3.next();
            Assertions.assertThat(messageExt2.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(messageExt2.getQueueId()).isEqualTo(messageQueue.getQueueId());
            Assertions.assertThat(new String(messageExt2.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-sync-%d-%d", currentMethodName, Integer.valueOf(messageQueue.getQueueId()), Integer.valueOf(i4)));
            Assertions.assertThat(messageExt2.getQueueOffset()).isEqualTo(j2);
            j2++;
        }
        long nextBeginOffset = pull2.getNextBeginOffset();
        final CompletableFuture completableFuture2 = new CompletableFuture();
        consumer.pull(messageQueue, "*", nextBeginOffset, 6, new PullCallback() { // from class: org.apache.rocketmq.test.smoke.LogicalQueueIT.6
            public void onSuccess(PullResult pullResult) {
                completableFuture2.complete(pullResult);
            }

            public void onException(Throwable th) {
                completableFuture2.completeExceptionally(th);
            }
        });
        PullResult pullResult = (PullResult) completableFuture2.get();
        Assertions.assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
        Assertions.assertThat(pullResult.getMinOffset()).isEqualTo(3L);
        Assertions.assertThat(pullResult.getMaxOffset()).isEqualTo(6L);
        Assertions.assertThat(pullResult.getNextBeginOffset()).isEqualTo(6L);
        Assertions.assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
        Iterator it4 = pullResult.getMsgFoundList().iterator();
        for (int i5 = 0; i5 < MSG_SENT_TIMES; i5++) {
            MessageExt messageExt3 = (MessageExt) it4.next();
            Assertions.assertThat(messageExt3.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(messageExt3.getQueueId()).isEqualTo(messageQueue.getQueueId());
            Assertions.assertThat(new String(messageExt3.getBody(), StandardCharsets.UTF_8)).isEqualTo(String.format(Locale.ENGLISH, "%s-async-%d-%d", currentMethodName, Integer.valueOf(messageQueue.getQueueId()), Integer.valueOf(i5)));
            Assertions.assertThat(messageExt3.getQueueOffset()).isEqualTo(nextBeginOffset);
            nextBeginOffset++;
        }
        PullResult pull3 = consumer.pull(messageQueue, "*", pullResult.getNextBeginOffset(), 10);
        Assertions.assertThat(pull3.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
        Assertions.assertThat(pull3.getMinOffset()).isEqualTo(3L);
        Assertions.assertThat(pull3.getMaxOffset()).isEqualTo(6L);
        Assertions.assertThat(pull3.getNextBeginOffset()).isEqualTo(6L);
        Assertions.assertThat(pull3.getMsgFoundList()).isNull();
    }

    /* JADX WARN: Code restructure failed: missing block: B:100:0x08bc, code lost:
    
        r37 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:103:0x06e1, code lost:
    
        r35 = r35 + r0.getMsgFoundList().size();
        r40 = true;
        r0 = r0.getMsgFoundList().iterator();
     */
    /* JADX WARN: Code restructure failed: missing block: B:105:0x0706, code lost:
    
        if (r0.hasNext() == false) goto L137;
     */
    /* JADX WARN: Code restructure failed: missing block: B:106:0x0709, code lost:
    
        r0 = (org.apache.rocketmq.common.message.MessageExt) r0.next();
        org.assertj.core.api.Assertions.assertThat(new java.lang.String(r0.getBody(), java.nio.charset.StandardCharsets.UTF_8)).as("offset=%d", new java.lang.Object[]{java.lang.Long.valueOf(r37)}).isEqualTo(java.lang.String.format(java.util.Locale.ENGLISH, "%s-%d-%d", r0, 1, r0.poll()));
     */
    /* JADX WARN: Code restructure failed: missing block: B:107:0x0763, code lost:
    
        if (r40 == false) goto L40;
     */
    /* JADX WARN: Code restructure failed: missing block: B:108:0x0766, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getQueueOffset()).isGreaterThanOrEqualTo(r37);
        r40 = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:110:0x0788, code lost:
    
        r37 = r0.getQueueOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:111:0x077a, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getQueueOffset()).isGreaterThan(r37);
     */
    /* JADX WARN: Code restructure failed: missing block: B:114:0x0792, code lost:
    
        r37 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:118:0x079c, code lost:
    
        org.junit.Assert.fail(java.lang.String.format(java.util.Locale.ENGLISH, "unexpected pull offset=%d status: %s", java.lang.Long.valueOf(r37), r0));
     */
    /* JADX WARN: Code restructure failed: missing block: B:121:0x06d7, code lost:
    
        r37 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x0484, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r35).isGreaterThanOrEqualTo(9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x0540, code lost:
    
        org.awaitility.Awaitility.waitAtMost(5, java.util.concurrent.TimeUnit.SECONDS).until(() -> { // java.util.concurrent.Callable.call():java.lang.Object
            return r1.lambda$test005_MigrateWithDataBackAndForth$7(r2);
        });
        org.awaitility.Awaitility.waitAtMost(5, java.util.concurrent.TimeUnit.SECONDS).until(() -> { // java.util.concurrent.Callable.call():java.lang.Object
            return r1.lambda$test005_MigrateWithDataBackAndForth$8(r2);
        });
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.brokerControllerMap.get(r0.getBrokerName());
        rotateBrokerCommitLog(r0);
        deleteCommitLogFiles(r0, 1);
        r0 = (java.util.List) org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.examineTopicRouteInfo(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic).getLogicalQueuesInfo().get(1);
        org.assertj.core.api.Assertions.assertThat(r0).hasSize(2);
        org.assertj.core.api.Assertions.assertThat((java.lang.Comparable) r0.get(0)).isEqualToIgnoringGivenFields(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 0, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r28, 1), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.ReadOnly, 0, 3, -1, -1, r30), new java.lang.String[]{"firstMsgTimeMillis", "lastMsgTimeMillis"});
        org.assertj.core.api.Assertions.assertThat((java.lang.Comparable) r0.get(1)).isEqualToComparingFieldByField(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 6, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r28, 1), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Normal, 3, -1, -1, -1, r30));
        r35 = 0;
        r0 = new java.util.LinkedList();
        r0.addAll((java.util.Collection) java.util.stream.IntStream.range(0, org.apache.rocketmq.test.smoke.LogicalQueueIT.MSG_SENT_TIMES).boxed().collect(java.util.stream.Collectors.toList()));
        r0.addAll((java.util.Collection) java.util.stream.IntStream.range(6, 9).boxed().collect(java.util.stream.Collectors.toList()));
        r37 = org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.minOffset(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:25:0x067e, code lost:
    
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.consumer.pull(r0, "*", r37, 9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x0699, code lost:
    
        switch(org.apache.rocketmq.test.smoke.LogicalQueueIT.AnonymousClass7.$SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[r0.getPullStatus().ordinal()]) {
            case 1: goto L100;
            case 2: goto L103;
            case 3: goto L101;
            default: goto L102;
        };
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x06b4, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r35).as("offset=%d", new java.lang.Object[]{java.lang.Long.valueOf(r37)}).isEqualTo(6);
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x07bc, code lost:
    
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.brokerControllerMap.get(r0.getBrokerName());
        rotateBrokerCommitLog(r0);
        deleteCommitLogFiles(r0, 2);
        org.assertj.core.api.Assertions.assertThat((java.util.List) org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.examineTopicRouteInfo(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic).getLogicalQueuesInfo().get(1)).isEqualTo(java.util.Collections.singletonList(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 6, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r28, 1), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Normal, 3, -1, -1, -1, r30)));
        r35 = 0;
        r0 = new java.util.LinkedList();
        r0.addAll((java.util.Collection) java.util.stream.IntStream.range(6, 9).boxed().collect(java.util.stream.Collectors.toList()));
        r37 = org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.minOffset(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x085e, code lost:
    
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.consumer.pull(r0, "*", r37, 9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x0879, code lost:
    
        switch(org.apache.rocketmq.test.smoke.LogicalQueueIT.AnonymousClass7.$SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[r0.getPullStatus().ordinal()]) {
            case 1: goto L108;
            case 2: goto L111;
            case 3: goto L109;
            default: goto L110;
        };
     */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x0897, code lost:
    
        if (r35 == org.apache.rocketmq.test.smoke.LogicalQueueIT.MSG_SENT_TIMES) goto L63;
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x089a, code lost:
    
        org.junit.Assert.fail(java.lang.String.format(java.util.Locale.ENGLISH, "want %d msg but got %d", java.lang.Integer.valueOf(org.apache.rocketmq.test.smoke.LogicalQueueIT.MSG_SENT_TIMES), java.lang.Integer.valueOf(r35)));
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x09a1, code lost:
    
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.brokerControllerMap.get(r28);
        rotateBrokerCommitLog(r0);
        deleteCommitLogFiles(r0, 1);
        r35 = org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.minOffset(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x09c4, code lost:
    
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.consumer.pull(r0, "*", r35, 9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x09df, code lost:
    
        switch(org.apache.rocketmq.test.smoke.LogicalQueueIT.AnonymousClass7.$SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[r0.getPullStatus().ordinal()]) {
            case 1: goto L116;
            case 2: goto L118;
            case 3: goto L117;
            case 4: goto L116;
            default: goto L117;
        };
     */
    /* JADX WARN: Code restructure failed: missing block: B:40:0x09fc, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getNextBeginOffset()).isEqualTo(9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x0a38, code lost:
    
        org.assertj.core.api.Assertions.assertThat((java.util.List) org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.queryTopicLogicalQueueMapping(r0.getBrokerAddr(), org.apache.rocketmq.test.smoke.LogicalQueueIT.topic).get(1)).isEqualTo(java.util.Collections.singletonList(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 6, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r28, 1), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Normal, 3, -1, -1, -1, r30)));
        new org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand().execute(org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt, org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, 1, r29, (java.lang.Long) null);
        org.assertj.core.api.Assertions.assertThat((java.util.List) org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.examineTopicRouteInfo(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic).getLogicalQueuesInfo().get(1)).isEqualTo(java.util.Arrays.asList(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 6, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r28, 1), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Expired, 3, 6, 0, 0, r30), new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 9, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r29, org.apache.rocketmq.test.smoke.LogicalQueueIT.QUEUE_NUMBERS), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Normal, 3, -1, -1, -1, r31)));
        org.assertj.core.api.Assertions.assertThat((java.util.List) org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.queryTopicLogicalQueueMapping(r30, org.apache.rocketmq.test.smoke.LogicalQueueIT.topic).get(1)).isEqualTo(java.util.Arrays.asList(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 6, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r28, 1), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Expired, 3, 6, 0, 0, r30), new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 9, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r29, org.apache.rocketmq.test.smoke.LogicalQueueIT.QUEUE_NUMBERS), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Normal, 3, -1, -1, -1, r31)));
        org.assertj.core.api.Assertions.assertThat((java.util.List) org.apache.rocketmq.test.smoke.LogicalQueueIT.mqAdminExt.queryTopicLogicalQueueMapping(r31, org.apache.rocketmq.test.smoke.LogicalQueueIT.topic).get(1)).isEqualTo(java.util.Collections.singletonList(new org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData(1, 9, new org.apache.rocketmq.common.message.MessageQueue(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, r29, org.apache.rocketmq.test.smoke.LogicalQueueIT.QUEUE_NUMBERS), org.apache.rocketmq.common.protocol.route.MessageQueueRouteState.Normal, 3, -1, -1, -1, r31)));
        r32 = 9;
        r35 = 0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:43:0x0bfb, code lost:
    
        if (r35 >= org.apache.rocketmq.test.smoke.LogicalQueueIT.MSG_SENT_TIMES) goto L122;
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x0bfe, code lost:
    
        r9 = r32;
        r32 = r32 + 1;
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.producer.send(new org.apache.rocketmq.common.message.Message(org.apache.rocketmq.test.smoke.LogicalQueueIT.topic, java.lang.String.format(java.util.Locale.ENGLISH, "%s-%d-%d", r0, 1, java.lang.Integer.valueOf(r9)).getBytes(java.nio.charset.StandardCharsets.UTF_8)), r0);
        org.assertj.core.api.Assertions.assertThat(r0.getOrigBrokerName()).isEqualTo(r29);
        org.assertj.core.api.Assertions.assertThat(r0.getOrigQueueId()).isEqualTo(org.apache.rocketmq.test.smoke.LogicalQueueIT.QUEUE_NUMBERS);
        r35 = r35 + 1;
     */
    /* JADX WARN: Code restructure failed: missing block: B:46:0x0c62, code lost:
    
        r35 = 0;
        r0 = new java.util.LinkedList();
        r0.addAll((java.util.Collection) java.util.stream.IntStream.range(9, 12).boxed().collect(java.util.stream.Collectors.toList()));
        r37 = 0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x0c90, code lost:
    
        r0 = org.apache.rocketmq.test.smoke.LogicalQueueIT.consumer.pull(r0, "*", r37, 9);
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x0cab, code lost:
    
        switch(org.apache.rocketmq.test.smoke.LogicalQueueIT.AnonymousClass7.$SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[r0.getPullStatus().ordinal()]) {
            case 1: goto L123;
            case 2: goto L126;
            case 3: goto L124;
            default: goto L125;
        };
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x0cc4, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r35).as("msgCount with offset=%d", new java.lang.Object[]{java.lang.Long.valueOf(r37)}).isEqualTo(org.apache.rocketmq.test.smoke.LogicalQueueIT.MSG_SENT_TIMES);
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x0dda, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x0cff, code lost:
    
        r35 = r35 + r0.getMsgFoundList().size();
        r40 = true;
        r0 = r0.getMsgFoundList().iterator();
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x0d24, code lost:
    
        if (r0.hasNext() == false) goto L131;
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x0d27, code lost:
    
        r0 = (org.apache.rocketmq.common.message.MessageExt) r0.next();
        org.assertj.core.api.Assertions.assertThat(new java.lang.String(r0.getBody(), java.nio.charset.StandardCharsets.UTF_8)).as("offset=%d", new java.lang.Object[]{java.lang.Long.valueOf(r37)}).isEqualTo(java.lang.String.format(java.util.Locale.ENGLISH, "%s-%d-%d", r0, 1, r0.poll()));
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:0x0d81, code lost:
    
        if (r40 == false) goto L85;
     */
    /* JADX WARN: Code restructure failed: missing block: B:59:0x0d84, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getQueueOffset()).isGreaterThanOrEqualTo(r37);
        r40 = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:61:0x0da6, code lost:
    
        r37 = r0.getQueueOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x0d98, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getQueueOffset()).isGreaterThan(r37);
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x0db0, code lost:
    
        r37 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:69:0x0dba, code lost:
    
        org.junit.Assert.fail(java.lang.String.format(java.util.Locale.ENGLISH, "unexpected pull offset=%d status: %s", java.lang.Long.valueOf(r37), r0));
     */
    /* JADX WARN: Code restructure failed: missing block: B:72:0x0ce6, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getNextBeginOffset()).isNotEqualTo(Long.MIN_VALUE);
        r37 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:75:0x0a18, code lost:
    
        org.junit.Assert.fail(java.lang.String.format(java.util.Locale.ENGLISH, "unexpected pull offset=%d status: %s", java.lang.Long.valueOf(r35), r0));
     */
    /* JADX WARN: Code restructure failed: missing block: B:79:0x0a0e, code lost:
    
        r35 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:82:0x08c6, code lost:
    
        r35 = r35 + r0.getMsgFoundList().size();
        r40 = true;
        r0 = r0.getMsgFoundList().iterator();
     */
    /* JADX WARN: Code restructure failed: missing block: B:84:0x08eb, code lost:
    
        if (r0.hasNext() == false) goto L134;
     */
    /* JADX WARN: Code restructure failed: missing block: B:85:0x08ee, code lost:
    
        r0 = (org.apache.rocketmq.common.message.MessageExt) r0.next();
        org.assertj.core.api.Assertions.assertThat(new java.lang.String(r0.getBody(), java.nio.charset.StandardCharsets.UTF_8)).as("offset=%d", new java.lang.Object[]{java.lang.Long.valueOf(r37)}).isEqualTo(java.lang.String.format(java.util.Locale.ENGLISH, "%s-%d-%d", r0, 1, r0.poll()));
     */
    /* JADX WARN: Code restructure failed: missing block: B:86:0x0948, code lost:
    
        if (r40 == false) goto L58;
     */
    /* JADX WARN: Code restructure failed: missing block: B:87:0x094b, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getQueueOffset()).isGreaterThanOrEqualTo(r37);
        r40 = false;
     */
    /* JADX WARN: Code restructure failed: missing block: B:89:0x096d, code lost:
    
        r37 = r0.getQueueOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:90:0x095f, code lost:
    
        org.assertj.core.api.Assertions.assertThat(r0.getQueueOffset()).isGreaterThan(r37);
     */
    /* JADX WARN: Code restructure failed: missing block: B:93:0x0977, code lost:
    
        r37 = r0.getNextBeginOffset();
     */
    /* JADX WARN: Code restructure failed: missing block: B:97:0x0981, code lost:
    
        org.junit.Assert.fail(java.lang.String.format(java.util.Locale.ENGLISH, "unexpected pull offset=%d status: %s", java.lang.Long.valueOf(r37), r0));
     */
    /**
     * Placeholder body for {@code test005_MigrateWithDataBackAndForth}.
     *
     * <p>The decompiler could not reconstruct this method (see the notes kept inside the method), so the body
     * unconditionally throws {@link UnsupportedOperationException}; as written, this test can never pass.
     * The {@code JADX WARN} comments preceding this method hold the only recovered fragments of the original
     * logic and would have to be reassembled by hand to restore the test.
     *
     * @throws java.lang.Exception declared by the signature; the placeholder body itself only ever throws
     *                             {@link UnsupportedOperationException}
     */
    @org.junit.Test
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void test005_MigrateWithDataBackAndForth() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 3547
            To view this dump add '--comments-level debug' option
        */
        // Decompiler placeholder: always thrown, no test logic is executed.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.test.smoke.LogicalQueueIT.test005_MigrateWithDataBackAndForth():void");
    }

    /**
     * Checks that a topic stays usable while its number of logical queues is increased and then decreased.
     *
     * <p>Scenario, as asserted step by step below:
     * <ol>
     *   <li>The topic starts with {@code brokerNum * QUEUE_NUMBERS} logical queues.</li>
     *   <li>The logical queue number is raised by one; the new queue must have exactly one route entry in
     *       state {@code Normal} with delta 0. {@code MSG_SENT_TIMES} messages are then sent to and pulled
     *       back from every logical queue, and every logical queue id and every backing
     *       (broker, queue id) pair must be seen exactly once.</li>
     *   <li>Broker 2 gets one more physical read/write queue and the logical queue number is raised again;
     *       the newest logical queue must be backed by that new physical queue on {@code broker2Name}.</li>
     *   <li>The logical queue number is lowered by two. Writing to the logical queue added in step 2 must be
     *       rejected, while its existing messages stay readable until the commit log files are deleted.</li>
     * </ol>
     *
     * @throws Exception if any client, admin or broker call fails
     */
    @Test
    public void test006_LogicalQueueNumChanged() throws Exception {
        final String methodName = getCurrentMethodName();
        final int initialQueueNum = brokerNum * QUEUE_NUMBERS;
        Assertions.assertThat(producer.fetchPublishMessageQueues(topic)).hasSize(initialQueueNum);
        List<MessageQueue> subscribeQueues =
            consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
        Assertions.assertThat(subscribeQueues).hasSize(initialQueueNum);

        // Step 2: grow by one logical queue and inspect the route of the newly added queue.
        final int grownQueueNum = initialQueueNum + 1;
        new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, grownQueueNum);
        final int addedQueueIdx = grownQueueNum - 1;
        final MessageQueue addedLogicalQueue = new MessageQueue(topic, "__logical_queue_broker__", addedQueueIdx);
        LogicalQueuesInfo routeInfo = mqAdminExt.examineTopicRouteInfo(topic).getLogicalQueuesInfo();
        Assertions.assertThat(routeInfo).isNotNull();
        List<LogicalQueueRouteData> addedQueueRoutes = routeInfo.get(addedQueueIdx);
        Assertions.assertThat(addedQueueRoutes).isNotNull();
        Assertions.assertThat(addedQueueRoutes).hasSize(1);
        LogicalQueueRouteData addedQueueRoute = addedQueueRoutes.get(0);
        final String addedQueueBrokerName = addedQueueRoute.getBrokerName();
        Assertions.assertThat(addedQueueRoute.getState()).isEqualTo(MessageQueueRouteState.Normal);
        Assertions.assertThat(addedQueueRoute.getLogicalQueueDelta()).isEqualTo(0L);
        Assertions.assertThat(addedQueueRoute.getLogicalQueueIndex()).isEqualTo(addedQueueIdx);

        List<MessageQueue> publishQueues = producer.fetchPublishMessageQueues(topic);
        Assertions.assertThat(publishQueues).hasSize(grownQueueNum);
        // Logical queue ids that still have to show up while iterating the queues.
        Set<Integer> pendingLogicalIds = IntStream.range(0, grownQueueNum).boxed().collect(Collectors.toSet());
        // Per broker: physical queue ids that still have to be reported as the origin of a send.
        Map<String, Set<Integer>> pendingPhysicalIds = Maps.newHashMap();
        for (String name : Arrays.asList(broker1Name, broker2Name)) {
            pendingPhysicalIds.put(name, IntStream.range(0, QUEUE_NUMBERS).boxed().collect(Collectors.toSet()));
        }
        // The broker hosting the added logical queue is expected to serve one extra physical queue id.
        pendingPhysicalIds.get(addedQueueBrokerName).add(QUEUE_NUMBERS);

        for (MessageQueue mq : publishQueues) {
            Assertions.assertThat(mq.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(pendingLogicalIds.remove(mq.getQueueId())).isTrue();
            for (int i = 0; i < MSG_SENT_TIMES; i++) {
                byte[] body = String.format(Locale.ENGLISH, "%s-%d-%d", methodName, mq.getQueueId(), i)
                    .getBytes(StandardCharsets.UTF_8);
                // send(...) is declared to return SendResult; the logical-queue details need an explicit downcast.
                SendResult sendResult = producer.send(new Message(topic, body), mq);
                Assertions.assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(mq.getBrokerName());
                Assertions.assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(mq.getQueueId());
                if (i == 0) {
                    SendResultForLogicalQueue logicalResult = (SendResultForLogicalQueue) sendResult;
                    Set<Integer> brokerPendingIds = pendingPhysicalIds.get(logicalResult.getOrigBrokerName());
                    Assertions.assertThat(brokerPendingIds.remove(logicalResult.getOrigQueueId()))
                        .as("brokerName %s queueId %d", logicalResult.getOrigBrokerName(), logicalResult.getOrigQueueId())
                        .isTrue();
                }
            }
        }
        Assertions.assertThat(pendingLogicalIds).isEmpty();

        subscribeQueues = consumer.fetchSubscribeMessageQueues(topic).stream().sorted().collect(Collectors.toList());
        Assertions.assertThat(subscribeQueues).hasSize(grownQueueNum);
        subscribeQueues.sort(Comparator.comparingInt(MessageQueue::getQueueId));
        pendingLogicalIds.addAll(IntStream.range(0, grownQueueNum).boxed().collect(Collectors.toSet()));
        for (MessageQueue mq : subscribeQueues) {
            Assertions.assertThat(mq.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(pendingLogicalIds.remove(mq.getQueueId())).isTrue();
            long minOffset = mqAdminExt.minOffset(mq);
            Assertions.assertThat(minOffset).isEqualTo(0L);
            PullResult pullResult = consumer.pull(mq, "*", minOffset, 10);
            Assertions.assertThat(pullResult.getPullStatus()).isEqualTo(PullStatus.FOUND);
            Assertions.assertThat(pullResult.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
            for (int i = 0; i < MSG_SENT_TIMES; i++) {
                MessageExt msg = pullResult.getMsgFoundList().get(i);
                Assertions.assertThat(msg.getBrokerName()).isEqualTo("__logical_queue_broker__");
                Assertions.assertThat(msg.getQueueId()).isEqualTo(mq.getQueueId());
                Assertions.assertThat(new String(msg.getBody(), StandardCharsets.UTF_8))
                    .isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, mq.getQueueId(), i));
                Assertions.assertThat(msg.getQueueOffset()).isEqualTo(minOffset + i);
            }
            // Exactly MSG_SENT_TIMES messages were written to this queue above.
            Assertions.assertThat(maxOffsetUncommitted(mq)).isEqualTo(minOffset + MSG_SENT_TIMES);
        }
        Assertions.assertThat(pendingLogicalIds).isEmpty();

        // Step 3: add one physical queue on broker 2, then grow the logical queue number once more.
        final String broker2Addr = brokerController2.getBrokerAddr();
        TopicConfig topicConfig = mqAdminExt.examineTopicConfig(broker2Addr, topic);
        topicConfig.setWriteQueueNums(topicConfig.getWriteQueueNums() + 1);
        topicConfig.setReadQueueNums(topicConfig.getReadQueueNums() + 1);
        mqAdminExt.createAndUpdateTopicConfig(broker2Addr, topicConfig);
        final int regrownQueueNum = grownQueueNum + 1;
        new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, regrownQueueNum);
        final int newestQueueIdx = regrownQueueNum - 1;
        LogicalQueuesInfo regrownRouteInfo = mqAdminExt.examineTopicRouteInfo(topic).getLogicalQueuesInfo();
        Assertions.assertThat(regrownRouteInfo).isNotNull();
        List<LogicalQueueRouteData> newestQueueRoutes = regrownRouteInfo.get(newestQueueIdx);
        Assertions.assertThat(newestQueueRoutes).isNotNull();
        Assertions.assertThat(newestQueueRoutes).hasSize(1);
        LogicalQueueRouteData newestQueueRoute = newestQueueRoutes.get(0);
        Assertions.assertThat(newestQueueRoute.getState()).isEqualTo(MessageQueueRouteState.Normal);
        Assertions.assertThat(newestQueueRoute.getLogicalQueueDelta()).isEqualTo(0L);
        Assertions.assertThat(newestQueueRoute.getLogicalQueueIndex()).isEqualTo(newestQueueIdx);
        Assertions.assertThat(newestQueueRoute.getBrokerName()).isEqualTo(broker2Name);
        Assertions.assertThat(newestQueueRoute.getQueueId()).isEqualTo(topicConfig.getWriteQueueNums() - 1);

        // Step 4: shrink back to the initial size, which drops the two logical queues added above.
        new UpdateTopicLogicalQueueNumCommand().execute(mqAdminExt, clusterName, topic, regrownQueueNum - 2);
        try {
            producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), addedLogicalQueue);
            Assert.fail("write to decreased logical queue success, want it failed");
        } catch (MQBrokerException e) {
            // NOTE(review): 16 is presumably ResponseCode.NO_PERMISSION - confirm against the protocol constants.
            Assertions.assertThat(e.getResponseCode()).isEqualTo(16);
        }

        // The removed logical queue must still serve the messages written before it was removed.
        PullResult removedQueuePull = consumer.pull(addedLogicalQueue, "*", 0L, 10);
        Assertions.assertThat(removedQueuePull.getPullStatus()).isEqualTo(PullStatus.FOUND);
        Assertions.assertThat(removedQueuePull.getMsgFoundList()).hasSize(MSG_SENT_TIMES);
        for (int i = 0; i < MSG_SENT_TIMES; i++) {
            MessageExt msg = removedQueuePull.getMsgFoundList().get(i);
            Assertions.assertThat(msg.getBrokerName()).isEqualTo("__logical_queue_broker__");
            Assertions.assertThat(msg.getQueueId()).isEqualTo(addedLogicalQueue.getQueueId());
            Assertions.assertThat(new String(msg.getBody(), StandardCharsets.UTF_8))
                .isEqualTo(String.format(Locale.ENGLISH, "%s-%d-%d", methodName, addedLogicalQueue.getQueueId(), i));
            Assertions.assertThat(msg.getQueueOffset()).isEqualTo(i);
        }

        // Once the commit log files of the hosting broker are gone, nothing may be returned any more.
        BrokerController hostingBroker = brokerControllerMap.get(addedQueueBrokerName);
        rotateBrokerCommitLog(hostingBroker);
        deleteCommitLogFiles(hostingBroker, 1);
        Assertions.assertThat(consumer.pull(addedLogicalQueue, "*", 0L, 10).getPullStatus())
            .isIn(PullStatus.NO_NEW_MSG, PullStatus.NO_MATCHED_MSG);
    }

    /**
     * Checks that a logical queue becomes writable again after the broker hosting it goes down.
     *
     * <p>Scenario, as asserted step by step below:
     * <ol>
     *   <li>An extra broker is started and logical queue 1 is migrated onto it; sends to that logical queue
     *       must report the extra broker, physical queue 0, as their origin.</li>
     *   <li>The extra broker is shut down. The next send must fail with a {@code RemotingException} whose
     *       message matches {@code connect to ... failed}, and migrating the queue to {@code broker1Name}
     *       must fail with a root cause of {@code RemotingConnectException}.</li>
     *   <li>A further send must nevertheless succeed, report {@code broker1Name} as its origin and carry a
     *       queue offset of -1.</li>
     * </ol>
     *
     * @throws Exception if any client, admin or broker call fails unexpectedly
     */
    @Test
    public void test007_LogicalQueueWritableEvenBrokerDown() throws Exception {
        final String methodName = getCurrentMethodName();
        final BrokerController extraBroker = IntegrationTestBase.createAndStartBroker(nsAddr);
        final String extraBrokerName = extraBroker.getBrokerConfig().getBrokerName();
        brokerControllerMap.put(extraBrokerName, extraBroker);
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(
            () -> mqAdminExt.examineBrokerClusterInfo().getBrokerAddrTable().containsKey(extraBrokerName));

        // NOTE(review): perm 6 is presumably read|write - confirm against PermName.
        mqAdminExt.createAndUpdateTopicConfig(extraBroker.getBrokerAddr(), new TopicConfig(topic, 0, 0, 6));
        new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, 1, extraBrokerName, (Long) null);

        final MessageQueue logicalQueue = new MessageQueue(topic, "__logical_queue_broker__", 1);
        for (int i = 0; i < MSG_SENT_TIMES; i++) {
            byte[] body = String.format(Locale.ENGLISH, "%s-%d-%d", methodName, logicalQueue.getQueueId(), i)
                .getBytes(StandardCharsets.UTF_8);
            // send(...) is declared to return SendResult; the logical-queue details need an explicit downcast.
            SendResult sendResult = producer.send(new Message(topic, body), logicalQueue);
            Assertions.assertThat(sendResult.getMessageQueue().getBrokerName()).isEqualTo(logicalQueue.getBrokerName());
            Assertions.assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(logicalQueue.getQueueId());
            SendResultForLogicalQueue logicalResult = (SendResultForLogicalQueue) sendResult;
            Assertions.assertThat(logicalResult.getOrigBrokerName()).isEqualTo(extraBrokerName);
            Assertions.assertThat(logicalResult.getOrigQueueId()).isEqualTo(0);
        }

        extraBroker.shutdown();
        brokerControllerMap.remove(extraBrokerName);

        // First send after the shutdown is expected to hit the dead broker and fail.
        Assertions.assertThatThrownBy(() -> {
            logger.error("send should fail but got {}", producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), logicalQueue));
        }).isInstanceOf(RemotingException.class).hasMessageMatching("connect to [0-9.:]+ failed");
        // A regular migration cannot complete either, because the source broker is unreachable.
        Assertions.assertThatThrownBy(() -> {
            new MigrateTopicLogicalQueueCommand().execute(mqAdminExt, topic, 1, broker1Name, (Long) null);
        }).hasRootCauseInstanceOf(RemotingConnectException.class).hasMessageContaining("migrateTopicLogicalQueuePrepare");

        // The logical queue must still accept writes, now served by broker 1.
        SendResult recoveredSend = producer.send(new Message(topic, "aaa".getBytes(StandardCharsets.UTF_8)), logicalQueue);
        Assertions.assertThat(recoveredSend.getMessageQueue().getBrokerName()).isEqualTo(logicalQueue.getBrokerName());
        Assertions.assertThat(recoveredSend.getMessageQueue().getQueueId()).isEqualTo(logicalQueue.getQueueId());
        Assertions.assertThat(recoveredSend.getQueueOffset()).isEqualTo(-1L);
        SendResultForLogicalQueue recoveredLogicalResult = (SendResultForLogicalQueue) recoveredSend;
        Assertions.assertThat(recoveredLogicalResult.getOrigBrokerName()).isEqualTo(broker1Name);
        Assertions.assertThat(recoveredLogicalResult.getOrigQueueId()).isIn(QUEUE_NUMBERS, 1);
    }

    /**
     * Returns the file name of the last mapped file in the given broker's commit log.
     *
     * <p>The commit log's {@code mappedFileQueue} field is read reflectively with forced access.
     *
     * @param brokerController broker whose commit log is inspected
     * @return file name reported by the last mapped file of the queue
     * @throws IllegalAccessException if the reflective field read is denied
     */
    private static String getBrokerCommitLogFileName(BrokerController brokerController) throws IllegalAccessException {
        CommitLog commitLog = brokerController.getMessageStore().getCommitLog();
        Object queueField = FieldUtils.readDeclaredField(commitLog, "mappedFileQueue", true);
        return ((MappedFileQueue) queueField).getLastMappedFile().getFileName();
    }

    /**
     * Flushes the broker's commit log and repeatedly deletes expired files until at most {@code i}
     * mapped files remain, then runs the logical-queue clean hook and logs the total deleted count.
     *
     * @param brokerController broker whose commit log files are deleted
     * @param i                upper bound on the number of mapped files allowed to remain
     * @throws IllegalAccessException if the reflective read of {@code mappedFileQueue} is denied
     */
    private static void deleteCommitLogFiles(BrokerController brokerController, int i) throws IllegalAccessException {
        final CommitLog storeCommitLog = brokerController.getMessageStore().getCommitLog();
        storeCommitLog.flush();

        final MappedFileQueue fileQueue = (MappedFileQueue) FieldUtils.readDeclaredField(storeCommitLog, "mappedFileQueue", true);
        // Accumulates the per-attempt delete counts across polling rounds.
        final AtomicInteger deleted = new AtomicInteger();
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS).until(() -> {
            deleted.addAndGet(storeCommitLog.deleteExpiredFile(0L, 0, 5000L, true, 1));
            return fileQueue.getMappedFiles().size() <= i;
        });

        final int deletedTotal = deleted.get();
        brokerController.getTopicConfigManager().getLogicalQueueCleanHook().execute(brokerController.getMessageStore(), deletedTotal);
        logger.info("deleteCommitLogFiles {} count {}", brokerController.getBrokerConfig().getBrokerName(), deletedTotal);
    }

    /**
     * Forces the broker's commit log onto a new file by sending filler messages to
     * {@code placeholderTopic} until the last mapped file name changes.
     *
     * <p>Each polling round sends 128 copies of one 4096-character random ASCII message to queue 0 of
     * the broker, flushes the commit log and compares the current last file name with the one
     * observed before sending started. Polling gives up after 5 seconds.
     *
     * @param brokerController broker whose commit log should roll over
     * @throws IllegalAccessException if reading the commit log file name reflectively is denied
     */
    private static void rotateBrokerCommitLog(BrokerController brokerController) throws IllegalAccessException {
        final CommitLog storeCommitLog = brokerController.getMessageStore().getCommitLog();
        storeCommitLog.flush();

        final String brokerName = brokerController.getBrokerConfig().getBrokerName();
        final String initialFile = getBrokerCommitLogFileName(brokerController);
        logger.info("rotateBrokerCommitLog {} first {}", brokerName, initialFile);

        final Message filler = new Message(placeholderTopic, RandomStringUtils.randomAscii(4096).getBytes(StandardCharsets.UTF_8));
        final MessageQueue target = new MessageQueue(placeholderTopic, brokerName, 0);
        Awaitility.waitAtMost(5L, TimeUnit.SECONDS).until(() -> {
            for (int sent = 0; sent < 128; sent++) {
                producer.send(filler, target);
            }
            storeCommitLog.flush();

            String currentFile = getBrokerCommitLogFileName(brokerController);
            boolean rotated = !initialFile.equals(currentFile);
            if (rotated) {
                logger.info("rotateBrokerCommitLog {} 4K msg last {}", brokerName, currentFile);
            }
            return rotated;
        });
    }

    /**
     * Queries the max offset of a queue through the admin tool's internal implementation object.
     *
     * <p>The {@code defaultMQAdminExtImpl} field of {@code mqAdminExt} is read reflectively and its
     * {@code maxOffset} is invoked with {@code false} as the second argument.
     * NOTE(review): judging by this method's name, {@code false} selects the uncommitted offset;
     * confirm against {@code DefaultMQAdminExtImpl#maxOffset}.
     *
     * @param messageQueue queue to query
     * @return the offset returned by {@code maxOffset(messageQueue, false)}
     * @throws IllegalAccessException if the reflective field read is denied
     * @throws MQClientException      if the admin call fails
     */
    private long maxOffsetUncommitted(MessageQueue messageQueue) throws IllegalAccessException, MQClientException {
        Object adminImpl = FieldUtils.readDeclaredField(mqAdminExt, "defaultMQAdminExtImpl", true);
        return ((DefaultMQAdminExtImpl) adminImpl).maxOffset(messageQueue, false);
    }
}
