package org.apache.tubemq.example;

import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.client.config.TubeClientConfig;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.tubemq.client.producer.MessageProducer;
import org.apache.tubemq.client.producer.MessageSentCallback;
import org.apache.tubemq.client.producer.MessageSentResult;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/example/MAMessageProducerExample.class */
public class MAMessageProducerExample {
    private static final int MAX_PRODUCER_NUM = 100;
    private static final int SESSION_FACTORY_NUM = 10;
    private static Map<String, TreeSet<String>> topicAndFiltersMap;
    private static int msgCount;
    private static int clientCount;
    private static byte[] sendData;
    private final Map<MessageProducer, Sender> producerMap = new HashMap();
    private final List<MessageSessionFactory> sessionFactoryList = new ArrayList();
    private final ExecutorService sendExecutorService = Executors.newFixedThreadPool(MAX_PRODUCER_NUM, new ThreadFactory() { // from class: org.apache.tubemq.example.MAMessageProducerExample.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "sender_" + MAMessageProducerExample.this.producerMap.size());
        }
    });
    private final AtomicInteger producerIndex = new AtomicInteger(0);
    private static final Logger logger = LoggerFactory.getLogger(MAMessageProducerExample.class);
    private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
    private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
    private static final AtomicLong SENT_FAIL_COUNTER = new AtomicLong(0);
    private static final AtomicLong SENT_EXCEPT_COUNTER = new AtomicLong(0);
    private static final List<MessageProducer> PRODUCER_LIST = new ArrayList();
    private static List<Tuple2<String, String>> topicSendRounds = new ArrayList();
    private static AtomicLong filterMsgCount = new AtomicLong(0);

    /* loaded from: input_file:org/apache/tubemq/example/MAMessageProducerExample$DefaultSendCallback.class */
    private class DefaultSendCallback implements MessageSentCallback {
        private DefaultSendCallback() {
        }

        public void onMessageSent(MessageSentResult messageSentResult) {
            MAMessageProducerExample.TOTAL_COUNTER.incrementAndGet();
            if (messageSentResult.isSuccess()) {
                MAMessageProducerExample.SENT_SUCC_COUNTER.incrementAndGet();
            } else {
                MAMessageProducerExample.SENT_FAIL_COUNTER.incrementAndGet();
            }
        }

        public void onException(Throwable th) {
            MAMessageProducerExample.TOTAL_COUNTER.incrementAndGet();
            MAMessageProducerExample.SENT_EXCEPT_COUNTER.incrementAndGet();
            MAMessageProducerExample.logger.error("Send message error!", th);
        }
    }

    /* loaded from: input_file:org/apache/tubemq/example/MAMessageProducerExample$Sender.class */
    public class Sender implements Runnable {
        private MessageProducer producer;

        public Sender(MessageProducer messageProducer) {
            this.producer = messageProducer;
        }

        @Override // java.lang.Runnable
        public void run() {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmm");
            try {
                this.producer.publish(MAMessageProducerExample.topicAndFiltersMap.keySet());
            } catch (Throwable th) {
                MAMessageProducerExample.logger.error("publish exception: ", th);
            }
            long j = 0;
            int size = MAMessageProducerExample.topicSendRounds.size();
            while (true) {
                if (MAMessageProducerExample.msgCount >= 0 && j >= MAMessageProducerExample.msgCount) {
                    try {
                        this.producer.shutdown();
                        return;
                    } catch (Throwable th2) {
                        MAMessageProducerExample.logger.error("producer shutdown error: ", th2);
                        return;
                    }
                }
                long currentTimeMillis = System.currentTimeMillis();
                long j2 = j;
                j = j2 + 1;
                Tuple2 tuple2 = (Tuple2) MAMessageProducerExample.topicSendRounds.get((int) (j2 % size));
                Message message = new Message((String) tuple2.getF0(), MAMessageProducerExample.sendData);
                message.setAttrKeyVal("index", String.valueOf(j));
                message.setAttrKeyVal("dataTime", String.valueOf(currentTimeMillis));
                if (tuple2.getF1() != null) {
                    MAMessageProducerExample.filterMsgCount.incrementAndGet();
                    message.putSystemHeader((String) tuple2.getF1(), simpleDateFormat.format(new Date(currentTimeMillis)));
                }
                try {
                    this.producer.sendMessage(message, new DefaultSendCallback());
                } catch (Throwable th3) {
                    MAMessageProducerExample.TOTAL_COUNTER.incrementAndGet();
                    MAMessageProducerExample.SENT_EXCEPT_COUNTER.incrementAndGet();
                    MAMessageProducerExample.logger.error("sendMessage exception: ", th3);
                }
                MAMessageProducerExample.TOTAL_COUNTER.incrementAndGet();
                if (j % 5000 == 0) {
                    ThreadUtils.sleep(3000L);
                } else if (j % 4000 == 0) {
                    ThreadUtils.sleep(2000L);
                } else if (j % 2000 == 0) {
                    ThreadUtils.sleep(800L);
                } else if (j % 1000 == 0) {
                    ThreadUtils.sleep(400L);
                }
            }
        }
    }

    public MAMessageProducerExample(String str) throws Exception {
        TubeClientConfig tubeClientConfig = new TubeClientConfig(str);
        for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
            this.sessionFactoryList.add(new TubeMultiSessionFactory(tubeClientConfig));
        }
    }

    public static void main(String[] strArr) {
        String str = strArr[0];
        String str2 = strArr[1];
        msgCount = Integer.parseInt(strArr[2]);
        clientCount = Math.min(strArr.length > 4 ? Integer.parseInt(strArr[3]) : SESSION_FACTORY_NUM, MAX_PRODUCER_NUM);
        topicAndFiltersMap = MixedUtils.parseTopicParam(str2);
        for (Map.Entry<String, TreeSet<String>> entry : topicAndFiltersMap.entrySet()) {
            if (entry.getValue().isEmpty()) {
                topicSendRounds.add(new Tuple2<>(entry.getKey()));
            } else {
                Iterator<String> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    topicSendRounds.add(new Tuple2<>(entry.getKey(), it.next()));
                }
            }
        }
        byte[] bytesUtf8 = StringUtils.getBytesUtf8("This is a test message from multi-session factory.");
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        while (allocate.hasRemaining()) {
            allocate.put(bytesUtf8, allocate.arrayOffset(), Math.min(allocate.remaining(), bytesUtf8.length));
        }
        allocate.flip();
        sendData = allocate.array();
        logger.info("MAMessageProducerExample.main started...");
        try {
            MAMessageProducerExample mAMessageProducerExample = new MAMessageProducerExample(str);
            mAMessageProducerExample.startService();
            while (TOTAL_COUNTER.get() < msgCount * clientCount) {
                logger.info("Sending, total messages is {}, filter messages is {}", Long.valueOf(SENT_SUCC_COUNTER.get()), Long.valueOf(filterMsgCount.get()));
                Thread.sleep(5000L);
            }
            logger.info("Finished, total messages is {}, filter messages is {}", Long.valueOf(SENT_SUCC_COUNTER.get()), Long.valueOf(filterMsgCount.get()));
            mAMessageProducerExample.producerMap.clear();
            mAMessageProducerExample.shutdown();
        } catch (TubeClientException e) {
            logger.error("TubeClientException: ", e);
        } catch (Throwable th) {
            logger.error("Throwable: ", th);
        }
    }

    public MessageProducer createProducer() throws TubeClientException {
        return this.sessionFactoryList.get(this.producerIndex.incrementAndGet() % SESSION_FACTORY_NUM).createProducer();
    }

    private void startService() throws TubeClientException {
        for (int i = 0; i < clientCount; i++) {
            PRODUCER_LIST.add(createProducer());
        }
        for (MessageProducer messageProducer : PRODUCER_LIST) {
            if (messageProducer != null) {
                this.producerMap.put(messageProducer, new Sender(messageProducer));
                this.sendExecutorService.submit(this.producerMap.get(messageProducer));
            }
        }
    }

    public void shutdown() throws Throwable {
        this.sendExecutorService.shutdownNow();
        for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
            this.sessionFactoryList.get(i).shutdown();
        }
    }
}
