package org.apache.tubemq.server.tools.cli;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.MessageConsumer;
import org.apache.tubemq.client.consumer.MessageV2Listener;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.TErrCodeConstants;
import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.server.common.fielddef.CliArgDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/server/tools/cli/CliConsumer.class */
public class CliConsumer extends CliAbstractBase {
    private static final Logger logger = LoggerFactory.getLogger(CliConsumer.class);
    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
    private final Map<String, TreeSet<String>> topicAndFiltersMap;
    private final List<MessageSessionFactory> sessionFactoryList;
    private final Map<MessageConsumer, TupleValue> consumerMap;
    private String masterServers;
    private String groupName;
    private ConsumePosition consumePos;
    private long msgCount;
    private long rpcTimeoutMs;
    private boolean reuseConn;
    private int clientCount;
    private int fetchThreadCnt;
    private long printIntervalMs;
    private boolean isPushConsume;
    private boolean isStarted;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tubemq/server/tools/cli/CliConsumer$DefaultMessageListener.class */
    public static class DefaultMessageListener implements MessageV2Listener {
        public void receiveMessages(PeerInfo peerInfo, List<Message> list) {
            if (list == null || list.isEmpty()) {
                return;
            }
            CliConsumer.TOTAL_COUNTER.addAndGet(list.size());
        }

        public void receiveMessages(List<Message> list) {
        }

        public Executor getExecutor() {
            return null;
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/tubemq/server/tools/cli/CliConsumer$FetchRequestRunner.class */
    private static class FetchRequestRunner implements Runnable {
        private final PullMessageConsumer messageConsumer;
        private final long msgConsumeCnt;

        FetchRequestRunner(PullMessageConsumer pullMessageConsumer, long j) {
            this.messageConsumer = pullMessageConsumer;
            this.msgConsumeCnt = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    ConsumerResult message = this.messageConsumer.getMessage();
                    if (!message.isSuccess()) {
                        if (!TErrCodeConstants.IGNORE_ERROR_SET.contains(Integer.valueOf(message.getErrCode()))) {
                            CliConsumer.logger.info("Receive messages errorCode is {}, Error message is {}", Integer.valueOf(message.getErrCode()), message.getErrMsg());
                            if (this.messageConsumer.isShutdown()) {
                                break;
                            }
                        }
                    } else {
                        List messageList = message.getMessageList();
                        if (messageList != null && !messageList.isEmpty()) {
                            CliConsumer.TOTAL_COUNTER.addAndGet(messageList.size());
                        }
                        this.messageConsumer.confirmConsume(message.getConfirmContext(), true);
                    }
                    if (this.msgConsumeCnt >= 0 && CliConsumer.TOTAL_COUNTER.get() >= this.msgConsumeCnt) {
                        break;
                    }
                } catch (TubeClientException e) {
                    CliConsumer.logger.error("Create consumer failed!", e);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tubemq/server/tools/cli/CliConsumer$TupleValue.class */
    public static class TupleValue {
        public Thread[] fetchRunners;

        public TupleValue(PullMessageConsumer pullMessageConsumer, long j, int i) {
            this.fetchRunners = null;
            this.fetchRunners = new Thread[i];
            for (int i2 = 0; i2 < this.fetchRunners.length; i2++) {
                this.fetchRunners[i2] = new Thread(new FetchRequestRunner(pullMessageConsumer, j));
                this.fetchRunners[i2].setName("_fetch_runner_" + i2);
            }
            for (Thread thread : this.fetchRunners) {
                thread.start();
            }
        }
    }

    public CliConsumer() {
        super("tubemq-consumer-test.sh");
        this.topicAndFiltersMap = new HashMap();
        this.sessionFactoryList = new ArrayList();
        this.consumerMap = new HashMap();
        this.groupName = "test_consume";
        this.consumePos = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
        this.msgCount = -2L;
        this.rpcTimeoutMs = -2L;
        this.reuseConn = false;
        this.clientCount = 1;
        this.fetchThreadCnt = Runtime.getRuntime().availableProcessors();
        this.printIntervalMs = 5000L;
        this.isPushConsume = false;
        this.isStarted = false;
        initCommandOptions();
    }

    @Override // org.apache.tubemq.server.tools.cli.CliAbstractBase
    protected void initCommandOptions() {
        addCommandOption(CliArgDef.MASTERSERVER);
        addCommandOption(CliArgDef.MESSAGES);
        addCommandOption(CliArgDef.CNSTOPIC);
        addCommandOption(CliArgDef.RPCTIMEOUT);
        addCommandOption(CliArgDef.GROUP);
        addCommandOption(CliArgDef.CONNREUSE);
        addCommandOption(CliArgDef.PUSHCONSUME);
        addCommandOption(CliArgDef.CONSUMEPOS);
        addCommandOption(CliArgDef.FETCHTHREADS);
        addCommandOption(CliArgDef.CLIENTCOUNT);
        addCommandOption(CliArgDef.OUTPUTINTERVAL);
    }

    @Override // org.apache.tubemq.server.tools.cli.CliAbstractBase
    public boolean processParams(String[] strArr) throws Exception {
        CommandLine parse = this.parser.parse(this.options, strArr);
        if (parse == null) {
            throw new ParseException("Parse args failure");
        }
        if (parse.hasOption(CliArgDef.VERSION.longOpt)) {
            version();
        }
        if (parse.hasOption(CliArgDef.HELP.longOpt)) {
            help();
        }
        this.masterServers = parse.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
        if (TStringUtils.isBlank(this.masterServers)) {
            throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
        }
        String optionValue = parse.getOptionValue(CliArgDef.CNSTOPIC.longOpt);
        if (TStringUtils.isBlank(optionValue)) {
            throw new Exception(CliArgDef.CNSTOPIC.longOpt + " is required!");
        }
        this.topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(optionValue));
        if (this.topicAndFiltersMap.isEmpty()) {
            throw new Exception("Invalid " + CliArgDef.CNSTOPIC.longOpt + " parameter value!");
        }
        String optionValue2 = parse.getOptionValue(CliArgDef.MESSAGES.longOpt);
        if (TStringUtils.isNotBlank(optionValue2)) {
            this.msgCount = Long.parseLong(optionValue2);
        }
        if (TStringUtils.isNotBlank(parse.getOptionValue(CliArgDef.GROUP.longOpt))) {
            this.groupName = parse.getOptionValue(CliArgDef.GROUP.longOpt);
        }
        String optionValue3 = parse.getOptionValue(CliArgDef.CONNREUSE.longOpt);
        if (TStringUtils.isNotBlank(optionValue3)) {
            this.reuseConn = Boolean.parseBoolean(optionValue3);
        }
        String optionValue4 = parse.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
        if (TStringUtils.isNotBlank(optionValue4)) {
            this.rpcTimeoutMs = Long.parseLong(optionValue4);
        }
        String optionValue5 = parse.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt);
        if (TStringUtils.isNotBlank(optionValue5)) {
            this.clientCount = Integer.parseInt(optionValue5);
        }
        String optionValue6 = parse.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt);
        if (TStringUtils.isNotBlank(optionValue6)) {
            this.printIntervalMs = Long.parseLong(optionValue6);
            if (this.printIntervalMs < 5000) {
                throw new Exception("Invalid " + CliArgDef.OUTPUTINTERVAL.longOpt + " parameter value!");
            }
        }
        String optionValue7 = parse.getOptionValue(CliArgDef.CONSUMEPOS.longOpt);
        if (TStringUtils.isNotBlank(optionValue7)) {
            int parseInt = Integer.parseInt(optionValue7);
            if (parseInt > 0) {
                this.consumePos = ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS;
            } else if (parseInt < 0) {
                this.consumePos = ConsumePosition.CONSUMER_FROM_FIRST_OFFSET;
            } else {
                this.consumePos = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
            }
        }
        if (parse.hasOption(CliArgDef.PUSHCONSUME.longOpt)) {
            this.isPushConsume = true;
        }
        String optionValue8 = parse.getOptionValue(CliArgDef.FETCHTHREADS.longOpt);
        if (!TStringUtils.isNotBlank(optionValue8)) {
            return true;
        }
        this.fetchThreadCnt = MixedUtils.mid(Integer.parseInt(optionValue8), 1, 100);
        return true;
    }

    public void initTask() throws Exception {
        ConsumerConfig consumerConfig = new ConsumerConfig(this.masterServers, this.groupName);
        consumerConfig.setRpcTimeoutMs(this.rpcTimeoutMs);
        consumerConfig.setPushFetchThreadCnt(this.fetchThreadCnt);
        consumerConfig.setConsumePosition(this.consumePos);
        if (this.isPushConsume) {
            DefaultMessageListener defaultMessageListener = new DefaultMessageListener();
            if (this.reuseConn) {
                MessageSessionFactory tubeSingleSessionFactory = new TubeSingleSessionFactory(consumerConfig);
                this.sessionFactoryList.add(tubeSingleSessionFactory);
                for (int i = 0; i < this.clientCount; i++) {
                    MessageConsumer createPushConsumer = tubeSingleSessionFactory.createPushConsumer(consumerConfig);
                    for (Map.Entry<String, TreeSet<String>> entry : this.topicAndFiltersMap.entrySet()) {
                        createPushConsumer.subscribe(entry.getKey(), entry.getValue(), defaultMessageListener);
                    }
                    createPushConsumer.completeSubscribe();
                    this.consumerMap.put(createPushConsumer, null);
                }
            } else {
                for (int i2 = 0; i2 < this.clientCount; i2++) {
                    MessageSessionFactory tubeMultiSessionFactory = new TubeMultiSessionFactory(consumerConfig);
                    this.sessionFactoryList.add(tubeMultiSessionFactory);
                    MessageConsumer createPushConsumer2 = tubeMultiSessionFactory.createPushConsumer(consumerConfig);
                    for (Map.Entry<String, TreeSet<String>> entry2 : this.topicAndFiltersMap.entrySet()) {
                        createPushConsumer2.subscribe(entry2.getKey(), entry2.getValue(), defaultMessageListener);
                    }
                    createPushConsumer2.completeSubscribe();
                    this.consumerMap.put(createPushConsumer2, null);
                }
            }
        } else if (this.reuseConn) {
            MessageSessionFactory tubeSingleSessionFactory2 = new TubeSingleSessionFactory(consumerConfig);
            this.sessionFactoryList.add(tubeSingleSessionFactory2);
            for (int i3 = 0; i3 < this.clientCount; i3++) {
                MessageConsumer createPullConsumer = tubeSingleSessionFactory2.createPullConsumer(consumerConfig);
                for (Map.Entry<String, TreeSet<String>> entry3 : this.topicAndFiltersMap.entrySet()) {
                    createPullConsumer.subscribe(entry3.getKey(), entry3.getValue());
                }
                createPullConsumer.completeSubscribe();
                this.consumerMap.put(createPullConsumer, new TupleValue(createPullConsumer, this.msgCount, this.fetchThreadCnt));
            }
        } else {
            for (int i4 = 0; i4 < this.clientCount; i4++) {
                MessageSessionFactory tubeMultiSessionFactory2 = new TubeMultiSessionFactory(consumerConfig);
                this.sessionFactoryList.add(tubeMultiSessionFactory2);
                MessageConsumer createPullConsumer2 = tubeMultiSessionFactory2.createPullConsumer(consumerConfig);
                for (Map.Entry<String, TreeSet<String>> entry4 : this.topicAndFiltersMap.entrySet()) {
                    createPullConsumer2.subscribe(entry4.getKey(), entry4.getValue());
                }
                createPullConsumer2.completeSubscribe();
                this.consumerMap.put(createPullConsumer2, new TupleValue(createPullConsumer2, this.msgCount, this.fetchThreadCnt));
            }
        }
        this.isStarted = true;
    }

    public void shutdown() throws Throwable {
        ThreadUtils.sleep(20L);
        Iterator<MessageConsumer> it = this.consumerMap.keySet().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        Iterator<MessageSessionFactory> it2 = this.sessionFactoryList.iterator();
        while (it2.hasNext()) {
            it2.next().shutdown();
        }
    }

    public static void main(String[] strArr) {
        CliConsumer cliConsumer = new CliConsumer();
        try {
            if (!cliConsumer.processParams(strArr)) {
                throw new Exception("Parse parameters failure!");
            }
            cliConsumer.initTask();
            ThreadUtils.sleep(1000L);
            while (true) {
                if (cliConsumer.msgCount >= 0 && TOTAL_COUNTER.get() >= cliConsumer.msgCount * cliConsumer.clientCount) {
                    cliConsumer.shutdown();
                    System.out.println("Finished, received count VS received message count = " + (cliConsumer.msgCount * cliConsumer.clientCount) + " : " + TOTAL_COUNTER.get());
                    return;
                } else {
                    ThreadUtils.sleep(cliConsumer.printIntervalMs);
                    System.out.println("Required received count VS received message count = " + (cliConsumer.msgCount * cliConsumer.clientCount) + " : " + TOTAL_COUNTER.get());
                }
            }
        } catch (Throwable th) {
            th.printStackTrace();
            logger.error(th.getMessage());
            cliConsumer.help();
        }
    }
}
