package org.apache.inlong.tubemq.server.tools.cli;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.server.common.TServerConstants;
import org.apache.inlong.tubemq.server.common.fielddef.CliArgDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/server/tools/cli/CliProducer.class */
public class CliProducer extends CliAbstractBase {
    private static final Logger logger = LoggerFactory.getLogger(CliProducer.class);
    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
    private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
    private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
    private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
    private static byte[] sentData;
    private final Map<String, TreeSet<String>> topicAndFiltersMap;
    private final List<Tuple2<String, String>> topicSendRounds;
    private final List<MessageSessionFactory> sessionFactoryList;
    private final Map<MessageProducer, MsgSender> producerMap;
    private String masterServers;
    private long msgCount;
    private boolean useRandData;
    private int msgDataSize;
    private String payloadFilePath;
    private String payloadDelim;
    private long rpcTimeoutMs;
    private boolean reuseConn;
    private int clientCount;
    private int sendThreadCnt;
    private long printIntervalMs;
    private boolean syncProduction;
    private boolean withoutDelay;
    private boolean isStarted;
    private ExecutorService sendExecutorService;

    /* loaded from: input_file:org/apache/inlong/tubemq/server/tools/cli/CliProducer$DefaultSendCallback.class */
    private class DefaultSendCallback implements MessageSentCallback {
        private DefaultSendCallback() {
        }

        public void onMessageSent(MessageSentResult messageSentResult) {
            CliProducer.TOTAL_COUNTER.incrementAndGet();
            if (messageSentResult.isSuccess()) {
                CliProducer.SENT_SUCC_COUNTER.incrementAndGet();
            } else {
                CliProducer.SENT_FAIL_COUNTER.incrementAndGet();
            }
        }

        public void onException(Throwable th) {
            CliProducer.TOTAL_COUNTER.incrementAndGet();
            CliProducer.SENT_EXCEPT_COUNTER.incrementAndGet();
            CliProducer.logger.error("Send message error!", th);
        }
    }

    /* loaded from: input_file:org/apache/inlong/tubemq/server/tools/cli/CliProducer$MsgSender.class */
    public class MsgSender implements Runnable {
        private final MessageProducer producer;

        public MsgSender(MessageProducer messageProducer) {
            this.producer = messageProducer;
        }

        @Override // java.lang.Runnable
        public void run() {
            int size = CliProducer.this.topicSendRounds.size();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmm");
            long j = 0;
            while (true) {
                if (CliProducer.this.msgCount >= 0 && j >= CliProducer.this.msgCount) {
                    try {
                        this.producer.shutdown();
                        return;
                    } catch (Throwable th) {
                        CliProducer.logger.error("producer shutdown error: ", th);
                        return;
                    }
                }
                long j2 = j;
                j = j2 + 1;
                int i = (int) (j2 % size);
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    Tuple2 tuple2 = (Tuple2) CliProducer.this.topicSendRounds.get(i);
                    Message message = new Message((String) tuple2.getF0(), CliProducer.sentData);
                    if (tuple2.getF1() != null) {
                        message.putSystemHeader((String) tuple2.getF1(), simpleDateFormat.format(new Date(currentTimeMillis)));
                    }
                    if (CliProducer.this.syncProduction) {
                        MessageSentResult sendMessage = this.producer.sendMessage(message);
                        CliProducer.TOTAL_COUNTER.incrementAndGet();
                        if (sendMessage.isSuccess()) {
                            CliProducer.SENT_SUCC_COUNTER.incrementAndGet();
                        } else {
                            CliProducer.SENT_FAIL_COUNTER.incrementAndGet();
                        }
                    } else {
                        this.producer.sendMessage(message, new DefaultSendCallback());
                    }
                } catch (Throwable th2) {
                    CliProducer.TOTAL_COUNTER.incrementAndGet();
                    CliProducer.SENT_EXCEPT_COUNTER.incrementAndGet();
                    CliProducer.logger.error("sendMessage exception: ", th2);
                }
                if (!CliProducer.this.withoutDelay) {
                    if (j % 5000 == 0) {
                        ThreadUtils.sleep(3000L);
                    } else if (j % 4000 == 0) {
                        ThreadUtils.sleep(2000L);
                    } else if (j % 2000 == 0) {
                        ThreadUtils.sleep(800L);
                    } else if (j % 1000 == 0) {
                        ThreadUtils.sleep(400L);
                    }
                }
            }
        }
    }

    public CliProducer() {
        super("tubemq-producer-test.sh");
        this.topicAndFiltersMap = new HashMap();
        this.topicSendRounds = new ArrayList();
        this.sessionFactoryList = new ArrayList();
        this.producerMap = new HashMap();
        this.msgCount = -2L;
        this.useRandData = true;
        this.msgDataSize = TServerConstants.TOPIC_DSK_UNFLUSHTHRESHOLD_DEF;
        this.payloadFilePath = null;
        this.payloadDelim = null;
        this.rpcTimeoutMs = -2L;
        this.reuseConn = false;
        this.clientCount = 1;
        this.sendThreadCnt = 100;
        this.printIntervalMs = 5000L;
        this.syncProduction = false;
        this.withoutDelay = false;
        this.isStarted = false;
        this.sendExecutorService = null;
        initCommandOptions();
    }

    @Override // org.apache.inlong.tubemq.server.tools.cli.CliAbstractBase
    protected void initCommandOptions() {
        addCommandOption(CliArgDef.MASTERSERVER);
        addCommandOption(CliArgDef.MESSAGES);
        addCommandOption(CliArgDef.MSGDATASIZE);
        addCommandOption(CliArgDef.PRDTOPIC);
        addCommandOption(CliArgDef.RPCTIMEOUT);
        addCommandOption(CliArgDef.CONNREUSE);
        addCommandOption(CliArgDef.CLIENTCOUNT);
        addCommandOption(CliArgDef.OUTPUTINTERVAL);
        addCommandOption(CliArgDef.SYNCPRODUCE);
        addCommandOption(CliArgDef.SENDTHREADS);
        addCommandOption(CliArgDef.WITHOUTDELAY);
    }

    @Override // org.apache.inlong.tubemq.server.tools.cli.CliAbstractBase
    public boolean processParams(String[] strArr) throws Exception {
        CommandLine parse = this.parser.parse(this.options, strArr);
        if (parse == null) {
            throw new ParseException("Parse args failure");
        }
        if (parse.hasOption(CliArgDef.VERSION.longOpt)) {
            version();
        }
        if (parse.hasOption(CliArgDef.HELP.longOpt)) {
            help();
        }
        this.masterServers = parse.getOptionValue(CliArgDef.MASTERSERVER.longOpt);
        if (TStringUtils.isBlank(this.masterServers)) {
            throw new Exception(CliArgDef.MASTERSERVER.longOpt + " is required!");
        }
        String optionValue = parse.getOptionValue(CliArgDef.PRDTOPIC.longOpt);
        if (TStringUtils.isBlank(optionValue)) {
            throw new Exception(CliArgDef.PRDTOPIC.longOpt + " is required!");
        }
        this.topicAndFiltersMap.putAll(MixedUtils.parseTopicParam(optionValue));
        if (this.topicAndFiltersMap.isEmpty()) {
            throw new Exception("Invalid " + CliArgDef.PRDTOPIC.longOpt + " parameter value!");
        }
        String optionValue2 = parse.getOptionValue(CliArgDef.MESSAGES.longOpt);
        if (TStringUtils.isNotBlank(optionValue2)) {
            this.msgCount = Long.parseLong(optionValue2);
        }
        String optionValue3 = parse.getOptionValue(CliArgDef.MSGDATASIZE.longOpt);
        if (TStringUtils.isNotBlank(optionValue3)) {
            this.msgDataSize = Integer.parseInt(optionValue3);
        }
        String optionValue4 = parse.getOptionValue(CliArgDef.CONNREUSE.longOpt);
        if (TStringUtils.isNotBlank(optionValue4)) {
            this.reuseConn = Boolean.parseBoolean(optionValue4);
        }
        String optionValue5 = parse.getOptionValue(CliArgDef.SENDTHREADS.longOpt);
        if (TStringUtils.isNotBlank(optionValue5)) {
            this.sendThreadCnt = MixedUtils.mid(Integer.parseInt(optionValue5), 1, 200);
        }
        String optionValue6 = parse.getOptionValue(CliArgDef.RPCTIMEOUT.longOpt);
        if (TStringUtils.isNotBlank(optionValue6)) {
            this.rpcTimeoutMs = Long.parseLong(optionValue6);
        }
        String optionValue7 = parse.getOptionValue(CliArgDef.CLIENTCOUNT.longOpt);
        if (TStringUtils.isNotBlank(optionValue7)) {
            this.clientCount = Integer.parseInt(optionValue7);
        }
        String optionValue8 = parse.getOptionValue(CliArgDef.OUTPUTINTERVAL.longOpt);
        if (TStringUtils.isNotBlank(optionValue8)) {
            this.printIntervalMs = Long.parseLong(optionValue8);
            if (this.printIntervalMs < 5000) {
                throw new Exception("Invalid " + CliArgDef.OUTPUTINTERVAL.longOpt + " parameter value!");
            }
        }
        if (parse.hasOption(CliArgDef.SYNCPRODUCE.longOpt)) {
            this.syncProduction = true;
        }
        if (!parse.hasOption(CliArgDef.WITHOUTDELAY.longOpt)) {
            return true;
        }
        this.withoutDelay = true;
        return true;
    }

    public void initTask() throws Exception {
        sentData = MixedUtils.buildTestData(this.msgDataSize);
        TubeClientConfig tubeClientConfig = new TubeClientConfig(this.masterServers);
        tubeClientConfig.setRpcTimeoutMs(this.rpcTimeoutMs);
        for (Map.Entry<String, TreeSet<String>> entry : this.topicAndFiltersMap.entrySet()) {
            if (entry.getValue().isEmpty()) {
                this.topicSendRounds.add(new Tuple2<>(entry.getKey()));
            } else {
                Iterator<String> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    this.topicSendRounds.add(new Tuple2<>(entry.getKey(), it.next()));
                }
            }
        }
        this.sendExecutorService = Executors.newFixedThreadPool(this.sendThreadCnt, new ThreadFactory() { // from class: org.apache.inlong.tubemq.server.tools.cli.CliProducer.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "sender_" + CliProducer.this.producerMap.size());
            }
        });
        if (this.reuseConn) {
            MessageSessionFactory tubeSingleSessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
            this.sessionFactoryList.add(tubeSingleSessionFactory);
            for (int i = 0; i < this.clientCount; i++) {
                MessageProducer createProducer = tubeSingleSessionFactory.createProducer();
                createProducer.publish(this.topicAndFiltersMap.keySet());
                this.producerMap.put(createProducer, new MsgSender(createProducer));
                this.sendExecutorService.submit(this.producerMap.get(createProducer));
            }
        } else {
            for (int i2 = 0; i2 < this.clientCount; i2++) {
                MessageSessionFactory tubeMultiSessionFactory = new TubeMultiSessionFactory(tubeClientConfig);
                this.sessionFactoryList.add(tubeMultiSessionFactory);
                MessageProducer createProducer2 = tubeMultiSessionFactory.createProducer();
                createProducer2.publish(this.topicAndFiltersMap.keySet());
                this.producerMap.put(createProducer2, new MsgSender(createProducer2));
                this.sendExecutorService.submit(this.producerMap.get(createProducer2));
            }
        }
        this.isStarted = true;
    }

    public void shutdown() throws Throwable {
        if (this.sendExecutorService != null) {
            this.sendExecutorService.shutdownNow();
        }
        ThreadUtils.sleep(20L);
        Iterator<MessageProducer> it = this.producerMap.keySet().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        Iterator<MessageSessionFactory> it2 = this.sessionFactoryList.iterator();
        while (it2.hasNext()) {
            it2.next().shutdown();
        }
    }

    public static void main(String[] strArr) {
        CliProducer cliProducer = new CliProducer();
        try {
            if (!cliProducer.processParams(strArr)) {
                throw new Exception("Parse parameters failure!");
            }
            cliProducer.initTask();
            ThreadUtils.sleep(1000L);
            while (true) {
                if (cliProducer.msgCount >= 0 && TOTAL_COUNTER.get() >= cliProducer.msgCount * cliProducer.clientCount) {
                    cliProducer.shutdown();
                    System.out.println("Finished, required send count VS sent message count = " + (cliProducer.msgCount * cliProducer.clientCount) + " : " + TOTAL_COUNTER.get() + " (" + SENT_SUCC_COUNTER.get() + ":" + SENT_FAIL_COUNTER.get() + ":" + SENT_EXCEPT_COUNTER.get() + ")");
                    return;
                } else {
                    ThreadUtils.sleep(cliProducer.printIntervalMs);
                    System.out.println("Required send count VS sent message count = " + (cliProducer.msgCount * cliProducer.clientCount) + " : " + TOTAL_COUNTER.get() + " (" + SENT_SUCC_COUNTER.get() + ":" + SENT_FAIL_COUNTER.get() + ":" + SENT_EXCEPT_COUNTER.get() + ")");
                }
            }
        } catch (Throwable th) {
            th.printStackTrace();
            logger.error(th.getMessage());
            cliProducer.help();
        }
    }
}
