package com.jd.bdp.whale.client.Consumer;

import com.jd.bdp.whale.client.BrokerInfo;
import com.jd.bdp.whale.client.Client;
import com.jd.bdp.whale.client.Consumer.Fetcher;
import com.jd.bdp.whale.client.NameServer.NameServer;
import com.jd.bdp.whale.client.TopicInfo;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jd/bdp/whale/client/Consumer/Consumer.class */
/**
 * Base class for topic consumers.
 *
 * <p>A consumer registers itself with a {@link NameServer} for a topic and
 * group, keeps the resulting {@link TopicInfo} up to date as brokers go
 * online/offline, and forwards those changes to its {@link Fetcher}.
 *
 * <p>The {@link #fetcher} field is never assigned in this class; subclasses
 * are expected to set it (presumably in {@link #completeSubscribe()} — verify
 * against the concrete implementations).
 *
 * @param <T> concrete fetcher type used by the subclass
 */
public abstract class Consumer<T extends Fetcher> extends Client {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /** Name server this consumer registers with and queries for topic metadata. */
    protected NameServer managerNode;

    /** Fetcher driven by broker online/offline notifications; may be null until a subclass sets it. */
    protected T fetcher;

    /** Consumer group supplied to {@code subscribe}; null until then. */
    private String group;

    /**
     * Creates a consumer bound to the given name server.
     *
     * @param str        first argument forwarded unchanged to the {@link Client} constructor
     * @param nameServer name server used for registration and topic lookups
     */
    public Consumer(String str, NameServer nameServer) {
        super(str, 0);
        this.managerNode = nameServer;
    }

    /**
     * Creates a consumer bound to the given name server, forwarding two extra
     * string arguments to the {@link Client} constructor.
     *
     * @param str        first argument forwarded unchanged to the {@link Client} constructor
     * @param nameServer name server used for registration and topic lookups
     * @param str2       third argument forwarded to the {@link Client} constructor
     * @param str3       fourth argument forwarded to the {@link Client} constructor
     */
    public Consumer(String str, NameServer nameServer, String str2, String str3) {
        super(str, 0, str2, str3);
        this.managerNode = nameServer;
    }

    /**
     * Subscribes to a topic with the given group and no explicit cluster.
     *
     * @param str  topic name
     * @param str2 consumer group
     * @throws Exception declared for compatibility; see
     *                   {@link #subscribe(String, String, String)}
     */
    public void subscribe(String str, String str2) throws Exception {
        subscribe(str, str2, null);
    }

    /**
     * Subscribes to a topic: records topic, group and cluster, registers this
     * client with the name server, marks it as registered and then shakes
     * hands with the brokers.
     *
     * <p>Any exception raised along the way is logged and <em>not</em>
     * propagated, so a normal return does not guarantee that the subscription
     * succeeded. After a failure {@code topicInfo} may still be null.
     *
     * @param str  topic name
     * @param str2 consumer group
     * @param str3 cluster, may be null
     * @throws Exception declared for compatibility; the current implementation
     *                   catches and logs every exception
     */
    public void subscribe(String str, String str2, String str3) throws Exception {
        try {
            setTopicName(str);
            this.group = str2;
            this.cluster = str3;
            this.topicInfo = this.managerNode.register(this);
            setRegsiter(true);
            shakeHandWithBrokers();
        } catch (Exception e) {
            logger.error("订阅topic失败", e);
        }
    }

    /**
     * Shuts the consumer down: stops the fetcher, closes the connection pool
     * of every known broker and removes this client and its topic info from
     * the name server.
     *
     * <p>Safe to call when subscription never completed: a missing fetcher or
     * missing topic info is skipped instead of raising a
     * {@link NullPointerException}. A failure while closing one broker's pool
     * is logged and does not prevent the remaining pools from being closed.
     */
    @Override // com.jd.bdp.whale.client.Client
    public void shutdown() {
        if (this.fetcher != null) {
            this.fetcher.shutdownFethManager();
        }
        if (this.topicInfo == null) {
            // subscribe() assigns topicInfo only after register() succeeds, so
            // there are no broker pools or name-server entries known here.
            logger.warn("shutdown called without topic info; skipping broker and name server cleanup");
            return;
        }
        for (BrokerInfo broker : this.topicInfo.getBrokers()) {
            try {
                broker.getConnectionPool().shutdown();
            } catch (Exception e) {
                logger.error("Failed to shut down a broker connection pool", e);
            }
        }
        this.managerNode.removeClient(this.topicInfo.getName(), this);
        this.managerNode.removeTopicInfo(this.topicInfo.getName());
    }

    /**
     * Handles a broker-offline notification: reloads the topic info from the
     * name server, tells the fetcher about the offline brokers, then shuts
     * down each offline broker's connection pool.
     *
     * @param list brokers that went offline
     * @throws Exception if the lookup, the fetcher refresh or a pool shutdown fails
     */
    @Override // com.jd.bdp.whale.client.Client
    public void syncBrokerOffline(List<BrokerInfo> list) throws Exception {
        logger.debug("收到broker下线信息，brokerInfos:{}", list);
        // Look the topic up before touching the fetcher, while getTopicName()
        // still resolves against the current topicInfo.
        TopicInfo refreshed = this.managerNode.getTopicInfo(getTopicName());
        this.fetcher.freshOfflineRequest(list);
        this.topicInfo = refreshed;
        for (BrokerInfo brokerInfo : list) {
            brokerInfo.getConnectionPool().shutdown();
            logger.debug("brokerId:{}, 下线！", brokerInfo.getId());
        }
    }

    /**
     * Handles a broker-online notification: adopts the supplied topic info and
     * queues a new {@code FetchRequest} on the fetcher for every new broker.
     *
     * @param topicInfo updated topic info that replaces the current one
     * @param list      brokers that came online
     */
    @Override // com.jd.bdp.whale.client.Client
    public void syncBrokerOnline(TopicInfo topicInfo, List<BrokerInfo> list) {
        logger.debug("收到broker上线信息，topicInfo:{}, brokerInfos:{}", topicInfo, list);
        this.topicInfo = topicInfo;
        for (BrokerInfo brokerInfo : list) {
            this.fetcher.addFetchRequest(new FetchRequest(brokerInfo));
            logger.debug("brokerId:{}, 上线！", brokerInfo.getId());
        }
    }

    /**
     * Returns the consumer group set by {@code subscribe}.
     *
     * @return the consumer group, never null
     * @throws IllegalStateException if no group has been set yet
     */
    public String getGroup() {
        if (this.group == null) {
            throw new IllegalStateException("group must not be null");
        }
        return this.group;
    }

    /**
     * Returns the topic name, preferring the name held by the current topic
     * info and falling back to the value stored by {@link Client} when no
     * topic info is available yet.
     *
     * @return the topic name
     */
    @Override // com.jd.bdp.whale.client.Client
    public String getTopicName() {
        if (this.topicInfo != null) {
            return this.topicInfo.getName();
        }
        return super.getTopicName();
    }

    /**
     * Hook implemented by subclasses to finish the subscription process.
     * It is not invoked anywhere in this class.
     *
     * @throws Exception if the subclass cannot complete the subscription
     */
    public abstract void completeSubscribe() throws Exception;
}
