package org.apache.rocketmq.proxy.processor;

import io.netty.channel.Channel;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.ServiceManagerFactory;
import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

/* loaded from: input_file:org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.class */
/**
 * Default {@link MessagingProcessor} implementation.
 *
 * <p>This class is a facade: almost every method forwards its arguments unchanged to one of the
 * specialised processors created in the constructor (producer, consumer, transaction, client,
 * request-broker and receipt-handle processors) or to the {@link ServiceManager}.
 *
 * <p>It owns two thread pools, one handed to the producer processor and one handed to the consumer
 * processor. Both pools and the service manager are registered with the
 * {@link AbstractStartAndShutdown} lifecycle in {@link #init()}.
 */
public class DefaultMessagingProcessor extends AbstractStartAndShutdown implements MessagingProcessor {
    /**
     * Root directory used to locate {@code conf/tools.yml}. Read from the {@code rocketmq.home.dir}
     * system property, falling back to the {@code ROCKETMQ_HOME} environment variable. It is
     * {@code null} when neither is set.
     */
    protected static final String ROCKETMQ_HOME = System.getProperty("rocketmq.home.dir", System.getenv("ROCKETMQ_HOME"));

    protected ServiceManager serviceManager;
    protected ProducerProcessor producerProcessor;
    protected ConsumerProcessor consumerProcessor;
    protected TransactionProcessor transactionProcessor;
    protected ClientProcessor clientProcessor;
    protected RequestBrokerProcessor requestBrokerProcessor;
    protected ReceiptHandleProcessor receiptHandleProcessor;
    /** Executor passed to {@link #producerProcessor}; shut down through the lifecycle hooks. */
    protected ThreadPoolExecutor producerProcessorExecutor;
    /** Executor passed to {@link #consumerProcessor}; shut down through the lifecycle hooks. */
    protected ThreadPoolExecutor consumerProcessorExecutor;

    /**
     * Builds the executors and all delegate processors on top of the given service manager, then
     * calls {@link #init()}.
     *
     * @param serviceManager the services every delegate processor is constructed with
     */
    protected DefaultMessagingProcessor(ServiceManager serviceManager) {
        ProxyConfig config = ConfigurationManager.getProxyConfig();

        // The same configured thread count is passed for both pool-size arguments of each pool.
        int producerThreads = config.getProducerProcessorThreadPoolNums();
        this.producerProcessorExecutor = ThreadPoolMonitor.createAndMonitor(
            producerThreads,
            producerThreads,
            1L,
            TimeUnit.MINUTES,
            "ProducerProcessorExecutor",
            config.getProducerProcessorThreadPoolQueueCapacity());

        int consumerThreads = config.getConsumerProcessorThreadPoolNums();
        this.consumerProcessorExecutor = ThreadPoolMonitor.createAndMonitor(
            consumerThreads,
            consumerThreads,
            1L,
            TimeUnit.MINUTES,
            "ConsumerProcessorExecutor",
            config.getConsumerProcessorThreadPoolQueueCapacity());

        this.serviceManager = serviceManager;
        this.producerProcessor = new ProducerProcessor(this, serviceManager, this.producerProcessorExecutor);
        this.consumerProcessor = new ConsumerProcessor(this, serviceManager, this.consumerProcessorExecutor);
        this.transactionProcessor = new TransactionProcessor(this, serviceManager);
        this.clientProcessor = new ClientProcessor(this, serviceManager);
        this.requestBrokerProcessor = new RequestBrokerProcessor(this, serviceManager);
        this.receiptHandleProcessor = new ReceiptHandleProcessor(this, serviceManager);

        // NOTE: init() is overridable, so a subclass override runs before that subclass's own
        // constructor body and field initializers have executed.
        init();
    }

    /**
     * Creates a processor for local mode without an RPC hook.
     *
     * @param brokerController the broker controller handed to the service manager factory
     * @return a new processor instance
     */
    public static DefaultMessagingProcessor createForLocalMode(BrokerController brokerController) {
        return createForLocalMode(brokerController, null);
    }

    /**
     * Creates a processor for local mode.
     *
     * @param brokerController the broker controller handed to the service manager factory
     * @param rPCHook          hook handed to the service manager factory; may be {@code null}
     * @return a new processor instance
     */
    public static DefaultMessagingProcessor createForLocalMode(BrokerController brokerController, RPCHook rPCHook) {
        ServiceManager localServices = ServiceManagerFactory.createForLocalMode(brokerController, rPCHook);
        return new DefaultMessagingProcessor(localServices);
    }

    /**
     * Creates a processor for cluster mode. When the proxy configuration enables the ACL RPC hook
     * for cluster mode, the hook is loaded from {@code <ROCKETMQ_HOME>/conf/tools.yml}; otherwise
     * no hook is used.
     *
     * @return a new processor instance
     */
    public static DefaultMessagingProcessor createForClusterMode() {
        if (!ConfigurationManager.getProxyConfig().isEnableAclRpcHookForClusterMode()) {
            return createForClusterMode(null);
        }
        // NOTE(review): if ROCKETMQ_HOME is null this path becomes "null/conf/tools.yml".
        RPCHook aclHook = AclUtils.getAclRPCHook(ROCKETMQ_HOME + "/conf/tools.yml");
        return createForClusterMode(aclHook);
    }

    /**
     * Creates a processor for cluster mode with the given RPC hook.
     *
     * @param rPCHook hook handed to the service manager factory; may be {@code null}
     * @return a new processor instance
     */
    public static DefaultMessagingProcessor createForClusterMode(RPCHook rPCHook) {
        ServiceManager clusterServices = ServiceManagerFactory.createForClusterMode(rPCHook);
        return new DefaultMessagingProcessor(clusterServices);
    }

    /**
     * Registers lifecycle hooks: the service manager for start and shutdown, and both executors
     * for shutdown. Called from the constructor after all fields are assigned.
     */
    protected void init() {
        appendStartAndShutdown(this.serviceManager);
        // A bound method reference evaluates and null-checks its receiver right here, so a missing
        // executor still fails at registration time rather than at shutdown.
        appendShutdown(this.producerProcessorExecutor::shutdown);
        appendShutdown(this.consumerProcessorExecutor::shutdown);
    }

    /** Looks up the subscription group configuration through the metadata service. */
    @Override
    public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext proxyContext, String str) {
        MetadataService metadataService = this.serviceManager.getMetadataService();
        return metadataService.getSubscriptionGroupConfig(proxyContext, str);
    }

    /**
     * Resolves proxy topic route data through the topic route service.
     *
     * @throws Exception whatever the topic route service throws
     */
    @Override
    public ProxyTopicRouteData getTopicRouteDataForProxy(ProxyContext proxyContext, List<Address> list, String str) throws Exception {
        return this.serviceManager.getTopicRouteService().getTopicRouteForProxy(proxyContext, list, str);
    }

    /** Forwards to {@link ProducerProcessor#sendMessage} with all arguments unchanged. */
    @Override
    public CompletableFuture<List<SendResult>> sendMessage(ProxyContext proxyContext, QueueSelector queueSelector, String str, int i, List<Message> list, long j) {
        return this.producerProcessor.sendMessage(proxyContext, queueSelector, str, i, list, j);
    }

    /** Forwards to {@link ProducerProcessor#forwardMessageToDeadLetterQueue} with all arguments unchanged. */
    @Override
    public CompletableFuture<RemotingCommand> forwardMessageToDeadLetterQueue(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, String str2, String str3, long j) {
        return this.producerProcessor.forwardMessageToDeadLetterQueue(proxyContext, receiptHandle, str, str2, str3, j);
    }

    /** Forwards to {@link TransactionProcessor#endTransaction} with all arguments unchanged. */
    @Override
    public CompletableFuture<Void> endTransaction(ProxyContext proxyContext, String str, String str2, String str3, TransactionStatus transactionStatus, boolean z, long j) {
        return this.transactionProcessor.endTransaction(proxyContext, str, str2, str3, transactionStatus, z, j);
    }

    /** Forwards to {@link ConsumerProcessor#popMessage} with all arguments unchanged. */
    @Override
    public CompletableFuture<PopResult> popMessage(ProxyContext proxyContext, QueueSelector queueSelector, String str, String str2, int i, long j, long j2, int i2, SubscriptionData subscriptionData, boolean z, PopMessageResultFilter popMessageResultFilter, String str3, long j3) {
        return this.consumerProcessor.popMessage(proxyContext, queueSelector, str, str2, i, j, j2, i2, subscriptionData, z, popMessageResultFilter, str3, j3);
    }

    /** Forwards to {@link ConsumerProcessor#ackMessage} with all arguments unchanged. */
    @Override
    public CompletableFuture<AckResult> ackMessage(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, String str2, String str3, long j) {
        return this.consumerProcessor.ackMessage(proxyContext, receiptHandle, str, str2, str3, j);
    }

    /** Forwards to {@link ConsumerProcessor#batchAckMessage} with all arguments unchanged. */
    @Override
    public CompletableFuture<List<BatchAckResult>> batchAckMessage(ProxyContext proxyContext, List<ReceiptHandleMessage> list, String str, String str2, long j) {
        return this.consumerProcessor.batchAckMessage(proxyContext, list, str, str2, j);
    }

    /** Forwards to {@link ConsumerProcessor#changeInvisibleTime} with all arguments unchanged. */
    @Override
    public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, String str2, String str3, long j, long j2) {
        return this.consumerProcessor.changeInvisibleTime(proxyContext, receiptHandle, str, str2, str3, j, j2);
    }

    /** Forwards to {@link ConsumerProcessor#pullMessage} with all arguments unchanged. */
    @Override
    public CompletableFuture<PullResult> pullMessage(ProxyContext proxyContext, MessageQueue messageQueue, String str, long j, int i, int i2, long j2, long j3, SubscriptionData subscriptionData, long j4) {
        return this.consumerProcessor.pullMessage(proxyContext, messageQueue, str, j, i, i2, j2, j3, subscriptionData, j4);
    }

    /** Forwards to {@link ConsumerProcessor#updateConsumerOffset} with all arguments unchanged. */
    @Override
    public CompletableFuture<Void> updateConsumerOffset(ProxyContext proxyContext, MessageQueue messageQueue, String str, long j, long j2) {
        return this.consumerProcessor.updateConsumerOffset(proxyContext, messageQueue, str, j, j2);
    }

    /** Forwards to {@link ConsumerProcessor#queryConsumerOffset} with all arguments unchanged. */
    @Override
    public CompletableFuture<Long> queryConsumerOffset(ProxyContext proxyContext, MessageQueue messageQueue, String str, long j) {
        return this.consumerProcessor.queryConsumerOffset(proxyContext, messageQueue, str, j);
    }

    /** Forwards to {@link ConsumerProcessor#lockBatchMQ} with all arguments unchanged. */
    @Override
    public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext proxyContext, Set<MessageQueue> set, String str, String str2, long j) {
        return this.consumerProcessor.lockBatchMQ(proxyContext, set, str, str2, j);
    }

    /** Forwards to {@link ConsumerProcessor#unlockBatchMQ} with all arguments unchanged. */
    @Override
    public CompletableFuture<Void> unlockBatchMQ(ProxyContext proxyContext, Set<MessageQueue> set, String str, String str2, long j) {
        return this.consumerProcessor.unlockBatchMQ(proxyContext, set, str, str2, j);
    }

    /** Forwards to {@link ConsumerProcessor#getMaxOffset} with all arguments unchanged. */
    @Override
    public CompletableFuture<Long> getMaxOffset(ProxyContext proxyContext, MessageQueue messageQueue, long j) {
        return this.consumerProcessor.getMaxOffset(proxyContext, messageQueue, j);
    }

    /** Forwards to {@link ConsumerProcessor#getMinOffset} with all arguments unchanged. */
    @Override
    public CompletableFuture<Long> getMinOffset(ProxyContext proxyContext, MessageQueue messageQueue, long j) {
        return this.consumerProcessor.getMinOffset(proxyContext, messageQueue, j);
    }

    /**
     * Sends a request through the request-broker processor.
     *
     * <p>The command's opaque is temporarily replaced with a newly created request id for the
     * duration of the call. The caller's original opaque is put back when the returned future
     * completes, whether normally or exceptionally, and also if the delegate throws synchronously.
     *
     * @return a future carrying the delegate's response, or its failure
     */
    @Override
    public CompletableFuture<RemotingCommand> request(ProxyContext proxyContext, String str, RemotingCommand remotingCommand, long j) {
        final int originalOpaque = remotingCommand.getOpaque();
        remotingCommand.setOpaque(RemotingCommand.createNewRequestId());
        try {
            // whenComplete (unlike thenApply) also fires on failure, so the caller's command is
            // never left carrying the proxy-generated opaque.
            return this.requestBrokerProcessor.request(proxyContext, str, remotingCommand, j)
                .whenComplete((response, error) -> remotingCommand.setOpaque(originalOpaque));
        } catch (RuntimeException | Error failure) {
            remotingCommand.setOpaque(originalOpaque);
            throw failure;
        }
    }

    /**
     * Sends a one-way request through the request-broker processor.
     *
     * <p>The command's opaque is temporarily replaced with a newly created request id for the
     * duration of the call. The caller's original opaque is put back when the returned future
     * completes, whether normally or exceptionally, and also if the delegate throws synchronously.
     *
     * @return a future that completes when the delegate's future completes
     */
    @Override
    public CompletableFuture<Void> requestOneway(ProxyContext proxyContext, String str, RemotingCommand remotingCommand, long j) {
        final int originalOpaque = remotingCommand.getOpaque();
        remotingCommand.setOpaque(RemotingCommand.createNewRequestId());
        try {
            // whenComplete (unlike thenApply) also fires on failure, so the caller's command is
            // never left carrying the proxy-generated opaque.
            return this.requestBrokerProcessor.requestOneway(proxyContext, str, remotingCommand, j)
                .whenComplete((ignored, error) -> remotingCommand.setOpaque(originalOpaque));
        } catch (RuntimeException | Error failure) {
            remotingCommand.setOpaque(originalOpaque);
            throw failure;
        }
    }

    /** Forwards to {@link ClientProcessor#registerProducer} with all arguments unchanged. */
    @Override
    public void registerProducer(ProxyContext proxyContext, String str, ClientChannelInfo clientChannelInfo) {
        this.clientProcessor.registerProducer(proxyContext, str, clientChannelInfo);
    }

    /** Forwards to {@link ClientProcessor#unRegisterProducer} with all arguments unchanged. */
    @Override
    public void unRegisterProducer(ProxyContext proxyContext, String str, ClientChannelInfo clientChannelInfo) {
        this.clientProcessor.unRegisterProducer(proxyContext, str, clientChannelInfo);
    }

    /** Forwards to {@link ClientProcessor#findProducerChannel} with all arguments unchanged. */
    @Override
    public Channel findProducerChannel(ProxyContext proxyContext, String str, String str2) {
        return this.clientProcessor.findProducerChannel(proxyContext, str, str2);
    }

    /** Registers the listener via {@link ClientProcessor#registerProducerChangeListener}. */
    @Override
    public void registerProducerListener(ProducerChangeListener producerChangeListener) {
        this.clientProcessor.registerProducerChangeListener(producerChangeListener);
    }

    /** Forwards to {@link ClientProcessor#registerConsumer} with all arguments unchanged. */
    @Override
    public void registerConsumer(ProxyContext proxyContext, String str, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, Set<SubscriptionData> set, boolean z) {
        this.clientProcessor.registerConsumer(proxyContext, str, clientChannelInfo, consumeType, messageModel, consumeFromWhere, set, z);
    }

    /** Forwards to {@link ClientProcessor#findConsumerChannel} with all arguments unchanged. */
    @Override
    public ClientChannelInfo findConsumerChannel(ProxyContext proxyContext, String str, Channel channel) {
        return this.clientProcessor.findConsumerChannel(proxyContext, str, channel);
    }

    /** Forwards to {@link ClientProcessor#unRegisterConsumer} with all arguments unchanged. */
    @Override
    public void unRegisterConsumer(ProxyContext proxyContext, String str, ClientChannelInfo clientChannelInfo) {
        this.clientProcessor.unRegisterConsumer(proxyContext, str, clientChannelInfo);
    }

    /** Registers the listener via {@link ClientProcessor#registerConsumerIdsChangeListener}. */
    @Override
    public void registerConsumerListener(ConsumerIdsChangeListener consumerIdsChangeListener) {
        this.clientProcessor.registerConsumerIdsChangeListener(consumerIdsChangeListener);
    }

    /** Forwards to {@link ClientProcessor#doChannelCloseEvent} with all arguments unchanged. */
    @Override
    public void doChannelCloseEvent(String str, Channel channel) {
        this.clientProcessor.doChannelCloseEvent(str, channel);
    }

    /** Forwards to {@link ClientProcessor#getConsumerGroupInfo} with all arguments unchanged. */
    @Override
    public ConsumerGroupInfo getConsumerGroupInfo(ProxyContext proxyContext, String str) {
        return this.clientProcessor.getConsumerGroupInfo(proxyContext, str);
    }

    /** Forwards to {@link TransactionProcessor#addTransactionSubscription} with all arguments unchanged. */
    @Override
    public void addTransactionSubscription(ProxyContext proxyContext, String str, String str2) {
        this.transactionProcessor.addTransactionSubscription(proxyContext, str, str2);
    }

    /** Returns the proxy relay service exposed by the service manager. */
    @Override
    public ProxyRelayService getProxyRelayService() {
        return this.serviceManager.getProxyRelayService();
    }

    /** Returns the metadata service exposed by the service manager. */
    @Override
    public MetadataService getMetadataService() {
        return this.serviceManager.getMetadataService();
    }

    /** Forwards to {@link ReceiptHandleProcessor#addReceiptHandle} with all arguments unchanged. */
    @Override
    public void addReceiptHandle(ProxyContext proxyContext, Channel channel, String str, String str2, MessageReceiptHandle messageReceiptHandle) {
        this.receiptHandleProcessor.addReceiptHandle(proxyContext, channel, str, str2, messageReceiptHandle);
    }

    /** Forwards to {@link ReceiptHandleProcessor#removeReceiptHandle} and returns its result. */
    @Override
    public MessageReceiptHandle removeReceiptHandle(ProxyContext proxyContext, Channel channel, String str, String str2, String str3) {
        return this.receiptHandleProcessor.removeReceiptHandle(proxyContext, channel, str, str2, str3);
    }
}
