package io.openmessaging.rocketmq.consumer;

import io.openmessaging.KeyValue;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openmessaging/rocketmq/consumer/LocalMessageCache.class */
class LocalMessageCache implements ServiceLifecycle {
    private static final Logger log = LoggerFactory.getLogger(LocalMessageCache.class);
    private final BlockingQueue<ConsumeRequest> consumeRequestCache;
    private final DefaultMQPullConsumer rocketmqPullConsumer;
    private final ClientConfig clientConfig;
    private final Map<String, ConsumeRequest> consumedRequest = new ConcurrentHashMap();
    private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("OMS_CleanExpireMsgScheduledThread_"));

    /* JADX INFO: Access modifiers changed from: package-private */
    public LocalMessageCache(DefaultMQPullConsumer defaultMQPullConsumer, ClientConfig clientConfig) {
        this.consumeRequestCache = new LinkedBlockingQueue(clientConfig.getRmqPullMessageCacheCapacity());
        this.rocketmqPullConsumer = defaultMQPullConsumer;
        this.clientConfig = clientConfig;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int nextPullBatchNums() {
        return Math.min(this.clientConfig.getRmqPullMessageBatchNums(), this.consumeRequestCache.remainingCapacity());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long nextPullOffset(MessageQueue messageQueue) {
        if (!this.pullOffsetTable.containsKey(messageQueue)) {
            try {
                this.pullOffsetTable.putIfAbsent(messageQueue, Long.valueOf(this.rocketmqPullConsumer.fetchConsumeOffset(messageQueue, false)));
            } catch (MQClientException e) {
                log.error("An error occurred in fetch consume offset process.", e);
            }
        }
        return this.pullOffsetTable.get(messageQueue).longValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void updatePullOffset(MessageQueue messageQueue, long j) {
        this.pullOffsetTable.put(messageQueue, Long.valueOf(j));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            this.consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageExt poll() {
        return poll(this.clientConfig.getOperationTimeout());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageExt poll(KeyValue keyValue) {
        int operationTimeout = this.clientConfig.getOperationTimeout();
        if (keyValue.containsKey("TIMEOUT")) {
            operationTimeout = keyValue.getInt("TIMEOUT");
        }
        return poll(operationTimeout);
    }

    private MessageExt poll(long j) {
        try {
            ConsumeRequest poll = this.consumeRequestCache.poll(j, TimeUnit.MILLISECONDS);
            if (poll == null) {
                return null;
            }
            MessageExt messageExt = poll.getMessageExt();
            poll.setStartConsumeTimeMillis(System.currentTimeMillis());
            MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(poll.getStartConsumeTimeMillis()));
            this.consumedRequest.put(messageExt.getMsgId(), poll);
            return messageExt;
        } catch (InterruptedException e) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ack(String str) {
        ConsumeRequest remove = this.consumedRequest.remove(str);
        if (remove != null) {
            try {
                this.rocketmqPullConsumer.updateConsumeOffset(remove.getMessageQueue(), remove.getProcessQueue().removeMessage(Collections.singletonList(remove.getMessageExt())));
            } catch (MQClientException e) {
                log.error("An error occurred in update consume offset process.", e);
            }
        }
    }

    void ack(MessageQueue messageQueue, ProcessQueue processQueue, MessageExt messageExt) {
        this.consumedRequest.remove(messageExt.getMsgId());
        try {
            this.rocketmqPullConsumer.updateConsumeOffset(messageQueue, processQueue.removeMessage(Collections.singletonList(messageExt)));
        } catch (MQClientException e) {
            log.error("An error occurred in update consume offset process.", e);
        }
    }

    public void startup() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { // from class: io.openmessaging.rocketmq.consumer.LocalMessageCache.1
            @Override // java.lang.Runnable
            public void run() {
                LocalMessageCache.this.cleanExpireMsg();
            }
        }, this.clientConfig.getRmqMessageConsumeTimeout(), this.clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
    }

    public void shutdown() {
        ThreadUtils.shutdownGracefully(this.cleanExpireMsgExecutors, 5000L, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanExpireMsg() {
        loop0: for (Map.Entry entry : this.rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getProcessQueueTable().entrySet()) {
            ProcessQueue processQueue = (ProcessQueue) entry.getValue();
            MessageQueue messageQueue = (MessageQueue) entry.getKey();
            ReadWriteLock lockInProcessQueue = getLockInProcessQueue(processQueue);
            if (lockInProcessQueue == null) {
                log.error("Gets tree map lock in process queue error, may be has compatibility issue");
                return;
            }
            TreeMap msgTreeMap = processQueue.getMsgTreeMap();
            int size = msgTreeMap.size();
            int i = 0;
            while (true) {
                if (i < size) {
                    Message message = null;
                    try {
                        lockInProcessQueue.readLock().lockInterruptibly();
                        try {
                        } catch (Throwable th) {
                            lockInProcessQueue.readLock().unlock();
                            throw th;
                            break loop0;
                        }
                    } catch (InterruptedException e) {
                        log.error("Gets expired message exception", e);
                    }
                    if (msgTreeMap.isEmpty()) {
                        lockInProcessQueue.readLock().unlock();
                        break;
                    }
                    message = (MessageExt) msgTreeMap.firstEntry().getValue();
                    if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(message)) <= this.clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
                        lockInProcessQueue.readLock().unlock();
                        break;
                    }
                    lockInProcessQueue.readLock().unlock();
                    try {
                        this.rocketmqPullConsumer.sendMessageBack(message, 3);
                        log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", new Object[]{message.getTopic(), message.getMsgId(), message.getStoreHost(), Integer.valueOf(message.getQueueId()), Long.valueOf(message.getQueueOffset())});
                        ack(messageQueue, processQueue, message);
                    } catch (Exception e2) {
                        log.error("Send back expired msg exception", e2);
                    }
                    i++;
                }
            }
        }
    }

    private ReadWriteLock getLockInProcessQueue(ProcessQueue processQueue) {
        try {
            return (ReadWriteLock) FieldUtils.readDeclaredField(processQueue, "lockTreeMap", true);
        } catch (IllegalAccessException e) {
            return null;
        }
    }
}
