package org.apache.rocketmq.streams.core.running;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
import org.apache.rocketmq.streams.core.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.class */
class MessageQueueListenerWrapper implements MessageQueueListener {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueListenerWrapper.class.getName());
    private final MessageQueueListener originListener;
    private final TopologyBuilder topologyBuilder;
    private final ConcurrentHashMap<String, Set<MessageQueue>> ownedMapping = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Processor<?>> mq2Processor = new ConcurrentHashMap<>();
    private BiFunction<Set<MessageQueue>, Set<MessageQueue>, Throwable> recoverHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageQueueListenerWrapper(MessageQueueListener messageQueueListener, TopologyBuilder topologyBuilder) {
        this.originListener = messageQueueListener;
        this.topologyBuilder = topologyBuilder;
    }

    public void messageQueueChanged(String str, Set<MessageQueue> set, Set<MessageQueue> set2) {
        Set<MessageQueue> computeIfAbsent = this.ownedMapping.computeIfAbsent(str, str2 -> {
            return new HashSet();
        });
        HashSet hashSet = new HashSet(set2);
        hashSet.removeAll(computeIfAbsent);
        HashSet hashSet2 = new HashSet(computeIfAbsent);
        hashSet2.removeAll(set2);
        computeIfAbsent.addAll(new HashSet(hashSet));
        computeIfAbsent.removeAll(new HashSet(hashSet2));
        if (str.endsWith(Constant.SHUFFLE_TOPIC_SUFFIX)) {
            Throwable apply = this.recoverHandler.apply(hashSet, hashSet2);
            if (apply != null) {
                throw new RuntimeException(apply);
            }
            logger.info("recover messageQueue finish, addQueue: [{}], removeQueue:[{}].", hashSet, hashSet2);
        }
        buildTask(hashSet);
        this.originListener.messageQueueChanged(str, set, set2);
        removeTask(hashSet2);
    }

    private void buildTask(Set<MessageQueue> set) {
        for (MessageQueue messageQueue : set) {
            String buildKey = Utils.buildKey(messageQueue.getBrokerName(), messageQueue.getTopic(), messageQueue.getQueueId());
            if (!this.mq2Processor.containsKey(buildKey)) {
                this.mq2Processor.put(buildKey, this.topologyBuilder.build(messageQueue.getTopic()));
            }
        }
    }

    private void removeTask(Set<MessageQueue> set) {
        for (MessageQueue messageQueue : set) {
            this.mq2Processor.remove(Utils.buildKey(messageQueue.getBrokerName(), messageQueue.getTopic(), messageQueue.getQueueId()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> Processor<T> selectProcessor(String str) {
        return (Processor) this.mq2Processor.get(str);
    }

    public void setRecoverHandler(BiFunction<Set<MessageQueue>, Set<MessageQueue>, Throwable> biFunction) {
        this.recoverHandler = biFunction;
    }
}
