package org.apache.tubemq.example;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.MessageV2Listener;
import org.apache.tubemq.client.consumer.PushMessageConsumer;
import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/example/MessageConsumerExample.class */
/**
 * Example program that starts one or more push consumers and counts the
 * messages they receive per topic.
 *
 * <p>Command-line arguments, in order:
 * <ol>
 *   <li>first argument handed to {@code ConsumerConfig} (presumably the master
 *       address list - confirm against the client documentation)</li>
 *   <li>topic list: comma-separated entries of the form
 *       {@code topic} or {@code topic:filter1;filter2}</li>
 *   <li>second argument handed to {@code ConsumerConfig} (presumably the
 *       consumer group name - confirm)</li>
 *   <li>number of consumers to create</li>
 *   <li>optional: push fetch thread count per consumer; only applied when
 *       greater than zero</li>
 * </ol>
 */
public final class MessageConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerExample.class);
    private static final MsgRecvStats msgRecvStats = new MsgRecvStats();

    /** Minimum number of command-line arguments {@link #main(String[])} needs. */
    private static final int REQUIRED_ARG_COUNT = 4;
    /** How long {@link #main(String[])} waits for the consumer start-up tasks, in milliseconds. */
    private static final long STARTUP_WAIT_MS = 60000L;

    private final PushMessageConsumer messageConsumer;
    private final MessageSessionFactory messageSessionFactory;

    /**
     * Listener that records the size of every received batch in the shared
     * statistics object, keyed by the topic it was created for.
     */
    public static class DefaultMessageListener implements MessageV2Listener {
        private final String topic;

        /**
         * @param str topic name under which received message counts are recorded
         */
        public DefaultMessageListener(String str) {
            this.topic = str;
        }

        /**
         * Adds the batch size to the statistics for this listener's topic.
         * Null or empty batches are ignored.
         *
         * @param peerInfo information about the delivering peer; not used here
         * @param list     the received messages, may be null or empty
         */
        public void receiveMessages(PeerInfo peerInfo, List<Message> list) {
            if (list == null || list.isEmpty()) {
                return;
            }
            MessageConsumerExample.msgRecvStats.addMsgCount(this.topic, list.size());
        }

        /**
         * Intentionally does nothing; counting happens in
         * {@link #receiveMessages(PeerInfo, List)}.
         *
         * @param list the received messages; ignored
         */
        public void receiveMessages(List<Message> list) {
        }

        /**
         * @return always {@code null}; this listener supplies no executor of its own
         */
        public Executor getExecutor() {
            return null;
        }

        /** No resources to release. */
        public void stop() {
        }
    }

    /**
     * Builds a consumer configuration that starts from the latest offset,
     * then creates the session factory and the push consumer from it.
     *
     * @param str  first {@code ConsumerConfig} constructor argument
     * @param str2 second {@code ConsumerConfig} constructor argument
     * @param i    push fetch thread count; applied only when greater than zero
     * @throws Exception if the session factory or the consumer cannot be created
     */
    public MessageConsumerExample(String str, String str2, int i) throws Exception {
        ConsumerConfig consumerConfig = new ConsumerConfig(str, str2);
        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
        if (i > 0) {
            consumerConfig.setPushFetchThreadCnt(i);
        }
        this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
        this.messageConsumer = this.messageSessionFactory.createPushConsumer(consumerConfig);
    }

    /**
     * Entry point: parses the arguments, creates the requested number of
     * consumers on a thread pool, starts the statistics thread, waits up to
     * {@value #STARTUP_WAIT_MS} ms for the start-up tasks, then stops the statistics.
     *
     * @param strArr command-line arguments, see the class description
     * @throws IllegalArgumentException if fewer than four arguments are given
     * @throws NumberFormatException    if a numeric argument cannot be parsed
     */
    public static void main(String[] strArr) {
        if (strArr == null || strArr.length < REQUIRED_ARG_COUNT) {
            throw new IllegalArgumentException(
                    "Expected at least " + REQUIRED_ARG_COUNT + " arguments: "
                            + "<configArg1> <topic[:filter;filter...],...> <configArg2> "
                            + "<consumerCount> [fetchThreadCount]");
        }
        final String firstConfigArg = strArr[0];
        final String secondConfigArg = strArr[2];
        final int consumerCount = Integer.parseInt(strArr[3]);
        // The optional fifth argument lives at index 4, so it is present
        // whenever more than four arguments were supplied.
        final int fetchThreadCount = strArr.length > 4 ? Integer.parseInt(strArr[4]) : -1;
        final Map<String, TreeSet<String>> topicFilters = parseTopicFilters(strArr[1]);

        // One worker per consumer. The pool must not be sized from the optional
        // fetch thread count, which defaults to -1 and would be rejected.
        // Clamp to 1 because a fixed pool needs a positive size.
        ExecutorService consumerPool = Executors.newFixedThreadPool(Math.max(1, consumerCount));
        for (int n = 0; n < consumerCount; n++) {
            consumerPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        new MessageConsumerExample(firstConfigArg, secondConfigArg, fetchThreadCount)
                                .subscribe(topicFilters);
                    } catch (Exception e) {
                        MessageConsumerExample.logger.error("Create consumer failed!", e);
                    }
                }
            });
        }
        new Thread(msgRecvStats, "Received Statistic Thread").start();
        consumerPool.shutdown();
        try {
            consumerPool.awaitTermination(STARTUP_WAIT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.error("Thread Pool shutdown has been interrupted!", e);
            // Restore the flag so code further up the stack can observe the interrupt.
            Thread.currentThread().interrupt();
        }
        msgRecvStats.stopStats();
    }

    /**
     * Parses the topic argument into a map from topic name to its filter set.
     *
     * <p>Entries are separated by {@code ","}; within an entry the topic name
     * and its filters are separated by {@code ":"} and individual filters by
     * {@code ";"}. A topic without any filter maps to {@code null}.
     *
     * @param topicSpec the raw topic argument
     * @return topic name mapped to its sorted filter set, or to {@code null} when it has none
     */
    private static Map<String, TreeSet<String>> parseTopicFilters(String topicSpec) {
        Map<String, TreeSet<String>> topicFilters = new HashMap<String, TreeSet<String>>();
        for (String entry : topicSpec.split(",")) {
            String[] topicAndFilters = entry.split(":");
            TreeSet<String> filters = null;
            if (topicAndFilters.length > 1) {
                String[] filterItems = topicAndFilters[1].split(";");
                // split() drops trailing empty strings, so ";" alone yields no items.
                if (filterItems.length > 0) {
                    filters = new TreeSet<String>(Arrays.asList(filterItems));
                }
            }
            topicFilters.put(topicAndFilters[0], filters);
        }
        return topicFilters;
    }

    /**
     * Subscribes the consumer to every topic in the map, attaching a fresh
     * {@link DefaultMessageListener} per topic, then completes the subscription.
     *
     * @param map topic name mapped to its filter set (may be {@code null} for a topic)
     * @throws TubeClientException if a subscribe call or the completion fails
     */
    public void subscribe(Map<String, TreeSet<String>> map) throws TubeClientException {
        for (Map.Entry<String, TreeSet<String>> entry : map.entrySet()) {
            this.messageConsumer.subscribe(entry.getKey(), entry.getValue(), new DefaultMessageListener(entry.getKey()));
        }
        this.messageConsumer.completeSubscribe();
    }
}
