package org.apache.inlong.tubemq.server.master.metamanage.metastore.impl.bdbimpl;

import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.inlong.tubemq.server.common.exception.LoadMetaException;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.master.bdbstore.bdbentitys.BdbTopicConfEntity;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.TopicDeployEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.TopicDeployMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbTopicDeployMapperImpl.class */
/**
 * {@link TopicDeployMapper} implementation that keeps topic deploy records in a
 * Berkeley DB {@link EntityStore} and mirrors them in in-memory caches.
 *
 * <p>Besides the primary cache (record key -> entity) three secondary indexes are
 * maintained: topic name -> record keys, broker id -> record keys, and
 * broker id -> topic names. All read methods are served from the caches only.
 */
public class BdbTopicDeployMapperImpl implements TopicDeployMapper {
    private static final Logger logger = LoggerFactory.getLogger(BdbTopicDeployMapperImpl.class);

    // BDB store handle; reset to null once it has been closed successfully
    private EntityStore topicConfStore;
    // primary index of the store, keyed by record key
    private final PrimaryIndex<String, BdbTopicConfEntity> topicConfIndex;
    // record key -> topic deploy entity
    private final ConcurrentHashMap<String, TopicDeployEntity> topicConfCache =
            new ConcurrentHashMap<>();
    // broker id -> record keys of the topics deployed on that broker
    private final ConcurrentHashMap<Integer, ConcurrentHashSet<String>> brokerIdCacheIndex =
            new ConcurrentHashMap<>();
    // topic name -> record keys of that topic's deployments
    private final ConcurrentHashMap<String, ConcurrentHashSet<String>> topicNameCacheIndex =
            new ConcurrentHashMap<>();
    // broker id -> names of the topics deployed on that broker
    private final ConcurrentHashMap<Integer, ConcurrentHashSet<String>> brokerId2TopicCacheIndex =
            new ConcurrentHashMap<>();

    /**
     * Opens the topic configure store and obtains its primary index.
     *
     * @param replicatedEnvironment the BDB replicated environment hosting the store
     * @param storeConfig           the configuration used to open the store
     */
    public BdbTopicDeployMapperImpl(ReplicatedEnvironment replicatedEnvironment, StoreConfig storeConfig) {
        this.topicConfStore = new EntityStore(replicatedEnvironment,
                TBDBStoreTables.BDB_TOPIC_CONFIG_STORE_NAME, storeConfig);
        this.topicConfIndex = this.topicConfStore.getPrimaryIndex(String.class, BdbTopicConfEntity.class);
    }

    /**
     * Closes the underlying store if it is still open. A failure is logged and
     * swallowed; in that case the store handle is kept so close can be retried.
     */
    @Override
    public void close() {
        if (this.topicConfStore == null) {
            return;
        }
        try {
            this.topicConfStore.close();
            this.topicConfStore = null;
        } catch (Throwable e) {
            logger.error("[BDB Impl] close topic configure failure ", e);
        }
    }

    /**
     * Rebuilds all caches from the records currently held in the store.
     *
     * @throws LoadMetaException if reading the store fails
     */
    @Override
    public void loadConfig() throws LoadMetaException {
        long count = 0L;
        EntityCursor<BdbTopicConfEntity> cursor = null;
        logger.info("[BDB Impl] load topic configure start...");
        try {
            // drop whatever an earlier load left behind, otherwise records that
            // were deleted from the store would survive in the caches
            clearCacheData();
            cursor = this.topicConfIndex.entities();
            for (BdbTopicConfEntity bdbEntity : cursor) {
                if (bdbEntity == null) {
                    logger.warn("[BDB Impl] found Null data while loading topic configure!");
                    continue;
                }
                addOrUpdCacheRecord(new TopicDeployEntity(bdbEntity));
                count++;
            }
            logger.info("[BDB Impl] total topic configure records are {}", count);
        } catch (Exception e) {
            logger.error("[BDB Impl] load topic configure failure ", e);
            throw new LoadMetaException(e.getMessage());
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }
        logger.info("[BDB Impl] load topic configure successfully...");
    }

    /**
     * Adds a new topic deploy record to the store and the caches.
     *
     * @param memEntity the record to add
     * @param result    receives the failure code and message, or the success state
     * @return whether the record was added; false if the record key already
     *         exists or the store write failed
     */
    @Override
    public boolean addTopicConf(TopicDeployEntity memEntity, ProcessResult result) {
        if (this.topicConfCache.get(memEntity.getRecordKey()) != null) {
            result.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(),
                    new StringBuilder(512).append("The topic configure ")
                            .append(memEntity.getRecordKey())
                            .append("'s configure already exists, please delete it first!")
                            .toString());
            return result.isSuccess();
        }
        if (putTopicConfig2Bdb(memEntity, result)) {
            addOrUpdCacheRecord(memEntity);
        }
        return result.isSuccess();
    }

    /**
     * Replaces an existing topic deploy record in the store and the caches.
     * On success the previous record is returned through {@code result}.
     *
     * @param memEntity the new record content
     * @param result    receives the failure code and message, or the old record
     * @return whether the record was updated; false if the record does not
     *         exist, is unchanged, or the store write failed
     */
    @Override
    public boolean updTopicConf(TopicDeployEntity memEntity, ProcessResult result) {
        TopicDeployEntity curEntity = this.topicConfCache.get(memEntity.getRecordKey());
        if (curEntity == null) {
            result.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(),
                    new StringBuilder(512).append("The topic configure ")
                            .append(memEntity.getRecordKey())
                            .append("'s configure is not exists, please add record first!")
                            .toString());
            return result.isSuccess();
        }
        if (curEntity.equals(memEntity)) {
            result.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(),
                    new StringBuilder(512).append("The topic configure ")
                            .append(memEntity.getRecordKey())
                            .append("'s configure have not changed, please delete it first!")
                            .toString());
            return result.isSuccess();
        }
        if (putTopicConfig2Bdb(memEntity, result)) {
            addOrUpdCacheRecord(memEntity);
            result.setRetData(curEntity);
        }
        return result.isSuccess();
    }

    /**
     * Deletes the record with the given key from the store and the caches.
     * Deleting an unknown key is reported as success with no returned data.
     *
     * @param recordKey the key of the record to delete
     * @param result    receives the deleted record, or null if there was none
     * @return the success state stored in {@code result}
     */
    @Override
    public boolean delTopicConf(String recordKey, ProcessResult result) {
        TopicDeployEntity curEntity = this.topicConfCache.get(recordKey);
        if (curEntity == null) {
            result.setSuccResult(null);
            return result.isSuccess();
        }
        // the store delete is best-effort: its failure is logged by the helper
        // and the cache entry is removed regardless
        delTopicConfigFromBdb(recordKey);
        delCacheRecord(recordKey);
        result.setSuccResult(curEntity);
        return result.isSuccess();
    }

    /**
     * Deletes every record deployed on the given broker from the store and the caches.
     *
     * @param brokerId the broker whose records are deleted
     * @param result   always set to success with no returned data
     * @return the success state stored in {@code result}
     */
    @Override
    public boolean delTopicConfByBrokerId(Integer brokerId, ProcessResult result) {
        ConcurrentHashSet<String> recordKeys = this.brokerIdCacheIndex.get(brokerId);
        if (recordKeys != null) {
            // delCacheRecord removes entries from this same set while it is iterated
            for (String recordKey : recordKeys) {
                delTopicConfigFromBdb(recordKey);
                delCacheRecord(recordKey);
            }
        }
        result.setSuccResult(null);
        return result.isSuccess();
    }

    /**
     * @param brokerId the broker to check
     * @return true if at least one record is cached for the broker
     */
    @Override
    public boolean hasConfiguredTopics(int brokerId) {
        ConcurrentHashSet<String> recordKeys = this.brokerIdCacheIndex.get(brokerId);
        return recordKeys != null && !recordKeys.isEmpty();
    }

    /**
     * @param recordKey the record key to look up
     * @return the cached record, or null if none exists
     */
    @Override
    public TopicDeployEntity getTopicConfByeRecKey(String recordKey) {
        return this.topicConfCache.get(recordKey);
    }

    /**
     * Returns the cached records matching the given filter.
     *
     * @param qryEntity the filter; null selects every record
     * @return a new list holding the matching records
     */
    @Override
    public List<TopicDeployEntity> getTopicConf(TopicDeployEntity qryEntity) {
        List<TopicDeployEntity> retEntities = new ArrayList<>();
        if (qryEntity == null) {
            retEntities.addAll(this.topicConfCache.values());
            return retEntities;
        }
        for (TopicDeployEntity entity : this.topicConfCache.values()) {
            if (entity != null && entity.isMatched(qryEntity)) {
                retEntities.add(entity);
            }
        }
        return retEntities;
    }

    /**
     * @param brokerId  the broker id
     * @param topicName the topic name
     * @return the cached record for that broker and topic, or null if none exists
     */
    @Override
    public TopicDeployEntity getTopicConf(int brokerId, String topicName) {
        return this.topicConfCache.get(KeyBuilderUtils.buildTopicConfRecKey(brokerId, topicName));
    }

    /**
     * @param topicName the topic to check
     * @return true if at least one record is cached for the topic
     */
    @Override
    public boolean isTopicDeployed(String topicName) {
        ConcurrentHashSet<String> recordKeys = this.topicNameCacheIndex.get(topicName);
        return recordKeys != null && !recordKeys.isEmpty();
    }

    /**
     * Returns the records selected by topic names, broker ids and an entity
     * filter, grouped by topic name.
     *
     * @param topicNameSet the topic names to select; null or empty means all
     * @param brokerIdSet  the broker ids to select; null or empty means all
     * @param qryEntity    an additional filter; null means no extra filtering
     * @return a new map from topic name to its matching records
     */
    @Override
    public Map<String, List<TopicDeployEntity>> getTopicConfMap(Set<String> topicNameSet,
            Set<Integer> brokerIdSet, TopicDeployEntity qryEntity) {
        Map<String, List<TopicDeployEntity>> retMap = new HashMap<>();
        Set<String> matchedKeys = getMatchedRecords(topicNameSet, brokerIdSet);
        if (matchedKeys == null) {
            // no name or id filter supplied: scan the whole cache
            for (TopicDeployEntity entity : this.topicConfCache.values()) {
                if (entity != null && (qryEntity == null || entity.isMatched(qryEntity))) {
                    retMap.computeIfAbsent(entity.getTopicName(), k -> new ArrayList<>()).add(entity);
                }
            }
            return retMap;
        }
        for (String recordKey : matchedKeys) {
            TopicDeployEntity entity = this.topicConfCache.get(recordKey);
            if (entity != null && (qryEntity == null || entity.isMatched(qryEntity))) {
                retMap.computeIfAbsent(entity.getTopicName(), k -> new ArrayList<>()).add(entity);
            }
        }
        return retMap;
    }

    /**
     * Returns the records selected by topic names and broker ids, grouped by
     * broker id. Every id in {@code brokerIdSet} gets an entry, possibly empty.
     *
     * @param topicNameSet the topic names to select; null or empty means all
     * @param brokerIdSet  the broker ids to select; null or empty means all
     * @return a new map from broker id to its matching records
     */
    @Override
    public Map<Integer, List<TopicDeployEntity>> getTopicDeployInfoMap(Set<String> topicNameSet,
            Set<Integer> brokerIdSet) {
        Map<Integer, List<TopicDeployEntity>> retMap = new HashMap<>();
        if (brokerIdSet != null) {
            for (Integer brokerId : brokerIdSet) {
                retMap.put(brokerId, new ArrayList<>());
            }
        }
        Set<String> matchedKeys = getMatchedRecords(topicNameSet, brokerIdSet);
        if (matchedKeys == null) {
            matchedKeys = new HashSet<>(this.topicConfCache.keySet());
        }
        for (String recordKey : matchedKeys) {
            TopicDeployEntity entity = this.topicConfCache.get(recordKey);
            if (entity != null) {
                retMap.computeIfAbsent(entity.getBrokerId(), k -> new ArrayList<>()).add(entity);
            }
        }
        return retMap;
    }

    /**
     * Returns the records selected by topic names and broker ids, grouped by topic name.
     *
     * @param topicNameSet the topic names to select; null or empty means all
     * @param brokerIdSet  the broker ids to select; null or empty means all
     * @return a new map from topic name to its matching records
     */
    @Override
    public Map<String, List<TopicDeployEntity>> getTopicConfMapByTopicAndBrokerIds(
            Set<String> topicNameSet, Set<Integer> brokerIdSet) {
        Map<String, List<TopicDeployEntity>> retMap = new HashMap<>();
        Set<String> matchedKeys = getMatchedRecords(topicNameSet, brokerIdSet);
        if (matchedKeys == null) {
            // no filter supplied: scan the whole cache
            for (TopicDeployEntity entity : this.topicConfCache.values()) {
                if (entity != null) {
                    retMap.computeIfAbsent(entity.getTopicName(), k -> new ArrayList<>()).add(entity);
                }
            }
            return retMap;
        }
        for (String recordKey : matchedKeys) {
            TopicDeployEntity entity = this.topicConfCache.get(recordKey);
            if (entity != null) {
                retMap.computeIfAbsent(entity.getTopicName(), k -> new ArrayList<>()).add(entity);
            }
        }
        return retMap;
    }

    /**
     * Returns, per topic, the brokers it is deployed on as broker id -> broker ip.
     * When topic names are given, each non-null name gets an entry even if the
     * topic has no deployment.
     *
     * @param topicNameSet the topics to report; null or empty means all cached topics
     * @return a new map from topic name to its (broker id -> broker ip) map
     */
    @Override
    public Map<String, Map<Integer, String>> getTopicBrokerInfo(Set<String> topicNameSet) {
        Map<String, Map<Integer, String>> retMap = new HashMap<>();
        if (topicNameSet == null || topicNameSet.isEmpty()) {
            for (TopicDeployEntity entity : this.topicConfCache.values()) {
                if (entity != null) {
                    retMap.computeIfAbsent(entity.getTopicName(), k -> new HashMap<>())
                            .put(entity.getBrokerId(), entity.getBrokerIp());
                }
            }
            return retMap;
        }
        for (String topicName : topicNameSet) {
            if (topicName == null) {
                continue;
            }
            Map<Integer, String> brokerInfoMap =
                    retMap.computeIfAbsent(topicName, k -> new HashMap<>());
            ConcurrentHashSet<String> recordKeys = this.topicNameCacheIndex.get(topicName);
            if (recordKeys == null) {
                continue;
            }
            for (String recordKey : recordKeys) {
                TopicDeployEntity entity = this.topicConfCache.get(recordKey);
                if (entity != null) {
                    brokerInfoMap.put(entity.getBrokerId(), entity.getBrokerIp());
                }
            }
        }
        return retMap;
    }

    /**
     * @return a new set holding the names of all topics that have a cached record
     */
    @Override
    public Set<String> getConfiguredTopicSet() {
        return new HashSet<>(this.topicNameCacheIndex.keySet());
    }

    /**
     * Returns the records deployed on one broker, keyed by topic name.
     *
     * @param brokerId the broker to report
     * @return a new map from topic name to record; empty if the broker has none
     */
    @Override
    public Map<String, TopicDeployEntity> getConfiguredTopicInfo(int brokerId) {
        Map<String, TopicDeployEntity> retMap = new HashMap<>();
        ConcurrentHashSet<String> recordKeys = this.brokerIdCacheIndex.get(brokerId);
        if (recordKeys == null || recordKeys.isEmpty()) {
            return retMap;
        }
        for (String recordKey : recordKeys) {
            TopicDeployEntity entity = this.topicConfCache.get(recordKey);
            if (entity != null) {
                retMap.put(entity.getTopicName(), entity);
            }
        }
        return retMap;
    }

    /**
     * Returns the topic names deployed on the given brokers. When broker ids
     * are given, each non-null id gets an entry, possibly an empty set.
     *
     * @param brokerIdSet the brokers to report; null or empty means all cached brokers
     * @return a new map from broker id to a copy of its topic name set
     */
    @Override
    public Map<Integer, Set<String>> getConfiguredTopicInfo(Set<Integer> brokerIdSet) {
        Map<Integer, Set<String>> retMap = new HashMap<>();
        if (brokerIdSet == null || brokerIdSet.isEmpty()) {
            for (Map.Entry<Integer, ConcurrentHashSet<String>> entry
                    : this.brokerId2TopicCacheIndex.entrySet()) {
                if (entry.getKey() == null) {
                    continue;
                }
                Set<String> topicNames = new HashSet<>();
                if (entry.getValue() != null) {
                    topicNames.addAll(entry.getValue());
                }
                retMap.put(entry.getKey(), topicNames);
            }
            return retMap;
        }
        for (Integer brokerId : brokerIdSet) {
            if (brokerId == null) {
                continue;
            }
            Set<String> topicNames = new HashSet<>();
            ConcurrentHashSet<String> cachedNames = this.brokerId2TopicCacheIndex.get(brokerId);
            if (cachedNames != null) {
                topicNames.addAll(cachedNames);
            }
            retMap.put(brokerId, topicNames);
        }
        return retMap;
    }

    /**
     * Writes the record to the BDB primary index.
     *
     * @param memEntity the record to persist
     * @param result    set to success, or to DERR_STORE_ABNORMAL with the error message
     * @return whether the write succeeded
     */
    private boolean putTopicConfig2Bdb(TopicDeployEntity memEntity, ProcessResult result) {
        try {
            // NOTE(review): buildBdbTopicConfEntity() is assumed to be the
            // TopicDeployEntity -> BdbTopicConfEntity converter; confirm the name
            this.topicConfIndex.put(memEntity.buildBdbTopicConfEntity());
        } catch (Throwable e) {
            logger.error("[BDB Impl] put topic configure failure ", e);
            result.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(),
                    new StringBuilder(512).append("Put topic configure failure: ")
                            .append(e.getMessage()).toString());
            return result.isSuccess();
        }
        result.setSuccResult(null);
        return result.isSuccess();
    }

    /**
     * Deletes the record with the given key from the BDB primary index.
     *
     * @param recordKey the key of the record to delete
     * @return false if the delete threw (the error is logged), true otherwise
     */
    private boolean delTopicConfigFromBdb(String recordKey) {
        try {
            this.topicConfIndex.delete(recordKey);
            return true;
        } catch (Throwable e) {
            logger.error("[BDB Impl] delete topic configure failure ", e);
            return false;
        }
    }

    /**
     * Removes a record from the primary cache and from all three indexes,
     * dropping index entries that become empty.
     *
     * @param recordKey the key of the record to remove
     */
    private void delCacheRecord(String recordKey) {
        TopicDeployEntity removed = this.topicConfCache.remove(recordKey);
        if (removed == null) {
            return;
        }
        String topicName = removed.getTopicName();
        Integer brokerId = removed.getBrokerId();
        // topic name -> record keys
        ConcurrentHashSet<String> keysOfTopic = this.topicNameCacheIndex.get(topicName);
        if (keysOfTopic != null) {
            keysOfTopic.remove(recordKey);
            if (keysOfTopic.isEmpty()) {
                this.topicNameCacheIndex.remove(topicName);
            }
        }
        // broker id -> record keys
        ConcurrentHashSet<String> keysOfBroker = this.brokerIdCacheIndex.get(brokerId);
        if (keysOfBroker != null) {
            keysOfBroker.remove(recordKey);
            if (keysOfBroker.isEmpty()) {
                this.brokerIdCacheIndex.remove(brokerId);
            }
        }
        // broker id -> topic names
        ConcurrentHashSet<String> topicsOfBroker = this.brokerId2TopicCacheIndex.get(brokerId);
        if (topicsOfBroker != null) {
            topicsOfBroker.remove(topicName);
            if (topicsOfBroker.isEmpty()) {
                this.brokerId2TopicCacheIndex.remove(brokerId);
            }
        }
    }

    /**
     * Resolves topic name and broker id filters to a set of record keys.
     *
     * @param topicNameSet the topic names to select; null or empty disables this filter
     * @param brokerIdSet  the broker ids to select; null or empty disables this filter
     * @return null if both filters are disabled (meaning "all records"); an
     *         empty set if an enabled filter matches nothing; otherwise the
     *         keys matching every enabled filter
     */
    private Set<String> getMatchedRecords(Set<String> topicNameSet, Set<Integer> brokerIdSet) {
        Set<String> keysByTopic = null;
        Set<String> keysByBroker = null;
        if (topicNameSet != null && !topicNameSet.isEmpty()) {
            keysByTopic = new HashSet<>();
            for (String topicName : topicNameSet) {
                ConcurrentHashSet<String> recordKeys = this.topicNameCacheIndex.get(topicName);
                if (recordKeys != null && !recordKeys.isEmpty()) {
                    keysByTopic.addAll(recordKeys);
                }
            }
            if (keysByTopic.isEmpty()) {
                return Collections.emptySet();
            }
        }
        if (brokerIdSet != null && !brokerIdSet.isEmpty()) {
            keysByBroker = new HashSet<>();
            for (Integer brokerId : brokerIdSet) {
                ConcurrentHashSet<String> recordKeys = this.brokerIdCacheIndex.get(brokerId);
                if (recordKeys != null && !recordKeys.isEmpty()) {
                    keysByBroker.addAll(recordKeys);
                }
            }
            if (keysByBroker.isEmpty()) {
                return Collections.emptySet();
            }
        }
        if (keysByTopic == null && keysByBroker == null) {
            return null;
        }
        if (keysByTopic == null) {
            return new HashSet<>(keysByBroker);
        }
        Set<String> matchedKeys = new HashSet<>(keysByTopic);
        if (keysByBroker != null) {
            // both filters enabled: keep only keys present in both
            matchedKeys.retainAll(keysByBroker);
        }
        return matchedKeys;
    }

    /**
     * Stores a record in the primary cache and registers it in all three indexes.
     *
     * @param entity the record to cache
     */
    private void addOrUpdCacheRecord(TopicDeployEntity entity) {
        String recordKey = entity.getRecordKey();
        String topicName = entity.getTopicName();
        Integer brokerId = entity.getBrokerId();
        this.topicConfCache.put(recordKey, entity);
        this.topicNameCacheIndex
                .computeIfAbsent(topicName, k -> new ConcurrentHashSet<>()).add(recordKey);
        this.brokerIdCacheIndex
                .computeIfAbsent(brokerId, k -> new ConcurrentHashSet<>()).add(recordKey);
        this.brokerId2TopicCacheIndex
                .computeIfAbsent(brokerId, k -> new ConcurrentHashSet<>()).add(topicName);
    }

    /**
     * Empties the primary cache and all three indexes.
     */
    private void clearCacheData() {
        this.topicNameCacheIndex.clear();
        this.brokerIdCacheIndex.clear();
        this.brokerId2TopicCacheIndex.clear();
        this.topicConfCache.clear();
    }
}
