package org.apache.rocketmq.test.base;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.clientinterface.MQConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.awaitility.Awaitility;
import org.junit.Assert;

/* loaded from: input_file:org/apache/rocketmq/test/base/BaseConf.class */
/**
 * Shared fixture for integration tests.
 *
 * <p>Loading this class runs the static initializer at the bottom of the file, which starts one
 * name server and three brokers through {@code IntegrationTestBase} and publishes their
 * addresses and names through the static fields below. The class also provides factory helpers
 * for topics, consumer groups, producers, consumers and admin clients; every client created by a
 * helper is recorded in {@link #mqClients} so that {@link #shutdown()} can stop it later.
 */
public class BaseConf {
    /** Address ({@code 127.0.0.1:<port>}) of the name server started by the static initializer. */
    public static final String NAMESRV_ADDR;
    /** Cluster name reported by the first broker's configuration. */
    protected static final String CLUSTER_NAME;
    protected static final String BROKER1_NAME;
    protected static final String BROKER2_NAME;
    protected static final String BROKER3_NAME;
    /** Number of brokers started by the static initializer. */
    protected static final int BROKER_NUM = 3;
    // NOTE(review): units of WAIT_TIME / CONSUME_TIME are not visible in this file - confirm at call sites.
    protected static final int WAIT_TIME = 5;
    protected static final int CONSUME_TIME = 120000;
    protected static final int QUEUE_NUMBERS = 8;
    protected static NamesrvController namesrvController;
    protected static BrokerController brokerController1;
    protected static BrokerController brokerController2;
    protected static BrokerController brokerController3;
    /** The three brokers, in start order. */
    protected static List<BrokerController> brokerControllerList;
    /** The same brokers keyed by broker name. */
    protected static Map<String, BrokerController> brokerControllerMap;
    // Declared before the static initializer so that the initializer can log.
    private static final Logger log = LoggerFactory.getLogger(BaseConf.class);
    /** Every client handed out by the factory helpers; drained by {@link #shutdown()}. */
    protected static List<Object> mqClients = new ArrayList<>();
    /** When {@code true}, producers and consumers created by the helpers get {@code setDebug()} called. */
    protected static boolean debug = false;

    /**
     * Blocks until all {@link #BROKER_NUM} brokers are visible in the route of
     * {@link #CLUSTER_NAME} on the shared name server.
     */
    public BaseConf() {
        waitBrokerRegistered(NAMESRV_ADDR, CLUSTER_NAME, BROKER_NUM);
    }

    /**
     * Waits (at most 30 seconds) until the route info of a topic lists the expected number of
     * brokers, then asks every broker in {@link #brokerControllerList} to refresh its metadata.
     *
     * <p>Any failure, including the wait timing out, is logged and turned into a JUnit assertion
     * failure. The temporary admin client is always shut down, asynchronously on the common
     * fork-join pool, whether the wait succeeded or not.
     *
     * @param str  name server address the temporary admin client connects to
     * @param str2 topic whose route info is examined (this class passes the cluster name)
     * @param i    number of broker data entries the route must contain
     */
    public static void waitBrokerRegistered(String str, String str2, int i) {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(500L);
        adminExt.setNamesrvAddr(str);
        try {
            adminExt.start();
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
                try {
                    return adminExt.examineTopicRouteInfo(str2).getBrokerDatas().size() == i;
                } catch (Exception ignored) {
                    // The route may simply not be available yet; report "not ready" and poll again.
                    return false;
                }
            });
            for (BrokerController broker : brokerControllerList) {
                broker.getBrokerOuterAPI().refreshMetadata();
            }
        } catch (Exception e) {
            log.error("init failed, please check BaseConf", e);
            Assert.fail(e.getMessage());
        } finally {
            // Assert.fail throws, so the shutdown has to live in finally or the admin client
            // would leak on every failed wait.
            ForkJoinPool.commonPool().execute(adminExt::shutdown);
        }
    }

    /**
     * Polls every 100 ms until every broker reports {@code dispatchBehindBytes() == 0}.
     *
     * @param j maximum time to keep polling, in milliseconds
     * @return {@code true} as soon as all brokers report zero bytes behind; {@code false} if the
     *         time budget elapsed first
     * @throws Exception if the polling sleep is interrupted
     */
    public boolean awaitDispatchMs(long j) throws Exception {
        long startedAt = System.currentTimeMillis();
        while (System.currentTimeMillis() - startedAt <= j) {
            boolean allDispatched = true;
            for (BrokerController broker : brokerControllerList) {
                if (broker.getMessageStore().dispatchBehindBytes() != 0) {
                    allDispatched = false;
                    break;
                }
            }
            if (allDispatched) {
                return true;
            }
            Thread.sleep(100L);
        }
        return false;
    }

    /**
     * Creates a topic with a random name on the shared cluster.
     *
     * @return the generated topic name
     */
    public static String initTopic() {
        return initTopicWithName(MQRandomUtils.getRandomTopic());
    }

    /**
     * Creates a topic with a random name and the given message type on the shared cluster.
     *
     * @param topicMessageType message type forwarded to {@code IntegrationTestBase.initTopic}
     * @return the generated topic name
     */
    public static String initTopic(TopicMessageType topicMessageType) {
        return initTopicWithName(MQRandomUtils.getRandomTopic(), topicMessageType);
    }

    /**
     * Creates a topic with a random name, delegating to
     * {@link #initTopicOnSampleTopicBroker(String, String)}.
     *
     * @param str forwarded as the second argument of the two-argument overload
     *            (presumably the sample topic whose brokers should host the new topic - confirm)
     * @return the generated topic name
     */
    public static String initTopicOnSampleTopicBroker(String str) {
        return initTopicOnSampleTopicBroker(MQRandomUtils.getRandomTopic(), str);
    }

    /**
     * Creates a topic with a random name and the given message type, delegating to
     * {@link #initTopicOnSampleTopicBroker(String, String, TopicMessageType)}.
     *
     * @param str              forwarded as the second argument of the three-argument overload
     * @param topicMessageType message type of the new topic
     * @return the generated topic name
     */
    public static String initTopicOnSampleTopicBroker(String str, TopicMessageType topicMessageType) {
        return initTopicOnSampleTopicBroker(MQRandomUtils.getRandomTopic(), str, topicMessageType);
    }

    /**
     * Creates the named topic on the shared cluster with {@code CQType.SimpleCQ}.
     *
     * @param str topic name
     * @return {@code str}, for call chaining
     */
    public static String initTopicWithName(String str) {
        IntegrationTestBase.initTopic(str, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
        return str;
    }

    /**
     * Creates the named topic on the shared cluster with the given message type.
     *
     * @param str              topic name
     * @param topicMessageType message type of the new topic
     * @return {@code str}, for call chaining
     */
    public static String initTopicWithName(String str, TopicMessageType topicMessageType) {
        IntegrationTestBase.initTopic(str, NAMESRV_ADDR, CLUSTER_NAME, topicMessageType);
        return str;
    }

    /**
     * Creates the named topic with {@code CQType.SimpleCQ}, passing {@code str2} to
     * {@code IntegrationTestBase.initTopic} in the position where the other helpers pass
     * {@link #CLUSTER_NAME}.
     *
     * @param str  topic name
     * @param str2 presumably the sample topic whose brokers should host the new topic - confirm
     *             against {@code IntegrationTestBase.initTopic}
     * @return {@code str}, for call chaining
     */
    public static String initTopicOnSampleTopicBroker(String str, String str2) {
        IntegrationTestBase.initTopic(str, NAMESRV_ADDR, str2, CQType.SimpleCQ);
        return str;
    }

    /**
     * Same as {@link #initTopicOnSampleTopicBroker(String, String)} but with an explicit message
     * type instead of {@code CQType.SimpleCQ}.
     *
     * @param str              topic name
     * @param str2             forwarded in the position where the other helpers pass the cluster name
     * @param topicMessageType message type of the new topic
     * @return {@code str}, for call chaining
     */
    public static String initTopicOnSampleTopicBroker(String str, String str2, TopicMessageType topicMessageType) {
        IntegrationTestBase.initTopic(str, NAMESRV_ADDR, str2, topicMessageType);
        return str;
    }

    /**
     * Creates a subscription group with a random name on the shared cluster.
     *
     * @return the generated consumer group name
     */
    public static String initConsumerGroup() {
        return initConsumerGroup(MQRandomUtils.getRandomConsumerGroup());
    }

    /**
     * Creates the named subscription group on the shared cluster.
     *
     * @param str consumer group name
     * @return {@code str}, for call chaining
     */
    public static String initConsumerGroup(String str) {
        MQAdminTestUtils.createSub(NAMESRV_ADDR, CLUSTER_NAME, str);
        return str;
    }

    /**
     * Builds an admin client pointed at the given name server and registers it for
     * {@link #shutdown()}. The client is not started here.
     *
     * @param str name server address
     * @return the configured admin client
     */
    public static DefaultMQAdminExt getAdmin(String str) {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(3000L);
        adminExt.setNamesrvAddr(str);
        adminExt.setPollNameServerInterval(100);
        mqClients.add(adminExt);
        return adminExt;
    }

    /**
     * Shorthand for {@link #getProducer(String, String, boolean)} with the flag set to
     * {@code false}.
     */
    public static RMQNormalProducer getProducer(String str, String str2) {
        return getProducer(str, str2, false);
    }

    /**
     * Creates a normal producer and registers it for {@link #shutdown()}.
     *
     * @param str  first constructor argument (presumably the name server address - confirm)
     * @param str2 second constructor argument (presumably the topic - confirm)
     * @param z    third constructor argument of {@code RMQNormalProducer}
     *             (NOTE(review): meaning not visible here; verify against the constructor)
     * @return the new producer
     */
    public static RMQNormalProducer getProducer(String str, String str2, boolean z) {
        RMQNormalProducer producer = new RMQNormalProducer(str, str2, z);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    /**
     * Creates a transactional producer (boolean constructor flag fixed to {@code false}) and
     * registers it for {@link #shutdown()}.
     *
     * @param str                 first constructor argument (presumably the name server address - confirm)
     * @param str2                second constructor argument (presumably the topic - confirm)
     * @param transactionListener listener handed to the producer
     * @return the new producer
     */
    public static RMQTransactionalProducer getTransactionalProducer(String str, String str2, TransactionListener transactionListener) {
        RMQTransactionalProducer producer = new RMQTransactionalProducer(str, str2, false, transactionListener);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    /**
     * Creates a normal producer through the four-string constructor of
     * {@code RMQNormalProducer} and registers it for {@link #shutdown()}.
     *
     * <p>NOTE(review): the roles of the four strings are defined by that constructor and are not
     * visible in this file.
     *
     * @return the new producer
     */
    public static RMQNormalProducer getProducer(String str, String str2, String str3, String str4) {
        RMQNormalProducer producer = new RMQNormalProducer(str, str2, str3, str4);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    /**
     * Creates an async-send producer and registers it for {@link #shutdown()}.
     *
     * @param str  first constructor argument (presumably the name server address - confirm)
     * @param str2 second constructor argument (presumably the topic - confirm)
     * @return the new producer
     */
    public static RMQAsyncSendProducer getAsyncProducer(String str, String str2) {
        RMQAsyncSendProducer producer = new RMQAsyncSendProducer(str, str2);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    /**
     * Shorthand for {@link #getConsumer(String, String, String, AbstractListener, boolean)} with
     * the flag set to {@code false}.
     */
    public static RMQNormalConsumer getConsumer(String str, String str2, String str3, AbstractListener abstractListener) {
        return getConsumer(str, str2, str3, abstractListener, false);
    }

    /**
     * Creates a consumer in a freshly created, randomly named consumer group.
     *
     * @param str              forwarded as the first factory argument
     * @param str2             topic
     * @param str3             subscription expression
     * @param abstractListener message listener
     * @param z                forwarded as the last factory argument
     * @return the new consumer
     */
    public static RMQNormalConsumer getConsumer(String str, String str2, String str3, AbstractListener abstractListener, boolean z) {
        return getConsumer(str, initConsumerGroup(), str2, str3, abstractListener, z);
    }

    /**
     * Shorthand for
     * {@link #getConsumer(String, String, String, String, AbstractListener, boolean)} with the
     * flag set to {@code false}.
     */
    public static RMQNormalConsumer getConsumer(String str, String str2, String str3, String str4, AbstractListener abstractListener) {
        return getConsumer(str, str2, str3, str4, abstractListener, false);
    }

    /**
     * Obtains a consumer from {@code ConsumerFactory}, registers it for {@link #shutdown()} and
     * logs the group, topic and subscription expression.
     *
     * @param str              first factory argument (presumably the name server address - confirm)
     * @param str2             consumer group
     * @param str3             topic
     * @param str4             subscription expression
     * @param abstractListener message listener
     * @param z                last factory argument
     *                         (NOTE(review): meaning not visible here; verify against the factory)
     * @return the new consumer
     */
    public static RMQNormalConsumer getConsumer(String str, String str2, String str3, String str4, AbstractListener abstractListener, boolean z) {
        RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(str, str2, str3, str4, abstractListener, z);
        if (debug) {
            consumer.setDebug();
        }
        mqClients.add(consumer);
        log.info("consumer[{}] start,topic[{}],subExpression[{}]", str2, str3, str4);
        return consumer;
    }

    /**
     * Shuts down every client registered so far and empties the registry.
     *
     * <p>The registry is snapshotted and cleared before the shutdowns are scheduled, so clients
     * created afterwards are not affected.
     */
    public static void shutdown() {
        List<Object> snapshot = ImmutableList.copyOf(mqClients);
        mqClients.clear();
        shutdown(snapshot);
    }

    /**
     * @return a new mutable set holding the names of the three brokers
     */
    public static Set<String> getBrokers() {
        Set<String> brokers = new HashSet<>();
        brokers.add(BROKER1_NAME);
        brokers.add(BROKER2_NAME);
        brokers.add(BROKER3_NAME);
        return brokers;
    }

    /**
     * Schedules a shutdown task on the common fork-join pool for each element of the list; the
     * method returns without waiting for the tasks to finish.
     *
     * <p>Each element is dispatched on the first matching type, checked in this order: test
     * producer wrapper, test consumer wrapper, admin client, raw producer, raw pull consumer, raw
     * push consumer, test consumer interface. Elements of any other type are ignored.
     *
     * @param list clients to shut down
     */
    public static void shutdown(List<Object> list) {
        list.forEach(client -> ForkJoinPool.commonPool().execute(() -> {
            if (client instanceof AbstractMQProducer) {
                ((AbstractMQProducer) client).shutdown();
            } else if (client instanceof AbstractMQConsumer) {
                ((AbstractMQConsumer) client).shutdown();
            } else if (client instanceof MQAdminExt) {
                ((MQAdminExt) client).shutdown();
            } else if (client instanceof MQProducer) {
                ((MQProducer) client).shutdown();
            } else if (client instanceof MQPullConsumer) {
                ((MQPullConsumer) client).shutdown();
            } else if (client instanceof MQPushConsumer) {
                ((MQPushConsumer) client).shutdown();
            } else if (client instanceof MQConsumer) {
                ((MQConsumer) client).shutdown();
            }
        }));
    }

    // Boots the shared test cluster once per JVM: one name server, then three brokers that
    // register with it. Must stay below the field declarations it relies on (log in particular).
    static {
        System.setProperty("rocketmq.remoting.version", Integer.toString(MQVersion.CURRENT_VERSION));
        namesrvController = IntegrationTestBase.createAndStartNamesrv();
        NAMESRV_ADDR = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
        log.debug("Name server started, listening: {}", NAMESRV_ADDR);

        brokerController1 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
        log.debug("Broker {} started, listening: {}",
            brokerController1.getBrokerConfig().getBrokerName(),
            brokerController1.getBrokerConfig().getListenPort());

        brokerController2 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
        log.debug("Broker {} started, listening: {}",
            brokerController2.getBrokerConfig().getBrokerName(),
            brokerController2.getBrokerConfig().getListenPort());

        brokerController3 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
        // Report the third broker itself (this line previously logged broker 2 a second time).
        log.debug("Broker {} started, listening: {}",
            brokerController3.getBrokerConfig().getBrokerName(),
            brokerController3.getBrokerConfig().getListenPort());

        CLUSTER_NAME = brokerController1.getBrokerConfig().getBrokerClusterName();
        BROKER1_NAME = brokerController1.getBrokerConfig().getBrokerName();
        BROKER2_NAME = brokerController2.getBrokerConfig().getBrokerName();
        BROKER3_NAME = brokerController3.getBrokerConfig().getBrokerName();

        brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3);
        brokerControllerMap = brokerControllerList.stream()
            .collect(Collectors.toMap(broker -> broker.getBrokerConfig().getBrokerName(), Function.identity()));
        IntegrationTestBase.initMQAdmin(NAMESRV_ADDR);
    }
}
