/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.base;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.test.base.IntegrationTestBase;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.clientinterface.MQConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.awaitility.Awaitility;
import org.junit.Assert;

public class BaseConf {
    private static final Logger log = LoggerFactory.getLogger(BaseConf.class);
    public static final String NAMESRV_ADDR;
    protected static final String CLUSTER_NAME;
    protected static final String BROKER1_NAME;
    protected static final String BROKER2_NAME;
    protected static final String BROKER3_NAME;
    protected static final int BROKER_NUM = 3;
    protected static final int WAIT_TIME = 5;
    protected static final int CONSUME_TIME = 120000;
    protected static final int QUEUE_NUMBERS = 8;
    protected static NamesrvController namesrvController;
    protected static BrokerController brokerController1;
    protected static BrokerController brokerController2;
    protected static BrokerController brokerController3;
    protected static List<BrokerController> brokerControllerList;
    protected static Map<String, BrokerController> brokerControllerMap;
    protected static List<Object> mqClients;
    protected static boolean debug;

    public BaseConf() {
        BaseConf.waitBrokerRegistered(NAMESRV_ADDR, CLUSTER_NAME, 3);
    }

    public static void waitBrokerRegistered(String nsAddr, String clusterName, int expectedBrokerNum) {
        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500L);
        mqAdminExt.setNamesrvAddr(nsAddr);
        try {
            mqAdminExt.start();
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
                List brokerDatas;
                try {
                    brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
                }
                catch (Exception e) {
                    return false;
                }
                return brokerDatas.size() == expectedBrokerNum;
            });
            for (BrokerController brokerController : brokerControllerList) {
                brokerController.getBrokerOuterAPI().refreshMetadata();
            }
        }
        catch (Exception e) {
            log.error("init failed, please check BaseConf", (Throwable)e);
            Assert.fail((String)e.getMessage());
        }
        ForkJoinPool.commonPool().execute(() -> ((DefaultMQAdminExt)mqAdminExt).shutdown());
    }

    public boolean awaitDispatchMs(long timeMs) throws Exception {
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start <= timeMs) {
            boolean allOk = true;
            for (BrokerController brokerController : brokerControllerList) {
                if (brokerController.getMessageStore().dispatchBehindBytes() == 0L) continue;
                allOk = false;
                break;
            }
            if (allOk) {
                return true;
            }
            Thread.sleep(100L);
        }
        return false;
    }

    public static String initTopic() {
        String topic = MQRandomUtils.getRandomTopic();
        return BaseConf.initTopicWithName(topic);
    }

    public static String initTopic(TopicMessageType topicMessageType) {
        String topic = MQRandomUtils.getRandomTopic();
        return BaseConf.initTopicWithName(topic, topicMessageType);
    }

    public static String initTopicOnSampleTopicBroker(String sampleTopic) {
        String topic = MQRandomUtils.getRandomTopic();
        return BaseConf.initTopicOnSampleTopicBroker(topic, sampleTopic);
    }

    public static String initTopicOnSampleTopicBroker(String sampleTopic, TopicMessageType topicMessageType) {
        String topic = MQRandomUtils.getRandomTopic();
        return BaseConf.initTopicOnSampleTopicBroker(topic, sampleTopic, topicMessageType);
    }

    public static String initTopicWithName(String topicName) {
        IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, CLUSTER_NAME, CQType.SimpleCQ);
        return topicName;
    }

    public static String initTopicWithName(String topicName, TopicMessageType topicMessageType) {
        IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, CLUSTER_NAME, topicMessageType);
        return topicName;
    }

    public static String initTopicOnSampleTopicBroker(String topicName, String sampleTopic) {
        IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, sampleTopic, CQType.SimpleCQ);
        return topicName;
    }

    public static String initTopicOnSampleTopicBroker(String topicName, String sampleTopic, TopicMessageType topicMessageType) {
        IntegrationTestBase.initTopic(topicName, NAMESRV_ADDR, sampleTopic, topicMessageType);
        return topicName;
    }

    public static String initConsumerGroup() {
        String group = MQRandomUtils.getRandomConsumerGroup();
        return BaseConf.initConsumerGroup(group);
    }

    public static String initConsumerGroup(String group) {
        MQAdminTestUtils.createSub((String)NAMESRV_ADDR, (String)CLUSTER_NAME, (String)group);
        return group;
    }

    public static DefaultMQAdminExt getAdmin(String nsAddr) {
        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(3000L);
        mqAdminExt.setNamesrvAddr(nsAddr);
        mqAdminExt.setPollNameServerInterval(100);
        mqClients.add(mqAdminExt);
        return mqAdminExt;
    }

    public static RMQNormalProducer getProducer(String nsAddr, String topic) {
        return BaseConf.getProducer(nsAddr, topic, false);
    }

    public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTLS) {
        RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTLS);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    public static RMQTransactionalProducer getTransactionalProducer(String nsAddr, String topic, TransactionListener transactionListener) {
        RMQTransactionalProducer producer = new RMQTransactionalProducer(nsAddr, topic, false, transactionListener);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup, String instanceName) {
        RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, instanceName);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    public static RMQAsyncSendProducer getAsyncProducer(String nsAddr, String topic) {
        RMQAsyncSendProducer producer = new RMQAsyncSendProducer(nsAddr, topic);
        if (debug) {
            producer.setDebug();
        }
        mqClients.add(producer);
        return producer;
    }

    public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression, AbstractListener listener) {
        return BaseConf.getConsumer(nsAddr, topic, subExpression, listener, false);
    }

    public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression, AbstractListener listener, boolean useTLS) {
        String consumerGroup = BaseConf.initConsumerGroup();
        return BaseConf.getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
    }

    public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic, String subExpression, AbstractListener listener) {
        return BaseConf.getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
    }

    public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic, String subExpression, AbstractListener listener, boolean useTLS) {
        RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer((String)nsAddr, (String)consumerGroup, (String)topic, (String)subExpression, (AbstractListener)listener, (boolean)useTLS);
        if (debug) {
            consumer.setDebug();
        }
        mqClients.add(consumer);
        log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, topic, subExpression));
        return consumer;
    }

    public static void shutdown() {
        ImmutableList mqClients = ImmutableList.copyOf(BaseConf.mqClients);
        BaseConf.mqClients.clear();
        BaseConf.shutdown((List<Object>)mqClients);
    }

    public static Set<String> getBrokers() {
        HashSet<String> brokers = new HashSet<String>();
        brokers.add(BROKER1_NAME);
        brokers.add(BROKER2_NAME);
        brokers.add(BROKER3_NAME);
        return brokers;
    }

    public static void shutdown(List<Object> mqClients) {
        mqClients.forEach(mqClient -> ForkJoinPool.commonPool().execute(() -> {
            if (mqClient instanceof AbstractMQProducer) {
                ((AbstractMQProducer)mqClient).shutdown();
            } else if (mqClient instanceof AbstractMQConsumer) {
                ((AbstractMQConsumer)mqClient).shutdown();
            } else if (mqClient instanceof MQAdminExt) {
                ((MQAdminExt)mqClient).shutdown();
            } else if (mqClient instanceof MQProducer) {
                ((MQProducer)mqClient).shutdown();
            } else if (mqClient instanceof MQPullConsumer) {
                ((MQPullConsumer)mqClient).shutdown();
            } else if (mqClient instanceof MQPushConsumer) {
                ((MQPushConsumer)mqClient).shutdown();
            } else if (mqClient instanceof MQConsumer) {
                ((MQConsumer)mqClient).shutdown();
            }
        }));
    }

    static {
        mqClients = new ArrayList<Object>();
        debug = false;
        System.setProperty("rocketmq.remoting.version", Integer.toString(MQVersion.CURRENT_VERSION));
        namesrvController = IntegrationTestBase.createAndStartNamesrv();
        NAMESRV_ADDR = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
        log.debug("Name server started, listening: {}", (Object)NAMESRV_ADDR);
        brokerController1 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
        log.debug("Broker {} started, listening: {}", (Object)brokerController1.getBrokerConfig().getBrokerName(), (Object)brokerController1.getBrokerConfig().getListenPort());
        brokerController2 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
        log.debug("Broker {} started, listening: {}", (Object)brokerController2.getBrokerConfig().getBrokerName(), (Object)brokerController2.getBrokerConfig().getListenPort());
        brokerController3 = IntegrationTestBase.createAndStartBroker(NAMESRV_ADDR);
        log.debug("Broker {} started, listening: {}", (Object)brokerController2.getBrokerConfig().getBrokerName(), (Object)brokerController2.getBrokerConfig().getListenPort());
        CLUSTER_NAME = brokerController1.getBrokerConfig().getBrokerClusterName();
        BROKER1_NAME = brokerController1.getBrokerConfig().getBrokerName();
        BROKER2_NAME = brokerController2.getBrokerConfig().getBrokerName();
        BROKER3_NAME = brokerController3.getBrokerConfig().getBrokerName();
        brokerControllerList = ImmutableList.of((Object)brokerController1, (Object)brokerController2, (Object)brokerController3);
        brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
        IntegrationTestBase.initMQAdmin(NAMESRV_ADDR);
    }
}

