package org.apache.inlong.tubemq.server.master.metamanage.metastore.impl.bdbimpl;

import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.server.common.exception.LoadMetaException;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.master.bdbstore.bdbentitys.BdbBrokerConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.BrokerConfigMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbBrokerConfigMapperImpl.class */
/**
 * Berkeley DB backed implementation of {@link BrokerConfigMapper}.
 *
 * <p>Broker configuration records are kept in a BDB {@link EntityStore}. Lookups are served
 * from three in-memory indexes (broker id -> entity, broker IP -> broker id, region id ->
 * broker ids) which are rebuilt by {@link #loadConfig()} and maintained by the add, update
 * and delete operations.
 */
public class BdbBrokerConfigMapperImpl implements BrokerConfigMapper {
    private static final Logger logger = LoggerFactory.getLogger(BdbBrokerConfigMapperImpl.class);

    // Set to null once the store has been closed successfully.
    private EntityStore brokerConfStore;
    private final PrimaryIndex<Integer, BdbBrokerConfEntity> brokerConfIndex;
    // broker id -> configuration entity
    private final ConcurrentHashMap<Integer, BrokerConfEntity> brokerConfCache = new ConcurrentHashMap<>();
    // broker IP -> broker id
    private final ConcurrentHashMap<String, Integer> brokerIpIndexCache = new ConcurrentHashMap<>();
    // region id -> ids of the brokers configured in that region
    private final ConcurrentHashMap<Integer, ConcurrentHashSet<Integer>> regionIndexCache =
            new ConcurrentHashMap<>();

    /**
     * Opens the broker configuration entity store and its primary index.
     *
     * @param replicatedEnvironment the BDB environment the store is opened in
     * @param storeConfig           the configuration used to open the store
     */
    public BdbBrokerConfigMapperImpl(ReplicatedEnvironment replicatedEnvironment, StoreConfig storeConfig) {
        this.brokerConfStore = new EntityStore(replicatedEnvironment,
                TBDBStoreTables.BDB_BROKER_CONFIG_STORE_NAME, storeConfig);
        this.brokerConfIndex = this.brokerConfStore.getPrimaryIndex(Integer.class, BdbBrokerConfEntity.class);
    }

    /**
     * Clears the in-memory indexes and closes the entity store.
     * A failure while closing the store is logged and not rethrown.
     */
    @Override
    public void close() {
        clearCacheData();
        if (this.brokerConfStore == null) {
            return;
        }
        try {
            this.brokerConfStore.close();
            this.brokerConfStore = null;
        } catch (Throwable th) {
            logger.error("[BDB Impl] close broker configure failure ", th);
        }
    }

    /**
     * Discards the cached data and reloads every broker configuration record from the store.
     *
     * @throws LoadMetaException if reading the store fails; the exception carries the message
     *                           of the underlying failure, whose stack trace is logged here
     */
    @Override
    public void loadConfig() throws LoadMetaException {
        long count = 0L;
        EntityCursor<BdbBrokerConfEntity> cursor = null;
        logger.info("[BDB Impl] load broker configure start...");
        try {
            clearCacheData();
            cursor = this.brokerConfIndex.entities();
            for (BdbBrokerConfEntity bdbEntity : cursor) {
                if (bdbEntity == null) {
                    logger.warn("[BDB Impl] found Null data while loading broker configure!");
                    continue;
                }
                addOrUpdCacheRecord(new BrokerConfEntity(bdbEntity));
                count++;
            }
            logger.info("[BDB Impl] total broker configure records are {}", count);
        } catch (Exception e) {
            logger.error("[BDB Impl] load broker configure failure ", e);
            throw new LoadMetaException(e.getMessage());
        } finally {
            // The cursor must be released on both the success and the failure path.
            if (cursor != null) {
                cursor.close();
            }
        }
        logger.info("[BDB Impl] load broker configure successfully...");
    }

    /**
     * Adds a new broker configuration. Both the broker id and the broker IP must be unused.
     *
     * @param brokerConfEntity the configuration to add
     * @param processResult    receives the failure code and message, or the success state
     * @return {@code true} if the record was stored and cached
     */
    @Override
    public boolean addBrokerConf(BrokerConfEntity brokerConfEntity, ProcessResult processResult) {
        if (this.brokerConfCache.get(brokerConfEntity.getBrokerId()) != null) {
            processResult.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
                    new StringBuilder(512).append("The broker's brokerId ")
                            .append(brokerConfEntity.getBrokerId())
                            .append(" has already exists, the value must be unique!").toString());
            return processResult.isSuccess();
        }
        if (this.brokerIpIndexCache.get(brokerConfEntity.getBrokerIp()) != null) {
            processResult.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
                    new StringBuilder(512).append("The broker's brokerIp ")
                            .append(brokerConfEntity.getBrokerIp())
                            .append(" has already exists, the value must be unique!").toString());
            return processResult.isSuccess();
        }
        // Only touch the caches once the record has been persisted.
        if (putBrokerConfig2Bdb(brokerConfEntity, processResult)) {
            addOrUpdCacheRecord(brokerConfEntity);
        }
        return processResult.isSuccess();
    }

    /**
     * Replaces the stored configuration of an existing broker.
     *
     * @param brokerConfEntity the new configuration; its broker id selects the record
     * @param processResult    receives the failure code and message; on success its return
     *                         data is set to the configuration that was replaced
     * @return {@code true} if the record was stored and cached
     */
    @Override
    public boolean updBrokerConf(BrokerConfEntity brokerConfEntity, ProcessResult processResult) {
        BrokerConfEntity curEntity = this.brokerConfCache.get(brokerConfEntity.getBrokerId());
        if (curEntity == null) {
            processResult.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
                    new StringBuilder(512).append("The broker ")
                            .append(brokerConfEntity.getBrokerIp())
                            .append("'s configure is not exists, please add record first!").toString());
            return processResult.isSuccess();
        }
        if (curEntity.equals(brokerConfEntity)) {
            processResult.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
                    new StringBuilder(512).append("The broker ")
                            .append(brokerConfEntity.getBrokerIp())
                            .append("'s configure have not changed, please delete it first!").toString());
            return processResult.isSuccess();
        }
        if (putBrokerConfig2Bdb(brokerConfEntity, processResult)) {
            addOrUpdCacheRecord(brokerConfEntity);
            processResult.setRetData(curEntity);
        }
        return processResult.isSuccess();
    }

    /**
     * Deletes the configuration of a broker.
     *
     * <p>Deleting an unknown broker id is a successful no-op. If the store delete fails, the
     * cached record is kept and the result is marked as failed with
     * {@code DERR_STORE_ABNORMAL}, so the cache never reports a deletion the store did not
     * perform.
     *
     * @param brokerId      the id of the broker whose configuration is removed
     * @param processResult on success its return data is the removed configuration, or
     *                      {@code null} if there was nothing to remove
     * @return {@code true} if the record is gone from both the store and the caches
     */
    @Override
    public boolean delBrokerConf(int brokerId, ProcessResult processResult) {
        BrokerConfEntity curEntity = this.brokerConfCache.get(brokerId);
        if (curEntity == null) {
            processResult.setSuccResult(null);
            return processResult.isSuccess();
        }
        if (!delBrokerConfigFromBdb(brokerId)) {
            processResult.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
                    new StringBuilder(512).append("Delete broker configure failure, brokerId=")
                            .append(brokerId).toString());
            return processResult.isSuccess();
        }
        delCacheRecord(brokerId);
        processResult.setSuccResult(curEntity);
        return processResult.isSuccess();
    }

    /**
     * Returns the cached configurations that match a query entity.
     *
     * @param brokerConfEntity the query condition, or {@code null} to return every record
     * @return a new map from broker id to configuration; never {@code null}
     */
    @Override
    public Map<Integer, BrokerConfEntity> getBrokerConfInfo(BrokerConfEntity brokerConfEntity) {
        Map<Integer, BrokerConfEntity> result = new HashMap<>();
        for (BrokerConfEntity entity : this.brokerConfCache.values()) {
            if (isSelected(entity, brokerConfEntity)) {
                result.put(entity.getBrokerId(), entity);
            }
        }
        return result;
    }

    /**
     * Returns the cached configurations selected by broker ids, broker IPs and a query entity.
     *
     * <p>A {@code null} or empty id/IP set means "no restriction" for that dimension. When
     * both sets are given, only brokers selected by both are considered. If a non-empty set
     * selects no known broker the result is empty. {@code null} elements in either set are
     * ignored.
     *
     * @param set              broker ids to restrict the result to, may be {@code null}
     * @param set2             broker IPs to restrict the result to, may be {@code null}
     * @param brokerConfEntity additional query condition, or {@code null} for none
     * @return a new map from broker id to configuration; never {@code null}
     */
    @Override
    public Map<Integer, BrokerConfEntity> getBrokerConfInfo(Set<Integer> set, Set<String> set2,
                                                            BrokerConfEntity brokerConfEntity) {
        Map<Integer, BrokerConfEntity> result = new HashMap<>();
        Set<Integer> candidateIds = null;
        if (set != null && !set.isEmpty()) {
            candidateIds = new HashSet<>();
            for (Integer brokerId : set) {
                if (brokerId != null && this.brokerConfCache.containsKey(brokerId)) {
                    candidateIds.add(brokerId);
                }
            }
            if (candidateIds.isEmpty()) {
                return result;
            }
        }
        if (set2 != null && !set2.isEmpty()) {
            Set<Integer> idsByIp = new HashSet<>();
            for (String brokerIp : set2) {
                Integer brokerId = (brokerIp == null) ? null : this.brokerIpIndexCache.get(brokerIp);
                if (brokerId != null) {
                    idsByIp.add(brokerId);
                }
            }
            if (idsByIp.isEmpty()) {
                return result;
            }
            if (candidateIds == null) {
                candidateIds = idsByIp;
            } else {
                // Both restrictions were supplied: keep only brokers selected by each of them.
                candidateIds.retainAll(idsByIp);
            }
        }
        if (candidateIds == null) {
            for (BrokerConfEntity entity : this.brokerConfCache.values()) {
                if (isSelected(entity, brokerConfEntity)) {
                    result.put(entity.getBrokerId(), entity);
                }
            }
        } else {
            for (Integer brokerId : candidateIds) {
                // The record may have been removed since the candidate set was built.
                BrokerConfEntity entity = this.brokerConfCache.get(brokerId);
                if (isSelected(entity, brokerConfEntity)) {
                    result.put(entity.getBrokerId(), entity);
                }
            }
        }
        return result;
    }

    /**
     * Looks up a configuration by broker id.
     *
     * @param brokerId the broker id
     * @return the cached configuration, or {@code null} if the id is unknown
     */
    @Override
    public BrokerConfEntity getBrokerConfByBrokerId(int brokerId) {
        return this.brokerConfCache.get(brokerId);
    }

    /**
     * Looks up a configuration by broker IP.
     *
     * @param brokerIp the broker IP, may be {@code null}
     * @return the cached configuration, or {@code null} if the IP is {@code null} or unknown
     */
    @Override
    public BrokerConfEntity getBrokerConfByBrokerIp(String brokerIp) {
        if (brokerIp == null) {
            return null;
        }
        Integer brokerId = this.brokerIpIndexCache.get(brokerIp);
        return (brokerId == null) ? null : this.brokerConfCache.get(brokerId);
    }

    /**
     * Returns the broker ids configured in the given regions.
     *
     * @param set the region ids to query; {@code null} or empty selects every known region,
     *            {@code null} elements are ignored
     * @return a new map from region id to a snapshot copy of its broker ids; regions without
     *         brokers are omitted
     */
    @Override
    public Map<Integer, Set<Integer>> getBrokerIdByRegionId(Set<Integer> set) {
        Set<Integer> regionIds = new HashSet<>();
        if (set == null || set.isEmpty()) {
            regionIds.addAll(this.regionIndexCache.keySet());
        } else {
            regionIds.addAll(set);
        }
        Map<Integer, Set<Integer>> result = new HashMap<>();
        for (Integer regionId : regionIds) {
            if (regionId == null) {
                continue;
            }
            ConcurrentHashSet<Integer> brokerIds = this.regionIndexCache.get(regionId);
            if (brokerIds == null) {
                continue;
            }
            // Hand out a copy so callers cannot modify the internal region index.
            Set<Integer> snapshot = new HashSet<>(brokerIds);
            if (!snapshot.isEmpty()) {
                result.put(regionId, snapshot);
            }
        }
        return result;
    }

    /**
     * Tells whether a cached entity passes the optional query condition.
     *
     * @param entity    the cached entity, may be {@code null}
     * @param qryEntity the query condition, or {@code null} to accept every entity
     * @return {@code true} if the entity is non-null and accepted by the condition
     */
    private boolean isSelected(BrokerConfEntity entity, BrokerConfEntity qryEntity) {
        return entity != null && (qryEntity == null || entity.isMatched(qryEntity));
    }

    /**
     * Writes a broker configuration into the BDB store.
     *
     * @param brokerConfEntity the configuration to persist
     * @param processResult    set to success, or to {@code DERR_STORE_ABNORMAL} with the
     *                         failure message if the write throws
     * @return {@code true} if the record was written
     */
    private boolean putBrokerConfig2Bdb(BrokerConfEntity brokerConfEntity, ProcessResult processResult) {
        try {
            // NOTE(review): relies on BrokerConfEntity#buildBdbBrokerConfEntity() to produce the
            // persistent form of the record - confirm against the entity class.
            BdbBrokerConfEntity bdbEntity = brokerConfEntity.buildBdbBrokerConfEntity();
            this.brokerConfIndex.put(bdbEntity);
        } catch (Throwable th) {
            logger.error("[BDB Impl] put broker configure failure ", th);
            processResult.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
                    new StringBuilder(512).append("Put broker configure failure: ")
                            .append(th.getMessage()).toString());
            return processResult.isSuccess();
        }
        processResult.setSuccResult(null);
        return processResult.isSuccess();
    }

    /**
     * Deletes a broker configuration from the BDB store.
     *
     * @param brokerId the id of the record to delete
     * @return {@code true} if the delete call completed, {@code false} if it threw
     */
    private boolean delBrokerConfigFromBdb(int brokerId) {
        try {
            this.brokerConfIndex.delete(brokerId);
            return true;
        } catch (Throwable th) {
            logger.error("[BDB Impl] delete broker configure failure ", th);
            return false;
        }
    }

    /**
     * Removes a broker from the id cache and from the IP and region indexes.
     *
     * @param brokerId the id of the broker to remove
     */
    private void delCacheRecord(int brokerId) {
        BrokerConfEntity removed = this.brokerConfCache.remove(brokerId);
        if (removed == null) {
            return;
        }
        this.brokerIpIndexCache.remove(removed.getBrokerIp());
        ConcurrentHashSet<Integer> brokerIds = this.regionIndexCache.get(removed.getRegionId());
        if (brokerIds != null) {
            brokerIds.remove(Integer.valueOf(brokerId));
        }
    }

    /**
     * Stores an entity in the id cache and brings the IP and region indexes in line with it.
     *
     * <p>When the entity replaces an earlier record whose IP or region differs, the index
     * entries of the earlier record are dropped so that lookups by the old IP or old region
     * no longer return this broker.
     *
     * @param entity the entity to cache
     */
    private void addOrUpdCacheRecord(BrokerConfEntity entity) {
        Integer brokerId = entity.getBrokerId();
        BrokerConfEntity previous = this.brokerConfCache.put(brokerId, entity);
        if (previous != null) {
            String oldIp = previous.getBrokerIp();
            if (oldIp != null && !oldIp.equals(entity.getBrokerIp())) {
                // Remove the old IP only if it still points at this broker.
                this.brokerIpIndexCache.remove(oldIp, brokerId);
            }
            if (previous.getRegionId() != entity.getRegionId()) {
                ConcurrentHashSet<Integer> oldRegionBrokers =
                        this.regionIndexCache.get(previous.getRegionId());
                if (oldRegionBrokers != null) {
                    oldRegionBrokers.remove(brokerId);
                }
            }
        }
        this.brokerIpIndexCache.put(entity.getBrokerIp(), brokerId);
        Integer regionId = entity.getRegionId();
        ConcurrentHashSet<Integer> regionBrokers = this.regionIndexCache.get(regionId);
        if (regionBrokers == null) {
            ConcurrentHashSet<Integer> created = new ConcurrentHashSet<>();
            // putIfAbsent returns the existing set if another one was registered meanwhile.
            regionBrokers = this.regionIndexCache.putIfAbsent(regionId, created);
            if (regionBrokers == null) {
                regionBrokers = created;
            }
        }
        regionBrokers.add(brokerId);
    }

    /** Empties the IP index, the region index and the id cache. */
    private void clearCacheData() {
        this.brokerIpIndexCache.clear();
        this.regionIndexCache.clear();
        this.brokerConfCache.clear();
    }
}
