package org.apache.rocketmq.proxy.processor;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Stopwatch;
import io.netty.channel.Channel;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

/* loaded from: input_file:org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.class */
public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
    protected ThreadPoolExecutor renewalWorkerService;
    protected final MessagingProcessor messagingProcessor;
    protected static final Logger log = LoggerFactory.getLogger("RocketmqProxy");
    protected static final RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
    protected final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
    protected final ConcurrentMap<ReceiptHandleGroupKey, ReceiptHandleGroup> receiptHandleGroupMap = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor$ReceiptHandleGroupKey.class */
    public static class ReceiptHandleGroupKey {
        protected final Channel channel;
        protected final String group;

        public ReceiptHandleGroupKey(Channel channel, String str) {
            this.channel = channel;
            this.group = str;
        }

        protected String getChannelId() {
            return this.channel.id().asLongText();
        }

        public String getGroup() {
            return this.group;
        }

        public Channel getChannel() {
            return this.channel;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ReceiptHandleGroupKey receiptHandleGroupKey = (ReceiptHandleGroupKey) obj;
            return Objects.equal(getChannelId(), receiptHandleGroupKey.getChannelId()) && Objects.equal(this.group, receiptHandleGroupKey.group);
        }

        public int hashCode() {
            return Objects.hashCode(new Object[]{getChannelId(), this.group});
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("channelId", getChannelId()).add("group", this.group).toString();
        }
    }

    public ReceiptHandleProcessor(MessagingProcessor messagingProcessor) {
        this.messagingProcessor = messagingProcessor;
        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
        this.renewalWorkerService = ThreadPoolMonitor.createAndMonitor(proxyConfig.getRenewThreadPoolNums(), proxyConfig.getRenewMaxThreadPoolNums(), 1L, TimeUnit.MINUTES, "RenewalWorkerThread", proxyConfig.getRenewThreadPoolQueueCapacity());
        init();
    }

    protected void init() {
        registerConsumerListener();
        this.renewalWorkerService.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> {
            log.warn("add renew task failed. queueSize:{}", Integer.valueOf(threadPoolExecutor.getQueue().size()));
        });
        appendStartAndShutdown(new StartAndShutdown() { // from class: org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.1
            public void start() throws Exception {
                ReceiptHandleProcessor.this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
                    ReceiptHandleProcessor.this.scheduleRenewTask();
                }, 0L, ConfigurationManager.getProxyConfig().getRenewSchedulePeriodMillis(), TimeUnit.MILLISECONDS);
            }

            public void shutdown() throws Exception {
                ReceiptHandleProcessor.this.scheduledExecutorService.shutdown();
                ReceiptHandleProcessor.this.clearAllHandle();
            }
        });
    }

    protected void registerConsumerListener() {
        this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListener() { // from class: org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.2
            public void handle(ConsumerGroupEvent consumerGroupEvent, String str, Object... objArr) {
                if (!ConsumerGroupEvent.CLIENT_UNREGISTER.equals(consumerGroupEvent) || objArr == null || objArr.length < 1 || !(objArr[0] instanceof ClientChannelInfo)) {
                    return;
                }
                ClientChannelInfo clientChannelInfo = (ClientChannelInfo) objArr[0];
                if (ChannelHelper.isRemote(clientChannelInfo.getChannel())) {
                    return;
                }
                ReceiptHandleProcessor.this.clearGroup(new ReceiptHandleGroupKey(clientChannelInfo.getChannel(), str));
                ReceiptHandleProcessor.log.info("clear handle of this client when client unregister. group:{}, clientChannelInfo:{}", str, clientChannelInfo);
            }

            public void shutdown() {
            }
        });
    }

    protected ProxyContext createContext(String str) {
        return ProxyContext.createForInner(getClass().getSimpleName() + str);
    }

    protected void scheduleRenewTask() {
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
            for (Map.Entry<ReceiptHandleGroupKey, ReceiptHandleGroup> entry : this.receiptHandleGroupMap.entrySet()) {
                ReceiptHandleGroupKey key = entry.getKey();
                if (clientIsOffline(key)) {
                    clearGroup(key);
                } else {
                    ReceiptHandleGroup value = entry.getValue();
                    value.scan((str, str2, messageReceiptHandle) -> {
                        if (ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()).getNextVisibleTime() - System.currentTimeMillis() > proxyConfig.getRenewAheadTimeMillis()) {
                            return;
                        }
                        this.renewalWorkerService.submit(() -> {
                            renewMessage(value, str, str2);
                        });
                    });
                }
            }
        } catch (Exception e) {
            log.error("unexpect error when schedule renew task", e);
        }
        log.debug("scan for renewal done. cost:{}ms", Long.valueOf(createStarted.elapsed().toMillis()));
    }

    protected void renewMessage(ReceiptHandleGroup receiptHandleGroup, String str, String str2) {
        try {
            receiptHandleGroup.computeIfPresent(str, str2, this::startRenewMessage);
        } catch (Exception e) {
            log.error("error when renew message. msgID:{}, handleStr:{}", new Object[]{str, str2, e});
        }
    }

    protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageReceiptHandle messageReceiptHandle) {
        CompletableFuture<MessageReceiptHandle> completableFuture = new CompletableFuture<>();
        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
        ProxyContext createContext = createContext("RenewMessage");
        ReceiptHandle decode = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
        long currentTimeMillis = System.currentTimeMillis();
        try {
        } catch (Throwable th) {
            log.error("unexpect error when renew message, stop to renew it. handle:{}", messageReceiptHandle, th);
            completableFuture.complete(null);
        }
        if (messageReceiptHandle.getRenewRetryTimes() >= proxyConfig.getMaxRenewRetryTimes()) {
            log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
            return CompletableFuture.completedFuture(null);
        }
        if (currentTimeMillis - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
            this.messagingProcessor.changeInvisibleTime(createContext, decode, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), RENEW_POLICY.nextDelayDuration(messageReceiptHandle.getRenewTimes())).whenComplete((ackResult, th2) -> {
                if (th2 != null) {
                    log.error("error when renew. handle:{}", messageReceiptHandle, th2);
                    if (!renewExceptionNeedRetry(th2)) {
                        completableFuture.complete(null);
                        return;
                    } else {
                        messageReceiptHandle.incrementAndGetRenewRetryTimes();
                        completableFuture.complete(messageReceiptHandle);
                        return;
                    }
                }
                if (!AckStatus.OK.equals(ackResult.getStatus())) {
                    log.error("renew response is not ok. result:{}, handle:{}", ackResult, messageReceiptHandle);
                    completableFuture.complete(null);
                } else {
                    messageReceiptHandle.updateReceiptHandle(ackResult.getExtraInfo());
                    messageReceiptHandle.resetRenewRetryTimes();
                    messageReceiptHandle.incrementRenewTimes();
                    completableFuture.complete(messageReceiptHandle);
                }
            });
        } else {
            SubscriptionGroupConfig subscriptionGroupConfig = this.messagingProcessor.getMetadataService().getSubscriptionGroupConfig(createContext, messageReceiptHandle.getGroup());
            if (subscriptionGroupConfig == null) {
                log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
                return CompletableFuture.completedFuture(null);
            }
            this.messagingProcessor.changeInvisibleTime(createContext, decode, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), subscriptionGroupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(messageReceiptHandle.getReconsumeTimes())).whenComplete((ackResult2, th3) -> {
                if (th3 != null) {
                    log.error("error when nack in renew. handle:{}", messageReceiptHandle, th3);
                }
                completableFuture.complete(null);
            });
        }
        return completableFuture;
    }

    protected boolean renewExceptionNeedRetry(Throwable th) {
        Throwable realException = ExceptionUtils.getRealException(th);
        if (!(realException instanceof ProxyException)) {
            return true;
        }
        ProxyException proxyException = (ProxyException) realException;
        return (ProxyExceptionCode.INVALID_BROKER_NAME.equals(proxyException.getCode()) || ProxyExceptionCode.INVALID_RECEIPT_HANDLE.equals(proxyException.getCode())) ? false : true;
    }

    protected boolean clientIsOffline(ReceiptHandleGroupKey receiptHandleGroupKey) {
        return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), receiptHandleGroupKey.group, receiptHandleGroupKey.channel) == null;
    }

    public void addReceiptHandle(ProxyContext proxyContext, Channel channel, String str, String str2, String str3, MessageReceiptHandle messageReceiptHandle) {
        addReceiptHandle(proxyContext, new ReceiptHandleGroupKey(channel, str), str2, str3, messageReceiptHandle);
    }

    protected void addReceiptHandle(ProxyContext proxyContext, ReceiptHandleGroupKey receiptHandleGroupKey, String str, String str2, MessageReceiptHandle messageReceiptHandle) {
        if (receiptHandleGroupKey == null) {
            return;
        }
        ((ReceiptHandleGroup) ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, receiptHandleGroupKey, receiptHandleGroupKey2 -> {
            return new ReceiptHandleGroup();
        })).put(str, str2, messageReceiptHandle);
    }

    public MessageReceiptHandle removeReceiptHandle(ProxyContext proxyContext, Channel channel, String str, String str2, String str3) {
        return removeReceiptHandle(proxyContext, new ReceiptHandleGroupKey(channel, str), str2, str3);
    }

    protected MessageReceiptHandle removeReceiptHandle(ProxyContext proxyContext, ReceiptHandleGroupKey receiptHandleGroupKey, String str, String str2) {
        ReceiptHandleGroup receiptHandleGroup;
        if (receiptHandleGroupKey == null || (receiptHandleGroup = this.receiptHandleGroupMap.get(receiptHandleGroupKey)) == null) {
            return null;
        }
        return receiptHandleGroup.remove(str, str2);
    }

    protected void clearGroup(ReceiptHandleGroupKey receiptHandleGroupKey) {
        if (receiptHandleGroupKey == null) {
            return;
        }
        ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
        ProxyContext createContext = createContext("ClearGroup");
        ReceiptHandleGroup remove = this.receiptHandleGroupMap.remove(receiptHandleGroupKey);
        if (remove == null) {
            return;
        }
        remove.scan((str, str2, messageReceiptHandle) -> {
            try {
                remove.computeIfPresent(str, str2, messageReceiptHandle -> {
                    this.messagingProcessor.changeInvisibleTime(createContext, ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr()), messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getInvisibleTimeMillisWhenClear());
                    return CompletableFuture.completedFuture(null);
                });
            } catch (Exception e) {
                log.error("error when clear handle for group. key:{}", receiptHandleGroupKey, e);
            }
        });
    }

    protected void clearAllHandle() {
        log.info("start clear all handle in receiptHandleProcessor");
        Iterator<ReceiptHandleGroupKey> it = this.receiptHandleGroupMap.keySet().iterator();
        while (it.hasNext()) {
            clearGroup(it.next());
        }
        log.info("clear all handle in receiptHandleProcessor done");
    }
}
