package org.apache.rocketmq.mqtt.ds.store;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.mqtt.common.facade.LmqOffsetStore;
import org.apache.rocketmq.mqtt.common.model.Queue;
import org.apache.rocketmq.mqtt.common.model.QueueOffset;
import org.apache.rocketmq.mqtt.common.model.Subscription;
import org.apache.rocketmq.mqtt.ds.config.ServiceConf;
import org.apache.rocketmq.mqtt.ds.meta.FirstTopicManager;
import org.apache.rocketmq.mqtt.ds.mq.MqFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * {@link LmqOffsetStore} implementation that writes and reads consumer offsets directly on the
 * brokers resolved for a subscription's first-level topic.
 *
 * <p>Offsets are addressed by a consumer group built as {@code "%LMQ%" + str} and a topic derived
 * from the queue name with every {@code "/"} replaced by {@code "%"}.
 *
 * <p>All broker calls go through the client API of an internal {@link DefaultMQPullConsumer}
 * that is created and started in {@link #init()}.
 */
@Component
public class LmqOffsetStoreManager implements LmqOffsetStore {
    private static final Logger logger = LoggerFactory.getLogger(LmqOffsetStoreManager.class);

    /** Consumer group handed to {@link MqFactory} when building the internal pull consumer. */
    private static final String OFFSET_CONSUMER_GROUP = "CID_RMQ_SYS_LMQ_OFFSET";

    /** Prefix of the consumer group under which offsets are stored and queried. */
    private static final String LMQ_GROUP_PREFIX = "%LMQ%";

    /** Pull timeout configured on the internal consumer, in milliseconds. */
    private static final long CONSUMER_PULL_TIMEOUT_MILLIS = 2000L;

    /** Timeout applied to each update/query offset request, in milliseconds. */
    private static final long OFFSET_REQUEST_TIMEOUT_MILLIS = 1000L;

    /**
     * Broker response code that makes {@link #getOffset} report {@link Long#MAX_VALUE}.
     * NOTE(review): presumably RocketMQ's {@code ResponseCode.QUERY_NOT_FOUND} - confirm.
     */
    private static final int OFFSET_NOT_FOUND_RESPONSE_CODE = 22;

    private DefaultMQPullConsumer defaultMQPullConsumer;

    @Resource
    private ServiceConf serviceConf;

    @Resource
    private FirstTopicManager firstTopicManager;

    /**
     * Builds and starts the pull consumer whose client API is used for all offset requests.
     *
     * @throws MQClientException if the consumer cannot be started
     */
    @PostConstruct
    public void init() throws MQClientException {
        this.defaultMQPullConsumer = MqFactory.buildDefaultMQPullConsumer(OFFSET_CONSUMER_GROUP, this.serviceConf.getProperties());
        this.defaultMQPullConsumer.setConsumerPullTimeoutMillis(CONSUMER_PULL_TIMEOUT_MILLIS);
        this.defaultMQPullConsumer.start();
    }

    /**
     * Pushes the given offsets to the brokers, one request per queue.
     *
     * <p>This is best effort: a failure for one queue is logged and does not stop the remaining
     * queues from being processed. Queues with a blank broker name, subscriptions without any
     * resolved broker, and queues whose broker has no known address are skipped.
     *
     * @param str identifier appended to {@code "%LMQ%"} to form the consumer group
     *            (presumably the MQTT client id - confirm against callers)
     * @param map offsets to store, grouped by subscription and then by queue; {@code null} or
     *            empty is a no-op
     */
    public void save(String str, Map<Subscription, Map<Queue, QueueOffset>> map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        for (Map.Entry<Subscription, Map<Queue, QueueOffset>> entry : map.entrySet()) {
            Map<String, String> brokerAddresses = findBrokers(entry.getKey());
            if (brokerAddresses == null || brokerAddresses.isEmpty()) {
                continue;
            }
            Map<Queue, QueueOffset> queueOffsets = entry.getValue();
            if (queueOffsets == null) {
                // Nothing to store for this subscription; do not let it abort the other entries.
                continue;
            }
            for (Map.Entry<Queue, QueueOffset> queueEntry : queueOffsets.entrySet()) {
                Queue queue = queueEntry.getKey();
                try {
                    if (StringUtils.isBlank(queue.getBrokerName())) {
                        continue;
                    }
                    String brokerAddress = brokerAddresses.get(queue.getBrokerName());
                    if (StringUtils.isBlank(brokerAddress)) {
                        // No address is known for this broker: skip instead of issuing the
                        // request without a concrete target.
                        logger.warn("no address found for broker {}, skip saving offset of {} for {}",
                            queue.getBrokerName(), queue.getQueueName(), str);
                        continue;
                    }
                    QueueOffset queueOffset = queueEntry.getValue();
                    UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
                    requestHeader.setTopic(toOffsetTopic(queue.getQueueName()));
                    requestHeader.setConsumerGroup(LMQ_GROUP_PREFIX + str);
                    requestHeader.setQueueId(Integer.valueOf((int) queue.getQueueId()));
                    requestHeader.setCommitOffset(Long.valueOf(queueOffset.getOffset()));
                    this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().updateConsumerOffset(brokerAddress, requestHeader, OFFSET_REQUEST_TIMEOUT_MILLIS);
                } catch (Exception e) {
                    // Best effort: keep going with the remaining queues.
                    logger.error("failed to save offset for {}, queue {}", str, queue, e);
                }
            }
        }
    }

    /**
     * Asynchronously queries, for every broker resolved for the subscription, the stored offset
     * of queue id {@code 0} of the subscription's queue name.
     *
     * <p>The work runs via {@link CompletableFuture#supplyAsync(java.util.function.Supplier)},
     * i.e. on the default asynchronous executor.
     *
     * <p>Per broker:
     * <ul>
     *   <li>on success the returned offset is stored in the {@link QueueOffset};</li>
     *   <li>if the broker answers with response code {@code 22} the offset is set to
     *       {@link Long#MAX_VALUE};</li>
     *   <li>any other {@link MQBrokerException} is logged and the {@link QueueOffset} keeps its
     *       initial value;</li>
     *   <li>any other exception is logged and completes the future exceptionally.</li>
     * </ul>
     *
     * @param str          identifier appended to {@code "%LMQ%"} to form the consumer group
     *                     (presumably the MQTT client id - confirm against callers)
     * @param subscription subscription whose queue offsets are requested
     * @return a future yielding one entry per resolved broker; the map is empty when no broker
     *         is resolved
     */
    public CompletableFuture<Map<Queue, QueueOffset>> getOffset(String str, Subscription subscription) {
        return CompletableFuture.supplyAsync(() -> {
            Map<Queue, QueueOffset> offsets = new HashMap<>();
            Map<String, String> brokerAddresses = findBrokers(subscription);
            if (brokerAddresses == null || brokerAddresses.isEmpty()) {
                return offsets;
            }
            for (Map.Entry<String, String> broker : brokerAddresses.entrySet()) {
                Queue queue = new Queue(0L, subscription.toQueueName(), broker.getKey());
                String brokerAddress = broker.getValue();
                QueueOffset queueOffset = new QueueOffset();
                // Registered before the query so the queue is present even if the lookup fails.
                offsets.put(queue, queueOffset);
                try {
                    QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
                    requestHeader.setTopic(toOffsetTopic(queue.getQueueName()));
                    requestHeader.setConsumerGroup(LMQ_GROUP_PREFIX + str);
                    requestHeader.setQueueId(Integer.valueOf((int) queue.getQueueId()));
                    queueOffset.setOffset(this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getMQClientAPIImpl().queryConsumerOffset(brokerAddress, requestHeader, OFFSET_REQUEST_TIMEOUT_MILLIS));
                } catch (MQBrokerException e) {
                    if (OFFSET_NOT_FOUND_RESPONSE_CODE == e.getResponseCode()) {
                        queueOffset.setOffset(Long.MAX_VALUE);
                    } else {
                        // Previously dropped silently; the result is unchanged but the failure
                        // is now visible in the logs.
                        logger.error("failed to query offset for {}, queue {}, broker response code {}",
                            str, queue, e.getResponseCode(), e);
                    }
                } catch (Exception e) {
                    logger.error("{}", str, e);
                    throw new RuntimeException(e);
                }
            }
            return offsets;
        });
    }

    /**
     * Resolves the broker map for the first-level topic that backs the subscription.
     *
     * <p>The topic is the subscription's own first topic unless it is a retry subscription
     * (configured client retry topic) or a P2P subscription (configured client P2P topic, or the
     * client retry topic when no P2P topic is configured). The P2P rule is applied last and
     * therefore wins when both flags are set.
     *
     * @param subscription subscription to resolve brokers for
     * @return whatever {@link FirstTopicManager#getBrokerAddressMap} returns for the chosen
     *         topic; callers in this class treat it as broker name to broker address and
     *         tolerate {@code null}
     */
    private Map<String, String> findBrokers(Subscription subscription) {
        String firstTopic = subscription.toFirstTopic();
        if (subscription.isRetry()) {
            firstTopic = this.serviceConf.getClientRetryTopic();
        }
        if (subscription.isP2p()) {
            String p2pTopic = this.serviceConf.getClientP2pTopic();
            firstTopic = StringUtils.isNotBlank(p2pTopic) ? p2pTopic : this.serviceConf.getClientRetryTopic();
        }
        return this.firstTopicManager.getBrokerAddressMap(firstTopic);
    }

    /** Converts a queue name into the topic used in offset requests by replacing "/" with "%". */
    private static String toOffsetTopic(String queueName) {
        return StringUtils.replace(queueName, "/", "%");
    }
}
