package org.apache.rocketmq.dashboard.service.impl;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.dashboard.config.RMQConfigure;
import org.apache.rocketmq.dashboard.exception.ServiceException;
import org.apache.rocketmq.dashboard.model.MessagePage;
import org.apache.rocketmq.dashboard.model.MessagePageTask;
import org.apache.rocketmq.dashboard.model.MessageQueryByPage;
import org.apache.rocketmq.dashboard.model.MessageView;
import org.apache.rocketmq.dashboard.model.QueueOffsetInfo;
import org.apache.rocketmq.dashboard.model.request.MessageQuery;
import org.apache.rocketmq.dashboard.service.MessageService;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/classes/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.class */
public class MessageServiceImpl implements MessageService {
    private Logger logger = LoggerFactory.getLogger((Class<?>) MessageServiceImpl.class);
    private static final Cache<String, List<QueueOffsetInfo>> CACHE = CacheBuilder.newBuilder().maximumSize(10000).expireAfterWrite(60, TimeUnit.MINUTES).build();

    @Autowired
    private RMQConfigure configure;
    private static final int QUERY_MESSAGE_MAX_NUM = 64;

    @Resource
    private MQAdminExt mqAdminExt;

    @Override // org.apache.rocketmq.dashboard.service.MessageService
    public Pair<MessageView, List<MessageTrack>> viewMessage(String str, String str2) {
        try {
            MessageExt viewMessage = this.mqAdminExt.viewMessage(str, str2);
            return new Pair<>(MessageView.fromMessageExt(viewMessage), messageTrackDetail(viewMessage));
        } catch (Exception e) {
            throw new ServiceException(-1, String.format("Failed to query message by Id: %s", str2));
        }
    }

    @Override // org.apache.rocketmq.dashboard.service.MessageService
    public List<MessageView> queryMessageByTopicAndKey(String str, String str2) {
        try {
            return Lists.transform(this.mqAdminExt.queryMessage(str, str2, 64, 0L, System.currentTimeMillis()).getMessageList(), new Function<MessageExt, MessageView>() { // from class: org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl.1
                @Override // com.google.common.base.Function, java.util.function.Function
                public MessageView apply(MessageExt messageExt) {
                    return MessageView.fromMessageExt(messageExt);
                }
            });
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:27:0x013b, code lost:
    
        continue;
     */
    @Override // org.apache.rocketmq.dashboard.service.MessageService
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.util.List<org.apache.rocketmq.dashboard.model.MessageView> queryMessageByTopic(java.lang.String r10, final long r11, final long r13) {
        /*
            Method dump skipped, instructions count: 361
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.dashboard.service.impl.MessageServiceImpl.queryMessageByTopic(java.lang.String, long, long):java.util.List");
    }

    @Override // org.apache.rocketmq.dashboard.service.MessageService
    public List<MessageTrack> messageTrackDetail(MessageExt messageExt) {
        try {
            return this.mqAdminExt.messageTrackDetail(messageExt);
        } catch (Exception e) {
            this.logger.error("op=messageTrackDetailError", (Throwable) e);
            return Collections.emptyList();
        }
    }

    @Override // org.apache.rocketmq.dashboard.service.MessageService
    public ConsumeMessageDirectlyResult consumeMessageDirectly(String str, String str2, String str3, String str4) {
        if (StringUtils.isNotBlank(str4)) {
            try {
                return this.mqAdminExt.consumeMessageDirectly(str3, str4, str, str2);
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
        try {
            Iterator<Connection> it = this.mqAdminExt.examineConsumerConnectionInfo(str3).getConnectionSet().iterator();
            while (it.hasNext()) {
                Connection next = it.next();
                if (!StringUtils.isBlank(next.getClientId())) {
                    this.logger.info("clientId={}", next.getClientId());
                    return this.mqAdminExt.consumeMessageDirectly(str3, next.getClientId(), str, str2);
                }
            }
            throw new IllegalStateException("NO CONSUMER");
        } catch (Exception e2) {
            throw Throwables.propagate(e2);
        }
    }

    @Override // org.apache.rocketmq.dashboard.service.MessageService
    public MessagePage queryMessageByPage(MessageQuery messageQuery) {
        MessageQueryByPage messageQueryByPage = new MessageQueryByPage(messageQuery.getPageNum(), messageQuery.getPageSize(), messageQuery.getTopic(), messageQuery.getBegin(), messageQuery.getEnd());
        List<QueueOffsetInfo> ifPresent = CACHE.getIfPresent(messageQuery.getTaskId());
        if (ifPresent != null) {
            return new MessagePage(queryMessageByTaskPage(messageQueryByPage, ifPresent), messageQuery.getTaskId());
        }
        messageQuery.setPageNum(1);
        MessagePageTask queryFirstMessagePage = queryFirstMessagePage(messageQueryByPage);
        String createUniqID = MessageClientIDSetter.createUniqID();
        CACHE.put(createUniqID, queryFirstMessagePage.getQueueOffsetInfos());
        return new MessagePage(queryFirstMessagePage.getPage(), createUniqID);
    }

    private MessagePageTask queryFirstMessagePage(MessageQueryByPage messageQueryByPage) {
        DefaultMQPullConsumer buildDefaultMQPullConsumer = buildDefaultMQPullConsumer((StringUtils.isEmpty(this.configure.getAccessKey()) || StringUtils.isEmpty(this.configure.getSecretKey())) ? false : true ? new AclClientRPCHook(new SessionCredentials(this.configure.getAccessKey(), this.configure.getSecretKey())) : null, this.configure.isUseTLS());
        long j = 0;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        try {
            try {
                buildDefaultMQPullConsumer.start();
                int i = 0;
                for (MessageQueue messageQueue : buildDefaultMQPullConsumer.fetchSubscribeMessageQueues(messageQueryByPage.getTopic())) {
                    Long valueOf = Long.valueOf(buildDefaultMQPullConsumer.searchOffset(messageQueue, messageQueryByPage.getBegin()));
                    int i2 = i;
                    i++;
                    arrayList.add(new QueueOffsetInfo(Integer.valueOf(i2), valueOf, Long.valueOf(buildDefaultMQPullConsumer.searchOffset(messageQueue, messageQueryByPage.getEnd()) + 1), valueOf, valueOf, messageQueue));
                }
                for (QueueOffsetInfo queueOffsetInfo : arrayList) {
                    Long start = queueOffsetInfo.getStart();
                    boolean z = false;
                    boolean z2 = true;
                    while (z2) {
                        PullResult pull = buildDefaultMQPullConsumer.pull(queueOffsetInfo.getMessageQueues(), "*", start.longValue(), 32);
                        if (pull.getPullStatus() == PullStatus.FOUND) {
                            z = true;
                            Iterator<MessageExt> it = pull.getMsgFoundList().iterator();
                            while (true) {
                                if (!it.hasNext()) {
                                    break;
                                }
                                if (it.next().getStoreTimestamp() >= messageQueryByPage.getBegin()) {
                                    z2 = false;
                                    break;
                                }
                                start = Long.valueOf(start.longValue() + 1);
                            }
                        } else {
                            z2 = false;
                        }
                    }
                    if (!z) {
                        queueOffsetInfo.setEnd(queueOffsetInfo.getStart());
                    }
                    queueOffsetInfo.setStart(start);
                    queueOffsetInfo.setStartOffset(start);
                    queueOffsetInfo.setEndOffset(start);
                }
                for (QueueOffsetInfo queueOffsetInfo2 : arrayList) {
                    if (!queueOffsetInfo2.getStart().equals(queueOffsetInfo2.getEnd())) {
                        long longValue = queueOffsetInfo2.getEnd().longValue();
                        long j2 = longValue;
                        int i3 = 32;
                        boolean z3 = true;
                        while (z3) {
                            if (j2 - i3 > queueOffsetInfo2.getStart().longValue()) {
                                j2 -= i3;
                            } else {
                                j2 = queueOffsetInfo2.getStartOffset().longValue();
                                i3 = (int) (longValue - j2);
                            }
                            PullResult pull2 = buildDefaultMQPullConsumer.pull(queueOffsetInfo2.getMessageQueues(), "*", j2, i3);
                            if (pull2.getPullStatus() == PullStatus.FOUND) {
                                List<MessageExt> msgFoundList = pull2.getMsgFoundList();
                                int size = msgFoundList.size() - 1;
                                while (true) {
                                    if (size < 0) {
                                        break;
                                    }
                                    if (msgFoundList.get(size).getStoreTimestamp() >= messageQueryByPage.getBegin()) {
                                        z3 = false;
                                        break;
                                    }
                                    longValue--;
                                    size--;
                                }
                            } else {
                                z3 = false;
                            }
                            if (j2 == queueOffsetInfo2.getStartOffset().longValue()) {
                                break;
                            }
                        }
                        queueOffsetInfo2.setEnd(Long.valueOf(longValue));
                        j += queueOffsetInfo2.getEnd().longValue() - queueOffsetInfo2.getStart().longValue();
                    }
                }
                long pageSize = j > ((long) messageQueryByPage.getPageSize()) ? messageQueryByPage.getPageSize() : j;
                moveEndOffset(arrayList, messageQueryByPage, moveStartOffset(arrayList, messageQueryByPage));
                for (QueueOffsetInfo queueOffsetInfo3 : arrayList) {
                    Long startOffset = queueOffsetInfo3.getStartOffset();
                    long min = Math.min(queueOffsetInfo3.getEndOffset().longValue() - startOffset.longValue(), pageSize);
                    if (min != 0) {
                        while (min > 0) {
                            PullResult pull3 = buildDefaultMQPullConsumer.pull(queueOffsetInfo3.getMessageQueues(), "*", startOffset.longValue(), 32);
                            if (pull3.getPullStatus() != PullStatus.FOUND) {
                                break;
                            }
                            List<MessageExt> msgFoundList2 = pull3.getMsgFoundList();
                            if (msgFoundList2.size() == 0) {
                                break;
                            }
                            for (MessageView messageView : (List) msgFoundList2.stream().map(MessageView::fromMessageExt).collect(Collectors.toList())) {
                                if (min > 0) {
                                    arrayList2.add(messageView);
                                    min--;
                                }
                            }
                        }
                    }
                }
                MessagePageTask messagePageTask = new MessagePageTask(new PageImpl(arrayList2, messageQueryByPage.page(), j), arrayList);
                buildDefaultMQPullConsumer.shutdown();
                return messagePageTask;
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        } catch (Throwable th) {
            buildDefaultMQPullConsumer.shutdown();
            throw th;
        }
    }

    private Page<MessageView> queryMessageByTaskPage(MessageQueryByPage messageQueryByPage, List<QueueOffsetInfo> list) {
        DefaultMQPullConsumer buildDefaultMQPullConsumer = buildDefaultMQPullConsumer((StringUtils.isEmpty(this.configure.getAccessKey()) || StringUtils.isEmpty(this.configure.getSecretKey())) ? false : true ? new AclClientRPCHook(new SessionCredentials(this.configure.getAccessKey(), this.configure.getSecretKey())) : null, this.configure.isUseTLS());
        ArrayList arrayList = new ArrayList();
        long pageNum = messageQueryByPage.getPageNum() * messageQueryByPage.getPageSize();
        long j = 0;
        try {
            try {
                buildDefaultMQPullConsumer.start();
                for (QueueOffsetInfo queueOffsetInfo : list) {
                    long longValue = queueOffsetInfo.getStart().longValue();
                    long longValue2 = queueOffsetInfo.getEnd().longValue();
                    queueOffsetInfo.setStartOffset(Long.valueOf(longValue));
                    queueOffsetInfo.setEndOffset(Long.valueOf(longValue));
                    j += longValue2 - longValue;
                }
                if (j <= pageNum) {
                    Page<MessageView> empty = Page.empty();
                    buildDefaultMQPullConsumer.shutdown();
                    return empty;
                }
                long pageSize = j - pageNum > ((long) messageQueryByPage.getPageSize()) ? messageQueryByPage.getPageSize() : j - pageNum;
                moveEndOffset(list, messageQueryByPage, moveStartOffset(list, messageQueryByPage));
                for (QueueOffsetInfo queueOffsetInfo2 : list) {
                    Long startOffset = queueOffsetInfo2.getStartOffset();
                    long min = Math.min(queueOffsetInfo2.getEndOffset().longValue() - startOffset.longValue(), pageSize);
                    if (min != 0) {
                        while (min > 0) {
                            PullResult pull = buildDefaultMQPullConsumer.pull(queueOffsetInfo2.getMessageQueues(), "*", startOffset.longValue(), 32);
                            if (pull.getPullStatus() != PullStatus.FOUND) {
                                break;
                            }
                            List<MessageExt> msgFoundList = pull.getMsgFoundList();
                            if (msgFoundList.size() == 0) {
                                break;
                            }
                            for (MessageView messageView : (List) msgFoundList.stream().map(MessageView::fromMessageExt).collect(Collectors.toList())) {
                                if (min > 0) {
                                    arrayList.add(messageView);
                                    min--;
                                }
                            }
                        }
                    }
                }
                PageImpl pageImpl = new PageImpl(arrayList, messageQueryByPage.page(), j);
                buildDefaultMQPullConsumer.shutdown();
                return pageImpl;
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        } catch (Throwable th) {
            buildDefaultMQPullConsumer.shutdown();
            throw th;
        }
    }

    private int moveStartOffset(List<QueueOffsetInfo> list, MessageQueryByPage messageQueryByPage) {
        int size = list.size();
        int i = 0;
        long pageNum = messageQueryByPage.getPageNum() * messageQueryByPage.getPageSize();
        if (pageNum == 0) {
            return 0;
        }
        List<QueueOffsetInfo> list2 = (List) list.stream().sorted((queueOffsetInfo, queueOffsetInfo2) -> {
            long longValue = queueOffsetInfo.getEnd().longValue() - queueOffsetInfo.getStart().longValue();
            long longValue2 = queueOffsetInfo2.getEnd().longValue() - queueOffsetInfo2.getStart().longValue();
            if (longValue < longValue2) {
                return -1;
            }
            return longValue > longValue2 ? 1 : 0;
        }).collect(Collectors.toList());
        for (int i2 = 0; i2 < size && pageNum >= size - i2; i2++) {
            long longValue = ((QueueOffsetInfo) list2.get(i2)).getEnd().longValue() - ((QueueOffsetInfo) list2.get(i2)).getStartOffset().longValue();
            if (longValue != 0) {
                long j = longValue * (size - i2);
                if (j <= pageNum) {
                    pageNum -= j;
                    for (int i3 = i2; i3 < size; i3++) {
                        ((QueueOffsetInfo) list2.get(i3)).incStartOffset(longValue);
                    }
                } else {
                    long j2 = pageNum / (size - i2);
                    pageNum -= j2 * (size - i2);
                    if (j2 != 0) {
                        for (int i4 = i2; i4 < size; i4++) {
                            ((QueueOffsetInfo) list2.get(i4)).incStartOffset(j2);
                        }
                    }
                }
            }
        }
        for (QueueOffsetInfo queueOffsetInfo3 : list2) {
            QueueOffsetInfo queueOffsetInfo4 = list.get(queueOffsetInfo3.getIdx().intValue());
            queueOffsetInfo4.setStartOffset(queueOffsetInfo3.getStartOffset());
            queueOffsetInfo4.setEndOffset(queueOffsetInfo3.getEndOffset());
        }
        for (QueueOffsetInfo queueOffsetInfo5 : list) {
            if (pageNum == 0) {
                break;
            }
            i = (i + 1) % size;
            if (queueOffsetInfo5.getStartOffset().longValue() < queueOffsetInfo5.getEnd().longValue()) {
                queueOffsetInfo5.incStartOffset();
                pageNum--;
            }
        }
        return i;
    }

    private void moveEndOffset(List<QueueOffsetInfo> list, MessageQueryByPage messageQueryByPage, int i) {
        int size = list.size();
        for (int i2 = 0; i2 < messageQueryByPage.getPageSize(); i2++) {
            QueueOffsetInfo queueOffsetInfo = list.get(i);
            i = (i + 1) % size;
            while (queueOffsetInfo.getEndOffset().longValue() >= queueOffsetInfo.getEnd().longValue()) {
                queueOffsetInfo = list.get(i);
                i = (i + 1) % size;
                if (i == i) {
                    return;
                }
            }
            queueOffsetInfo.incEndOffset();
        }
    }

    public DefaultMQPullConsumer buildDefaultMQPullConsumer(RPCHook rPCHook, boolean z) {
        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rPCHook);
        defaultMQPullConsumer.setUseTLS(z);
        return defaultMQPullConsumer;
    }
}
