package org.apache.inlong.tubemq.server.master.nodemanage.nodebroker;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.TopicInfo;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.corebase.utils.Tuple3;
import org.apache.inlong.tubemq.corebase.utils.Tuple4;
import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo;
import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutListener;
import org.apache.inlong.tubemq.server.common.statusdef.ManageStatus;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.common.utils.SerialIdUtils;
import org.apache.inlong.tubemq.server.master.TMaster;
import org.apache.inlong.tubemq.server.master.metamanage.MetaDataManager;
import org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/server/master/nodemanage/nodebroker/DefBrokerRunManager.class */
public class DefBrokerRunManager implements BrokerRunManager, AliveObserver {
    private static final Logger logger = LoggerFactory.getLogger(DefBrokerRunManager.class);
    private final MetaDataManager metaDataManager;
    private final HeartbeatManager heartbeatManager;
    private final BrokerAbnHolder brokerAbnHolder;
    private final AtomicLong brokerInfoCheckSum = new AtomicLong(System.currentTimeMillis());
    private long lastBrokerUpdatedTime = System.currentTimeMillis();
    private final ConcurrentHashMap<Integer, String> brokersMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, String> brokersTLSMap = new ConcurrentHashMap<>();
    private final AtomicInteger brokerTotalCount = new AtomicInteger(0);
    private final ConcurrentHashMap<Integer, BrokerRunStatusInfo> brokerRunSyncManageMap = new ConcurrentHashMap<>();
    private final BrokerPSInfoHolder brokerPubSubInfo = new BrokerPSInfoHolder();

    public DefBrokerRunManager(TMaster tMaster) {
        this.metaDataManager = tMaster.getDefMetaDataManager();
        this.heartbeatManager = tMaster.getHeartbeatManager();
        this.brokerAbnHolder = new BrokerAbnHolder(tMaster.getMasterConfig().getMaxAutoForbiddenCnt(), this.metaDataManager);
        this.heartbeatManager.regBrokerCheckBusiness(r0.getBrokerHeartbeatTimeoutMs(), new TimeoutListener() { // from class: org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.DefBrokerRunManager.1
            @Override // org.apache.inlong.tubemq.server.common.heartbeat.TimeoutListener
            public void onTimeout(String str, TimeoutInfo timeoutInfo) throws Exception {
                DefBrokerRunManager.logger.info(new StringBuilder(512).append("[Broker Timeout] ").append(str).toString());
                DefBrokerRunManager.this.releaseBrokerRunInfo(Integer.parseInt(str), timeoutInfo.getSecondKey(), true);
            }
        });
        this.metaDataManager.registerObserver(this);
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver
    public void clearCacheData() {
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.keepalive.AliveObserver
    public void reloadCacheData() {
        updBrokerStaticInfo(this.metaDataManager.getBrokerConfInfo(null));
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public Tuple2<Long, Map<Integer, String>> getBrokerStaticInfo(boolean z) {
        return z ? new Tuple2<>(Long.valueOf(this.brokerInfoCheckSum.get()), this.brokersTLSMap) : new Tuple2<>(Long.valueOf(this.brokerInfoCheckSum.get()), this.brokersMap);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public void updBrokerStaticInfo(Map<Integer, BrokerConfEntity> map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        Iterator<BrokerConfEntity> it = map.values().iterator();
        while (it.hasNext()) {
            updBrokerStaticInfo(it.next());
        }
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public void updBrokerStaticInfo(BrokerConfEntity brokerConfEntity) {
        if (brokerConfEntity == null) {
            return;
        }
        String putIfAbsent = this.brokersMap.putIfAbsent(Integer.valueOf(brokerConfEntity.getBrokerId()), brokerConfEntity.getSimpleBrokerInfo());
        String putIfAbsent2 = this.brokersTLSMap.putIfAbsent(Integer.valueOf(brokerConfEntity.getBrokerId()), brokerConfEntity.getSimpleTLSBrokerInfo());
        if (putIfAbsent == null || putIfAbsent2 == null || !putIfAbsent.equals(brokerConfEntity.getSimpleBrokerInfo()) || !putIfAbsent2.equals(brokerConfEntity.getSimpleTLSBrokerInfo())) {
            if (putIfAbsent != null && !putIfAbsent.equals(brokerConfEntity.getSimpleBrokerInfo())) {
                this.brokersMap.put(Integer.valueOf(brokerConfEntity.getBrokerId()), brokerConfEntity.getSimpleBrokerInfo());
            }
            if (putIfAbsent2 != null && !putIfAbsent2.equals(brokerConfEntity.getSimpleTLSBrokerInfo())) {
                this.brokersTLSMap.put(Integer.valueOf(brokerConfEntity.getBrokerId()), brokerConfEntity.getSimpleTLSBrokerInfo());
            }
            SerialIdUtils.updTimeStampSerialIdValue(this.brokerInfoCheckSum);
        }
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public void delBrokerStaticInfo(int i) {
        if (i == -2) {
            return;
        }
        String remove = this.brokersMap.remove(Integer.valueOf(i));
        String remove2 = this.brokersTLSMap.remove(Integer.valueOf(i));
        if (remove == null && remove2 == null) {
            return;
        }
        SerialIdUtils.updTimeStampSerialIdValue(this.brokerInfoCheckSum);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public Tuple2<Boolean, Boolean> getBrokerPublishStatus(int i) {
        return this.brokerPubSubInfo.getBrokerPubStatus(i);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public BrokerAbnHolder getBrokerAbnHolder() {
        return this.brokerAbnHolder;
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public boolean brokerRegister2M(String str, BrokerInfo brokerInfo, long j, int i, boolean z, String str2, List<String> list, boolean z2, boolean z3, StringBuilder sb, ProcessResult processResult) {
        BrokerConfEntity brokerConfByBrokerId = this.metaDataManager.getBrokerConfByBrokerId(brokerInfo.getBrokerId());
        if (brokerConfByBrokerId == null) {
            processResult.setFailResult(400, sb.append("Not found broker configure info, please create first!").append(" the connecting client id is:").append(str).toString());
            sb.delete(0, sb.length());
            return processResult.isSuccess();
        }
        if (!brokerInfo.getHost().equals(brokerConfByBrokerId.getBrokerIp()) || brokerInfo.getPort() != brokerConfByBrokerId.getBrokerPort()) {
            processResult.setFailResult(400, sb.append("Inconsistent broker configure,please confirm first!").append(" the connecting client id is:").append(str).append(", the configure's broker address by brokerId is:").append(brokerConfByBrokerId.getBrokerIdAndAddress()).toString());
            sb.delete(0, sb.length());
            return processResult.isSuccess();
        }
        int brokerTLSPort = brokerConfByBrokerId.getBrokerTLSPort();
        if (brokerTLSPort != brokerInfo.getTlsPort()) {
            processResult.setFailResult(400, sb.append("Inconsistent TLS configure, please confirm first!").append(" the connecting client id is:").append(str).append(", the configured TLS port is:").append(brokerTLSPort).append(", the broker reported TLS port is ").append(brokerInfo.getTlsPort()).toString());
            sb.delete(0, sb.length());
            return processResult.isSuccess();
        }
        if (brokerConfByBrokerId.getManageStatus() == ManageStatus.STATUS_MANAGE_APPLY) {
            processResult.setFailResult(400, sb.append("Broker's configure not online, please online configure first!").append(" the connecting client id is:").append(str).toString());
            sb.delete(0, sb.length());
            return processResult.isSuccess();
        }
        String brokerDefaultConfInfo = brokerConfByBrokerId.getBrokerDefaultConfInfo();
        Map<String, String> brokerTopicStrConfigInfo = this.metaDataManager.getBrokerTopicStrConfigInfo(brokerConfByBrokerId, sb);
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(brokerInfo.getBrokerId()));
        if (brokerRunStatusInfo == null) {
            BrokerRunStatusInfo brokerRunStatusInfo2 = new BrokerRunStatusInfo(this, brokerInfo, brokerConfByBrokerId.getManageStatus(), brokerDefaultConfInfo, brokerTopicStrConfigInfo, z3);
            brokerRunStatusInfo = this.brokerRunSyncManageMap.putIfAbsent(Integer.valueOf(brokerInfo.getBrokerId()), brokerRunStatusInfo2);
            if (brokerRunStatusInfo == null) {
                this.brokerTotalCount.incrementAndGet();
                brokerRunStatusInfo = brokerRunStatusInfo2;
            }
        } else {
            brokerRunStatusInfo.reInitRunStatusInfo(brokerInfo, brokerConfByBrokerId.getManageStatus(), brokerDefaultConfInfo, brokerTopicStrConfigInfo, z3);
        }
        brokerRunStatusInfo.bookBrokerReportInfo(true, z2, j, i, z, str2, list, sb);
        this.heartbeatManager.regBrokerNode(String.valueOf(brokerInfo.getBrokerId()), brokerRunStatusInfo.getCreateId());
        processResult.setSuccResult(null);
        return processResult.isSuccess();
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public boolean brokerHeartBeat2M(int i, long j, int i2, boolean z, String str, List<String> list, boolean z2, List<String> list2, int i3, int i4, boolean z3, StringBuilder sb, ProcessResult processResult) {
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(i));
        if (brokerRunStatusInfo == null) {
            processResult.setFailResult(411, sb.append("Not found Broker run status info, please register broker first!").append(" the connecting client id is:").append(i).toString());
            return processResult.isSuccess();
        }
        if (!this.heartbeatManager.updBrokerNode(String.valueOf(i), brokerRunStatusInfo.getCreateId(), sb, processResult)) {
            return processResult.isSuccess();
        }
        brokerRunStatusInfo.bookBrokerReportInfo(false, z3, j, i2, z, str, list, sb);
        if (z2) {
            this.metaDataManager.clearRmvedTopicConfInfo(i, list2, sb, processResult);
            logger.info(sb.append("[Broker Report] receive broker removed topics = ").append(list2.toString()).append(", removed result is ").append(processResult.getErrInfo()).toString());
            sb.delete(0, sb.length());
        }
        this.brokerAbnHolder.updateBrokerReportStatus(i, i3, i4);
        processResult.setSuccResult(null);
        return processResult.isSuccess();
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public boolean brokerClose2M(int i, StringBuilder sb, ProcessResult processResult) {
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(i));
        if (brokerRunStatusInfo == null) {
            processResult.setFailResult(411, sb.append("Not found Broker run status info, please register broker first!").append(" the connecting client id is:").append(i).toString());
            return processResult.isSuccess();
        }
        if (!this.heartbeatManager.unRegBrokerNode(String.valueOf(i), brokerRunStatusInfo.getCreateId())) {
            logger.info(sb.append("[Broker Closed] brokerId=").append(i).append(" unregister failure, run-info has been replaced by new request!").toString());
            return processResult.isSuccess();
        }
        boolean isOverTLS = brokerRunStatusInfo.isOverTLS();
        releaseBrokerRunInfo(i, brokerRunStatusInfo.getCreateId(), false);
        logger.info(sb.append("[Broker Closed]").append(i).append(" unregister success, isOverTLS=").append(isOverTLS).toString());
        processResult.setSuccResult(null);
        return processResult.isSuccess();
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public Tuple3<ManageStatus, String, Map<String, String>> getBrokerMetaConfigInfo(int i) {
        String str = null;
        ManageStatus manageStatus = ManageStatus.STATUS_MANAGE_UNDEFINED;
        StringBuilder sb = new StringBuilder(512);
        BrokerConfEntity brokerConfByBrokerId = this.metaDataManager.getBrokerConfByBrokerId(i);
        if (brokerConfByBrokerId != null) {
            str = brokerConfByBrokerId.getBrokerDefaultConfInfo();
            manageStatus = brokerConfByBrokerId.getManageStatus();
        }
        return new Tuple3<>(manageStatus, str, this.metaDataManager.getBrokerTopicStrConfigInfo(brokerConfByBrokerId, sb));
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public void setRegisterDownConfInfo(int i, StringBuilder sb, ClientMaster.RegisterResponseM2B.Builder builder) {
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(i));
        if (brokerRunStatusInfo == null) {
            logger.info(sb.append("Get Broker run-info failure, brokerId=").append(i).append(", please check the implement first!").toString());
            sb.delete(0, sb.length());
            return;
        }
        Tuple4<Long, Integer, String, List<String>> needSyncData = brokerRunStatusInfo.getNeedSyncData();
        builder.setCurBrokerConfId(((Long) needSyncData.getF0()).longValue());
        builder.setConfCheckSumId(((Integer) needSyncData.getF1()).intValue());
        Tuple2<Boolean, Boolean> brokerAutoFbdStatus = this.brokerAbnHolder.getBrokerAutoFbdStatus(i);
        builder.setStopWrite(((Boolean) brokerAutoFbdStatus.getF0()).booleanValue());
        builder.setStopRead(((Boolean) brokerAutoFbdStatus.getF1()).booleanValue());
        if (needSyncData.getF2() == null) {
            builder.setTakeConfInfo(false);
            return;
        }
        builder.setTakeConfInfo(true);
        builder.setBrokerDefaultConfInfo((String) needSyncData.getF2());
        builder.addAllBrokerTopicSetConfInfo((Iterable) needSyncData.getF3());
        logger.info(sb.append("[TMaster sync] push broker configure: brokerId = ").append(i).append(",configureId=").append(needSyncData.getF0()).append(",stopWrite=").append(builder.getStopWrite()).append(",stopRead=").append(builder.getStopRead()).append(",checksumId=").append(needSyncData.getF1()).append(",default configure is ").append((String) needSyncData.getF2()).append(",topic configure is ").append(needSyncData.getF3()).toString());
        sb.delete(0, sb.length());
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public void setHeatBeatDownConfInfo(int i, StringBuilder sb, ClientMaster.HeartResponseM2B.Builder builder) {
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(i));
        if (brokerRunStatusInfo == null) {
            logger.info(sb.append("Get Broker run-info failure, brokerId=").append(i).append(", please check the implement first!").toString());
            sb.delete(0, sb.length());
            return;
        }
        Tuple4<Long, Integer, String, List<String>> needSyncData = brokerRunStatusInfo.getNeedSyncData();
        builder.setCurBrokerConfId(((Long) needSyncData.getF0()).longValue());
        builder.setConfCheckSumId(((Integer) needSyncData.getF1()).intValue());
        Tuple2<Boolean, Boolean> brokerAutoFbdStatus = this.brokerAbnHolder.getBrokerAutoFbdStatus(i);
        builder.setStopWrite(((Boolean) brokerAutoFbdStatus.getF0()).booleanValue());
        builder.setStopRead(((Boolean) brokerAutoFbdStatus.getF1()).booleanValue());
        if (needSyncData.getF2() == null) {
            builder.setNeedReportData(false);
            builder.setTakeConfInfo(false);
            return;
        }
        builder.setNeedReportData(true);
        builder.setTakeConfInfo(true);
        builder.setBrokerDefaultConfInfo((String) needSyncData.getF2());
        builder.addAllBrokerTopicSetConfInfo((Iterable) needSyncData.getF3());
        logger.info(sb.append("[TMaster sync] heartbeat sync config: brokerId = ").append(i).append(",configureId=").append(needSyncData.getF0()).append(",stopWrite=").append(builder.getStopWrite()).append(",stopRead=").append(builder.getStopRead()).append(",checksumId=").append(needSyncData.getF1()).append(",default configure is ").append((String) needSyncData.getF2()).append(",topic configure is ").append(needSyncData.getF3()).toString());
        sb.delete(0, sb.length());
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public BrokerRunStatusInfo getBrokerRunStatusInfo(int i) {
        return this.brokerRunSyncManageMap.get(Integer.valueOf(i));
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public BrokerInfo getBrokerInfo(int i) {
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(i));
        if (brokerRunStatusInfo == null) {
            return null;
        }
        return brokerRunStatusInfo.getBrokerInfo();
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public Map<Integer, BrokerInfo> getBrokerInfoMap(List<Integer> list) {
        HashMap hashMap = new HashMap();
        if (list == null || list.isEmpty()) {
            for (BrokerRunStatusInfo brokerRunStatusInfo : this.brokerRunSyncManageMap.values()) {
                if (brokerRunStatusInfo != null) {
                    BrokerInfo brokerInfo = brokerRunStatusInfo.getBrokerInfo();
                    hashMap.put(Integer.valueOf(brokerInfo.getBrokerId()), brokerInfo);
                }
            }
        } else {
            for (Integer num : list) {
                BrokerRunStatusInfo brokerRunStatusInfo2 = this.brokerRunSyncManageMap.get(num);
                if (brokerRunStatusInfo2 != null) {
                    hashMap.put(num, brokerRunStatusInfo2.getBrokerInfo());
                }
            }
        }
        return hashMap;
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public boolean releaseBrokerRunInfo(int i, String str, boolean z) {
        StringBuilder sb = new StringBuilder(512);
        BrokerRunStatusInfo brokerRunStatusInfo = this.brokerRunSyncManageMap.get(Integer.valueOf(i));
        if (brokerRunStatusInfo == null) {
            logger.info(sb.append("[Broker Release] brokerId=").append(i).append(", isTimeout=").append(z).append(", release failure, run-info has deleted before!").toString());
            return false;
        }
        if (!str.equals(brokerRunStatusInfo.getCreateId())) {
            logger.info(sb.append("[Broker Release] brokerId=").append(i).append(", isTimeout=").append(z).append(", release failure, run-info has been replaced by new register!").toString());
            return false;
        }
        if (this.brokerRunSyncManageMap.remove(Integer.valueOf(i)) == null) {
            return false;
        }
        this.brokerTotalCount.decrementAndGet();
        this.brokerAbnHolder.removeBroker(Integer.valueOf(i));
        this.brokerPubSubInfo.rmvBrokerAllPushedInfo(i);
        logger.info(sb.append("[Broker Release] brokerId=").append(i).append(", isTimeout=").append(z).append(", release success!").toString());
        return true;
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public boolean updBrokerCsmConfInfo(int i, ManageStatus manageStatus, Map<String, TopicInfo> map) {
        this.brokerPubSubInfo.updBrokerMangeStatus(i, manageStatus);
        return this.brokerPubSubInfo.updBrokerSubTopicConfInfo(i, map);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public void updBrokerPrdConfInfo(int i, ManageStatus manageStatus, Map<String, TopicInfo> map) {
        this.brokerPubSubInfo.updBrokerPubTopicConfInfo(i, map);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public Map<String, String> getPubBrokerAcceptPubPartInfo(Set<String> set) {
        return this.brokerPubSubInfo.getAcceptPubPartInfo(set);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public int getSubTopicMaxBrokerCount(Set<String> set) {
        return this.brokerPubSubInfo.getTopicMaxSubBrokerCnt(set);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public Map<String, Partition> getSubBrokerAcceptSubParts(Set<String> set) {
        return this.brokerPubSubInfo.getAcceptSubParts(set);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public List<Partition> getSubBrokerAcceptSubParts(String str) {
        return this.brokerPubSubInfo.getAcceptSubParts(str);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public TopicInfo getPubBrokerTopicInfo(int i, String str) {
        return this.brokerPubSubInfo.getBrokerPubPushedTopicInfo(i, str);
    }

    @Override // org.apache.inlong.tubemq.server.master.nodemanage.nodebroker.BrokerRunManager
    public List<TopicInfo> getPubBrokerPushedTopicInfo(int i) {
        return this.brokerPubSubInfo.getPubBrokerPushedTopicInfo(i);
    }
}
