package org.apache.rocketmq.client.impl.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;

/* loaded from: input_file:org/apache/rocketmq/client/impl/consumer/RebalanceImpl.class */
public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;
    private static final int TIMEOUT_CHECK_TIMES = 3;
    private static final int QUERY_ASSIGNMENT_TIMEOUT = 3000;
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap(64);
    protected final ConcurrentMap<MessageQueue, PopProcessQueue> popProcessQueueTable = new ConcurrentHashMap(64);
    protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap();
    protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap();
    private Map<String, String> topicBrokerRebalance = new ConcurrentHashMap();
    private Map<String, String> topicClientRebalance = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.rocketmq.client.impl.consumer.RebalanceImpl$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/client/impl/consumer/RebalanceImpl$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel = new int[MessageModel.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel[MessageModel.BROADCASTING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel[MessageModel.CLUSTERING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public RebalanceImpl(String str, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientInstance) {
        this.consumerGroup = str;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientInstance;
    }

    public void unlock(MessageQueue messageQueue, boolean z) {
        FindBrokerResult findBrokerAddressInSubscribe = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(messageQueue), 0L, true);
        if (findBrokerAddressInSubscribe != null) {
            UnlockBatchRequestBody unlockBatchRequestBody = new UnlockBatchRequestBody();
            unlockBatchRequestBody.setConsumerGroup(this.consumerGroup);
            unlockBatchRequestBody.setClientId(this.mQClientFactory.getClientId());
            unlockBatchRequestBody.getMqSet().add(messageQueue);
            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerAddressInSubscribe.getBrokerAddr(), unlockBatchRequestBody, 1000L, z);
                log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", new Object[]{this.consumerGroup, this.mQClientFactory.getClientId(), messageQueue});
            } catch (Exception e) {
                log.error("unlockBatchMQ exception, " + messageQueue, e);
            }
        }
    }

    public void unlockAll(boolean z) {
        FindBrokerResult findBrokerAddressInSubscribe;
        for (Map.Entry<String, Set<MessageQueue>> entry : buildProcessQueueTableByBrokerName().entrySet()) {
            String key = entry.getKey();
            Set<MessageQueue> value = entry.getValue();
            if (!value.isEmpty() && (findBrokerAddressInSubscribe = this.mQClientFactory.findBrokerAddressInSubscribe(key, 0L, true)) != null) {
                UnlockBatchRequestBody unlockBatchRequestBody = new UnlockBatchRequestBody();
                unlockBatchRequestBody.setConsumerGroup(this.consumerGroup);
                unlockBatchRequestBody.setClientId(this.mQClientFactory.getClientId());
                unlockBatchRequestBody.setMqSet(value);
                try {
                    this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerAddressInSubscribe.getBrokerAddr(), unlockBatchRequestBody, 1000L, z);
                    for (MessageQueue messageQueue : value) {
                        ProcessQueue processQueue = this.processQueueTable.get(messageQueue);
                        if (processQueue != null) {
                            processQueue.setLocked(false);
                            log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, messageQueue);
                        }
                    }
                } catch (Exception e) {
                    log.error("unlockBatchMQ exception, " + value, e);
                }
            }
        }
    }

    private HashMap<String, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
        HashMap<String, Set<MessageQueue>> hashMap = new HashMap<>();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            MessageQueue key = entry.getKey();
            if (!entry.getValue().isDropped()) {
                Set<MessageQueue> set = hashMap.get(this.mQClientFactory.getBrokerNameFromMessageQueue(key));
                if (null == set) {
                    set = new HashSet();
                    hashMap.put(key.getBrokerName(), set);
                }
                set.add(key);
            }
        }
        return hashMap;
    }

    public boolean lock(MessageQueue messageQueue) {
        FindBrokerResult findBrokerAddressInSubscribe = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(messageQueue), 0L, true);
        if (findBrokerAddressInSubscribe == null) {
            return false;
        }
        LockBatchRequestBody lockBatchRequestBody = new LockBatchRequestBody();
        lockBatchRequestBody.setConsumerGroup(this.consumerGroup);
        lockBatchRequestBody.setClientId(this.mQClientFactory.getClientId());
        lockBatchRequestBody.getMqSet().add(messageQueue);
        try {
            Set<MessageQueue> lockBatchMQ = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerAddressInSubscribe.getBrokerAddr(), lockBatchRequestBody, 1000L);
            Iterator<MessageQueue> it = lockBatchMQ.iterator();
            while (it.hasNext()) {
                ProcessQueue processQueue = this.processQueueTable.get(it.next());
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }
            boolean contains = lockBatchMQ.contains(messageQueue);
            InternalLogger internalLogger = log;
            Object[] objArr = new Object[TIMEOUT_CHECK_TIMES];
            objArr[0] = contains ? "OK" : "Failed";
            objArr[1] = this.consumerGroup;
            objArr[2] = messageQueue;
            internalLogger.info("message queue lock {}, {} {}", objArr);
            return contains;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + messageQueue, e);
            return false;
        }
    }

    public void lockAll() {
        FindBrokerResult findBrokerAddressInSubscribe;
        ProcessQueue processQueue;
        for (Map.Entry<String, Set<MessageQueue>> entry : buildProcessQueueTableByBrokerName().entrySet()) {
            String key = entry.getKey();
            Set<MessageQueue> value = entry.getValue();
            if (!value.isEmpty() && (findBrokerAddressInSubscribe = this.mQClientFactory.findBrokerAddressInSubscribe(key, 0L, true)) != null) {
                LockBatchRequestBody lockBatchRequestBody = new LockBatchRequestBody();
                lockBatchRequestBody.setConsumerGroup(this.consumerGroup);
                lockBatchRequestBody.setClientId(this.mQClientFactory.getClientId());
                lockBatchRequestBody.setMqSet(value);
                try {
                    Set<MessageQueue> lockBatchMQ = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerAddressInSubscribe.getBrokerAddr(), lockBatchRequestBody, 1000L);
                    for (MessageQueue messageQueue : lockBatchMQ) {
                        ProcessQueue processQueue2 = this.processQueueTable.get(messageQueue);
                        if (processQueue2 != null) {
                            if (!processQueue2.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, messageQueue);
                            }
                            processQueue2.setLocked(true);
                            processQueue2.setLastLockTimestamp(System.currentTimeMillis());
                        }
                    }
                    for (MessageQueue messageQueue2 : value) {
                        if (!lockBatchMQ.contains(messageQueue2) && (processQueue = this.processQueueTable.get(messageQueue2)) != null) {
                            processQueue.setLocked(false);
                            log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, messageQueue2);
                        }
                    }
                } catch (Exception e) {
                    log.error("lockBatchMQ exception, " + value, e);
                }
            }
        }
    }

    public boolean clientRebalance(String str) {
        return true;
    }

    public boolean doRebalance(boolean z) {
        boolean z2 = true;
        ConcurrentMap<String, SubscriptionData> subscriptionInner = getSubscriptionInner();
        if (subscriptionInner != null) {
            Iterator<Map.Entry<String, SubscriptionData>> it = subscriptionInner.entrySet().iterator();
            while (it.hasNext()) {
                String key = it.next().getKey();
                try {
                    z2 = (clientRebalance(key) || !tryQueryAssignment(key)) ? rebalanceByTopic(key, z) : getRebalanceResultFromBroker(key, z);
                } catch (Throwable th) {
                    if (!key.startsWith("%RETRY%")) {
                        log.warn("rebalance Exception", th);
                        z2 = false;
                    }
                }
            }
        }
        truncateMessageQueueNotMyTopic();
        return z2;
    }

    private boolean tryQueryAssignment(String str) {
        if (this.topicClientRebalance.containsKey(str)) {
            return false;
        }
        if (this.topicBrokerRebalance.containsKey(str)) {
            return true;
        }
        String name = this.allocateMessageQueueStrategy != null ? this.allocateMessageQueueStrategy.getName() : null;
        boolean z = false;
        int i = 0;
        int i2 = 0;
        while (true) {
            int i3 = i;
            i++;
            if (i3 >= TIMEOUT_CHECK_TIMES) {
                break;
            }
            try {
                this.mQClientFactory.queryAssignment(str, this.consumerGroup, name, this.messageModel, 1000 * i);
                z = true;
                break;
            } catch (Throwable th) {
                if (!(th instanceof RemotingTimeoutException)) {
                    log.error("tryQueryAssignment error.", th);
                    break;
                }
                i2++;
            }
        }
        if (z) {
            this.topicBrokerRebalance.put(str, str);
            return true;
        }
        if (i2 < TIMEOUT_CHECK_TIMES) {
            return true;
        }
        this.topicClientRebalance.put(str, str);
        return false;
    }

    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.subscriptionInner;
    }

    private boolean rebalanceByTopic(String str, boolean z) {
        boolean z2 = true;
        switch (AnonymousClass1.$SwitchMap$org$apache$rocketmq$common$protocol$heartbeat$MessageModel[this.messageModel.ordinal()]) {
            case TraceConstants.CONTENT_SPLITOR /* 1 */:
                Set<MessageQueue> set = this.topicSubscribeInfoTable.get(str);
                if (set == null) {
                    messageQueueChanged(str, Collections.emptySet(), Collections.emptySet());
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, str);
                    break;
                } else {
                    if (updateProcessQueueTableInRebalance(str, set, z)) {
                        messageQueueChanged(str, set, set);
                        log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, str, set, set});
                    }
                    z2 = set.equals(getWorkingMessageQueue(str));
                    break;
                }
            case TraceConstants.FIELD_SPLITOR /* 2 */:
                Set<MessageQueue> set2 = this.topicSubscribeInfoTable.get(str);
                List<String> findConsumerIdList = this.mQClientFactory.findConsumerIdList(str, this.consumerGroup);
                if (null == set2 && !str.startsWith("%RETRY%")) {
                    messageQueueChanged(str, Collections.emptySet(), Collections.emptySet());
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, str);
                }
                if (null == findConsumerIdList) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, str);
                }
                if (set2 != null && findConsumerIdList != null) {
                    ArrayList arrayList = new ArrayList();
                    arrayList.addAll(set2);
                    Collections.sort(arrayList);
                    Collections.sort(findConsumerIdList);
                    AllocateMessageQueueStrategy allocateMessageQueueStrategy = this.allocateMessageQueueStrategy;
                    try {
                        List<MessageQueue> allocate = allocateMessageQueueStrategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), arrayList, findConsumerIdList);
                        HashSet hashSet = new HashSet();
                        if (allocate != null) {
                            hashSet.addAll(allocate);
                        }
                        if (updateProcessQueueTableInRebalance(str, hashSet, z)) {
                            log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{allocateMessageQueueStrategy.getName(), this.consumerGroup, str, this.mQClientFactory.getClientId(), Integer.valueOf(set2.size()), Integer.valueOf(findConsumerIdList.size()), Integer.valueOf(hashSet.size()), hashSet});
                            messageQueueChanged(str, set2, hashSet);
                        }
                        z2 = hashSet.equals(getWorkingMessageQueue(str));
                        break;
                    } catch (Throwable th) {
                        log.error("allocate message queue exception. strategy name: {}, ex: {}", allocateMessageQueueStrategy.getName(), th);
                        return false;
                    }
                }
                break;
        }
        return z2;
    }

    private boolean getRebalanceResultFromBroker(String str, boolean z) {
        String name = this.allocateMessageQueueStrategy.getName();
        try {
            Set<MessageQueueAssignment> queryAssignment = this.mQClientFactory.queryAssignment(str, this.consumerGroup, name, this.messageModel, QUERY_ASSIGNMENT_TIMEOUT);
            if (queryAssignment == null) {
                return false;
            }
            HashSet hashSet = new HashSet();
            for (MessageQueueAssignment messageQueueAssignment : queryAssignment) {
                if (messageQueueAssignment.getMessageQueue() != null) {
                    hashSet.add(messageQueueAssignment.getMessageQueue());
                }
            }
            if (updateMessageQueueAssignment(str, queryAssignment, z)) {
                log.info("broker rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, assignmentSet={}", new Object[]{name, this.consumerGroup, str, this.mQClientFactory.getClientId(), queryAssignment});
                messageQueueChanged(str, null, hashSet);
            }
            return hashSet.equals(getWorkingMessageQueue(str));
        } catch (Exception e) {
            log.error("allocate message queue exception. strategy name: {}, ex: {}", name, e);
            return false;
        }
    }

    private Set<MessageQueue> getWorkingMessageQueue(String str) {
        HashSet hashSet = new HashSet();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            MessageQueue key = entry.getKey();
            ProcessQueue value = entry.getValue();
            if (key.getTopic().equals(str) && !value.isDropped()) {
                hashSet.add(key);
            }
        }
        for (Map.Entry<MessageQueue, PopProcessQueue> entry2 : this.popProcessQueueTable.entrySet()) {
            MessageQueue key2 = entry2.getKey();
            PopProcessQueue value2 = entry2.getValue();
            if (key2.getTopic().equals(str) && !value2.isDropped()) {
                hashSet.add(key2);
            }
        }
        return hashSet;
    }

    private void truncateMessageQueueNotMyTopic() {
        PopProcessQueue remove;
        ProcessQueue remove2;
        ConcurrentMap<String, SubscriptionData> subscriptionInner = getSubscriptionInner();
        for (MessageQueue messageQueue : this.processQueueTable.keySet()) {
            if (!subscriptionInner.containsKey(messageQueue.getTopic()) && (remove2 = this.processQueueTable.remove(messageQueue)) != null) {
                remove2.setDropped(true);
                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", this.consumerGroup, messageQueue);
            }
        }
        for (MessageQueue messageQueue2 : this.popProcessQueueTable.keySet()) {
            if (!subscriptionInner.containsKey(messageQueue2.getTopic()) && (remove = this.popProcessQueueTable.remove(messageQueue2)) != null) {
                remove.setDropped(true);
                log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary pop mq, {}", this.consumerGroup, messageQueue2);
            }
        }
        Iterator<Map.Entry<String, String>> it = this.topicClientRebalance.entrySet().iterator();
        while (it.hasNext()) {
            if (!subscriptionInner.containsKey(it.next().getKey())) {
                it.remove();
            }
        }
        Iterator<Map.Entry<String, String>> it2 = this.topicBrokerRebalance.entrySet().iterator();
        while (it2.hasNext()) {
            if (!subscriptionInner.containsKey(it2.next().getKey())) {
                it2.remove();
            }
        }
    }

    private boolean updateProcessQueueTableInRebalance(String str, Set<MessageQueue> set, boolean z) {
        boolean z2 = false;
        new HashMap();
        HashMap hashMap = new HashMap(this.processQueueTable.size());
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            MessageQueue key = entry.getKey();
            ProcessQueue value = entry.getValue();
            if (key.getTopic().equals(str)) {
                if (!set.contains(key)) {
                    value.setDropped(true);
                    hashMap.put(key, value);
                } else if (value.isPullExpired() && consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                    value.setDropped(true);
                    hashMap.put(key, value);
                    log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it", this.consumerGroup, key);
                }
            }
        }
        for (Map.Entry entry2 : hashMap.entrySet()) {
            MessageQueue messageQueue = (MessageQueue) entry2.getKey();
            if (removeUnnecessaryMessageQueue(messageQueue, (ProcessQueue) entry2.getValue())) {
                this.processQueueTable.remove(messageQueue);
                z2 = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, messageQueue);
            }
        }
        boolean z3 = true;
        ArrayList arrayList = new ArrayList();
        for (MessageQueue messageQueue2 : set) {
            if (!this.processQueueTable.containsKey(messageQueue2)) {
                if (!z || lock(messageQueue2)) {
                    removeDirtyOffset(messageQueue2);
                    ProcessQueue createProcessQueue = createProcessQueue(str);
                    createProcessQueue.setLocked(true);
                    long computePullFromWhere = computePullFromWhere(messageQueue2);
                    if (computePullFromWhere < 0) {
                        log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, messageQueue2);
                    } else if (this.processQueueTable.putIfAbsent(messageQueue2, createProcessQueue) != null) {
                        log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, messageQueue2);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, messageQueue2);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(this.consumerGroup);
                        pullRequest.setNextOffset(computePullFromWhere);
                        pullRequest.setMessageQueue(messageQueue2);
                        pullRequest.setProcessQueue(createProcessQueue);
                        arrayList.add(pullRequest);
                        z2 = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, messageQueue2);
                    z3 = false;
                }
            }
        }
        if (!z3) {
            this.mQClientFactory.rebalanceLater(500L);
        }
        dispatchPullRequest(arrayList, 500L);
        return z2;
    }

    private boolean updateMessageQueueAssignment(String str, Set<MessageQueueAssignment> set, boolean z) {
        boolean z2 = false;
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (MessageQueueAssignment messageQueueAssignment : set) {
            MessageQueue messageQueue = messageQueueAssignment.getMessageQueue();
            if (messageQueue != null) {
                if (MessageRequestMode.POP == messageQueueAssignment.getMode()) {
                    hashMap2.put(messageQueue, messageQueueAssignment);
                } else {
                    hashMap.put(messageQueue, messageQueueAssignment);
                }
            }
        }
        if (!str.startsWith("%RETRY%")) {
            if (hashMap2.isEmpty() && !hashMap.isEmpty()) {
                try {
                    String buildPopRetryTopic = KeyBuilder.buildPopRetryTopic(str, getConsumerGroup());
                    getSubscriptionInner().put(buildPopRetryTopic, FilterAPI.buildSubscriptionData(buildPopRetryTopic, "*"));
                } catch (Exception e) {
                }
            } else if (!hashMap2.isEmpty() && hashMap.isEmpty()) {
                try {
                    getSubscriptionInner().remove(KeyBuilder.buildPopRetryTopic(str, getConsumerGroup()));
                } catch (Exception e2) {
                }
            }
        }
        HashMap hashMap3 = new HashMap(this.processQueueTable.size());
        for (Map.Entry<MessageQueue, ProcessQueue> entry : this.processQueueTable.entrySet()) {
            MessageQueue key = entry.getKey();
            ProcessQueue value = entry.getValue();
            if (key.getTopic().equals(str)) {
                if (!hashMap.containsKey(key)) {
                    value.setDropped(true);
                    hashMap3.put(key, value);
                } else if (value.isPullExpired() && consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                    value.setDropped(true);
                    hashMap3.put(key, value);
                    log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it", this.consumerGroup, key);
                }
            }
        }
        for (Map.Entry entry2 : hashMap3.entrySet()) {
            MessageQueue messageQueue2 = (MessageQueue) entry2.getKey();
            if (removeUnnecessaryMessageQueue(messageQueue2, (ProcessQueue) entry2.getValue())) {
                this.processQueueTable.remove(messageQueue2);
                z2 = true;
                log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, messageQueue2);
            }
        }
        HashMap hashMap4 = new HashMap(this.popProcessQueueTable.size());
        for (Map.Entry<MessageQueue, PopProcessQueue> entry3 : this.popProcessQueueTable.entrySet()) {
            MessageQueue key2 = entry3.getKey();
            PopProcessQueue value2 = entry3.getValue();
            if (key2.getTopic().equals(str)) {
                if (!hashMap2.containsKey(key2)) {
                    value2.setDropped(true);
                    hashMap4.put(key2, value2);
                } else if (value2.isPullExpired() && consumeType() == ConsumeType.CONSUME_PASSIVELY) {
                    value2.setDropped(true);
                    hashMap4.put(key2, value2);
                    log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it", this.consumerGroup, key2);
                }
            }
        }
        for (Map.Entry entry4 : hashMap4.entrySet()) {
            MessageQueue messageQueue3 = (MessageQueue) entry4.getKey();
            if (removeUnnecessaryPopMessageQueue(messageQueue3, (PopProcessQueue) entry4.getValue())) {
                this.popProcessQueueTable.remove(messageQueue3);
                z2 = true;
                log.info("doRebalance, {}, remove unnecessary pop mq, {}", this.consumerGroup, messageQueue3);
            }
        }
        boolean z3 = true;
        ArrayList arrayList = new ArrayList();
        for (MessageQueue messageQueue4 : hashMap.keySet()) {
            if (!this.processQueueTable.containsKey(messageQueue4)) {
                if (!z || lock(messageQueue4)) {
                    removeDirtyOffset(messageQueue4);
                    ProcessQueue createProcessQueue = createProcessQueue();
                    createProcessQueue.setLocked(true);
                    try {
                        long computePullFromWhereWithException = computePullFromWhereWithException(messageQueue4);
                        if (computePullFromWhereWithException < 0) {
                            log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, messageQueue4);
                        } else if (this.processQueueTable.putIfAbsent(messageQueue4, createProcessQueue) != null) {
                            log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, messageQueue4);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, messageQueue4);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(this.consumerGroup);
                            pullRequest.setNextOffset(computePullFromWhereWithException);
                            pullRequest.setMessageQueue(messageQueue4);
                            pullRequest.setProcessQueue(createProcessQueue);
                            arrayList.add(pullRequest);
                            z2 = true;
                        }
                    } catch (Exception e3) {
                        log.info("doRebalance, {}, compute offset failed, {}", this.consumerGroup, messageQueue4);
                    }
                } else {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, messageQueue4);
                    z3 = false;
                }
            }
        }
        if (!z3) {
            this.mQClientFactory.rebalanceLater(500L);
        }
        dispatchPullRequest(arrayList, 500L);
        ArrayList arrayList2 = new ArrayList();
        for (MessageQueue messageQueue5 : hashMap2.keySet()) {
            if (!this.popProcessQueueTable.containsKey(messageQueue5)) {
                PopProcessQueue createPopProcessQueue = createPopProcessQueue();
                if (this.popProcessQueueTable.putIfAbsent(messageQueue5, createPopProcessQueue) != null) {
                    log.info("doRebalance, {}, mq pop already exists, {}", this.consumerGroup, messageQueue5);
                } else {
                    log.info("doRebalance, {}, add a new pop mq, {}", this.consumerGroup, messageQueue5);
                    PopRequest popRequest = new PopRequest();
                    popRequest.setTopic(str);
                    popRequest.setConsumerGroup(this.consumerGroup);
                    popRequest.setMessageQueue(messageQueue5);
                    popRequest.setPopProcessQueue(createPopProcessQueue);
                    popRequest.setInitMode(getConsumeInitMode());
                    arrayList2.add(popRequest);
                    z2 = true;
                }
            }
        }
        dispatchPopPullRequest(arrayList2, 500L);
        return z2;
    }

    public abstract void messageQueueChanged(String str, Set<MessageQueue> set, Set<MessageQueue> set2);

    public abstract boolean removeUnnecessaryMessageQueue(MessageQueue messageQueue, ProcessQueue processQueue);

    public boolean removeUnnecessaryPopMessageQueue(MessageQueue messageQueue, PopProcessQueue popProcessQueue) {
        return true;
    }

    public abstract ConsumeType consumeType();

    public abstract void removeDirtyOffset(MessageQueue messageQueue);

    @Deprecated
    public abstract long computePullFromWhere(MessageQueue messageQueue);

    public abstract long computePullFromWhereWithException(MessageQueue messageQueue) throws MQClientException;

    public abstract int getConsumeInitMode();

    public abstract void dispatchPullRequest(List<PullRequest> list, long j);

    public abstract void dispatchPopPullRequest(List<PopRequest> list, long j);

    public abstract ProcessQueue createProcessQueue();

    public abstract PopProcessQueue createPopProcessQueue();

    public abstract ProcessQueue createProcessQueue(String str);

    public void removeProcessQueue(MessageQueue messageQueue) {
        ProcessQueue remove = this.processQueueTable.remove(messageQueue);
        if (remove != null) {
            boolean isDropped = remove.isDropped();
            remove.setDropped(true);
            removeUnnecessaryMessageQueue(messageQueue, remove);
            log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", new Object[]{this.consumerGroup, messageQueue, Boolean.valueOf(isDropped)});
        }
    }

    public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
        return this.processQueueTable;
    }

    public ConcurrentMap<MessageQueue, PopProcessQueue> getPopProcessQueueTable() {
        return this.popProcessQueueTable;
    }

    public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
        return this.topicSubscribeInfoTable;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String str) {
        this.consumerGroup = str;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return this.allocateMessageQueueStrategy;
    }

    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientInstance) {
        this.mQClientFactory = mQClientInstance;
    }

    public void destroy() {
        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().setDropped(true);
        }
        this.processQueueTable.clear();
        Iterator<Map.Entry<MessageQueue, PopProcessQueue>> it2 = this.popProcessQueueTable.entrySet().iterator();
        while (it2.hasNext()) {
            it2.next().getValue().setDropped(true);
        }
        this.popProcessQueueTable.clear();
    }
}
