package org.apache.inlong.tubemq.server.master.metamanage.metastore.impl.bdbimpl;

import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.inlong.tubemq.corebase.utils.KeyBuilderUtils;
import org.apache.inlong.tubemq.server.common.exception.LoadMetaException;
import org.apache.inlong.tubemq.server.common.utils.ProcessResult;
import org.apache.inlong.tubemq.server.master.bdbstore.bdbentitys.BdbGroupFilterCondEntity;
import org.apache.inlong.tubemq.server.master.metamanage.DataOpErrCode;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.entity.GroupConsumeCtrlEntity;
import org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/server/master/metamanage/metastore/impl/bdbimpl/BdbGroupConsumeCtrlMapperImpl.class */
public class BdbGroupConsumeCtrlMapperImpl implements GroupConsumeCtrlMapper {
    private static final Logger logger = LoggerFactory.getLogger(BdbGroupConsumeCtrlMapperImpl.class);
    private EntityStore groupConsumeStore;
    private PrimaryIndex<String, BdbGroupFilterCondEntity> groupConsumeIndex;
    private ConcurrentHashMap<String, GroupConsumeCtrlEntity> grpConsumeCtrlCache = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ConcurrentHashSet<String>> grpConsumeCtrlTopicCache = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, ConcurrentHashSet<String>> grpConsumeCtrlGroupCache = new ConcurrentHashMap<>();

    public BdbGroupConsumeCtrlMapperImpl(ReplicatedEnvironment replicatedEnvironment, StoreConfig storeConfig) {
        this.groupConsumeStore = new EntityStore(replicatedEnvironment, TBDBStoreTables.BDB_GROUP_FILTER_COND_STORE_NAME, storeConfig);
        this.groupConsumeIndex = this.groupConsumeStore.getPrimaryIndex(String.class, BdbGroupFilterCondEntity.class);
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.AbstractMapper
    public void close() {
        clearCacheData();
        if (this.groupConsumeStore != null) {
            try {
                this.groupConsumeStore.close();
                this.groupConsumeStore = null;
            } catch (Throwable th) {
                logger.error("[BDB Impl] close consume configure failure ", th);
            }
        }
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.AbstractMapper
    public void loadConfig() throws LoadMetaException {
        long j = 0;
        EntityCursor<BdbGroupFilterCondEntity> entityCursor = null;
        logger.info("[BDB Impl] load consume configure start...");
        try {
            try {
                clearCacheData();
                entityCursor = this.groupConsumeIndex.entities();
                for (BdbGroupFilterCondEntity bdbGroupFilterCondEntity : entityCursor) {
                    if (bdbGroupFilterCondEntity == null) {
                        logger.warn("[BDB Impl] found Null data while loading consume configure!");
                    } else {
                        addOrUpdCacheRecord(new GroupConsumeCtrlEntity(bdbGroupFilterCondEntity));
                        j++;
                    }
                }
                logger.info("[BDB Impl] total consume configure records are {}", Long.valueOf(j));
                if (entityCursor != null) {
                    entityCursor.close();
                }
                logger.info("[BDB Impl] load consume configure successfully...");
            } catch (Exception e) {
                logger.error("[BDB Impl] load filter configure failure ", e);
                throw new LoadMetaException(e.getMessage());
            }
        } catch (Throwable th) {
            if (entityCursor != null) {
                entityCursor.close();
            }
            throw th;
        }
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public boolean addGroupConsumeCtrlConf(GroupConsumeCtrlEntity groupConsumeCtrlEntity, ProcessResult processResult) {
        if (this.grpConsumeCtrlCache.get(groupConsumeCtrlEntity.getRecordKey()) != null) {
            processResult.setFailResult(DataOpErrCode.DERR_EXISTED.getCode(), new StringBuilder(512).append("The group consume ").append(groupConsumeCtrlEntity.getRecordKey()).append("'s configure already exists, please delete it first!").toString());
            return processResult.isSuccess();
        }
        if (putGroupConsumeCtrlConfig2Bdb(groupConsumeCtrlEntity, processResult)) {
            addOrUpdCacheRecord(groupConsumeCtrlEntity);
        }
        return processResult.isSuccess();
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public boolean updGroupConsumeCtrlConf(GroupConsumeCtrlEntity groupConsumeCtrlEntity, ProcessResult processResult) {
        GroupConsumeCtrlEntity groupConsumeCtrlEntity2 = this.grpConsumeCtrlCache.get(groupConsumeCtrlEntity.getRecordKey());
        if (groupConsumeCtrlEntity2 == null) {
            processResult.setFailResult(DataOpErrCode.DERR_NOT_EXIST.getCode(), new StringBuilder(512).append("The group consume ").append(groupConsumeCtrlEntity.getRecordKey()).append("'s configure is not exists, please add record first!").toString());
            return processResult.isSuccess();
        }
        if (groupConsumeCtrlEntity2.equals(groupConsumeCtrlEntity)) {
            processResult.setFailResult(DataOpErrCode.DERR_UNCHANGED.getCode(), new StringBuilder(512).append("The group consume ").append(groupConsumeCtrlEntity.getRecordKey()).append("'s configure have not changed, please delete it first!").toString());
            return processResult.isSuccess();
        }
        if (putGroupConsumeCtrlConfig2Bdb(groupConsumeCtrlEntity, processResult)) {
            addOrUpdCacheRecord(groupConsumeCtrlEntity);
            processResult.setRetData(groupConsumeCtrlEntity2);
        }
        return processResult.isSuccess();
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public boolean isTopicNameInUsed(String str) {
        ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlTopicCache.get(str);
        return (concurrentHashSet == null || concurrentHashSet.isEmpty()) ? false : true;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public boolean hasGroupConsumeCtrlConf(String str) {
        ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlGroupCache.get(str);
        return (concurrentHashSet == null || concurrentHashSet.isEmpty()) ? false : true;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public boolean delGroupConsumeCtrlConf(String str, ProcessResult processResult) {
        GroupConsumeCtrlEntity groupConsumeCtrlEntity = this.grpConsumeCtrlCache.get(str);
        if (groupConsumeCtrlEntity == null) {
            processResult.setSuccResult(null);
            return true;
        }
        delGroupConsumeCtrlConfigFromBdb(str);
        delCacheRecord(str);
        processResult.setSuccResult(groupConsumeCtrlEntity);
        return true;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public boolean delGroupConsumeCtrlConf(String str, String str2, ProcessResult processResult) {
        ConcurrentHashSet<String> concurrentHashSet = new ConcurrentHashSet<>();
        if (str == null) {
            if (str2 == null) {
                processResult.setSuccResult(null);
                return true;
            }
            concurrentHashSet = this.grpConsumeCtrlTopicCache.get(str2);
        } else if (str2 == null) {
            concurrentHashSet = this.grpConsumeCtrlGroupCache.get(str);
        } else {
            concurrentHashSet.add(KeyBuilderUtils.buildGroupTopicRecKey(str, str2));
        }
        if (concurrentHashSet == null || concurrentHashSet.isEmpty()) {
            processResult.setSuccResult(null);
            return true;
        }
        Iterator it = concurrentHashSet.iterator();
        while (it.hasNext()) {
            if (!delGroupConsumeCtrlConf((String) it.next(), processResult)) {
                return processResult.isSuccess();
            }
            processResult.clear();
        }
        processResult.setSuccResult(null);
        return true;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public GroupConsumeCtrlEntity getGroupConsumeCtrlConfByRecKey(String str) {
        return this.grpConsumeCtrlCache.get(str);
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public List<GroupConsumeCtrlEntity> getConsumeCtrlByTopicName(String str) {
        GroupConsumeCtrlEntity groupConsumeCtrlEntity;
        ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlTopicCache.get(str);
        if (concurrentHashSet == null || concurrentHashSet.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = concurrentHashSet.iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            if (str2 != null && (groupConsumeCtrlEntity = this.grpConsumeCtrlCache.get(str2)) != null) {
                arrayList.add(groupConsumeCtrlEntity);
            }
        }
        return arrayList;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public List<GroupConsumeCtrlEntity> getConsumeCtrlByGroupName(String str) {
        ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlGroupCache.get(str);
        if (concurrentHashSet == null || concurrentHashSet.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = concurrentHashSet.iterator();
        while (it.hasNext()) {
            GroupConsumeCtrlEntity groupConsumeCtrlEntity = this.grpConsumeCtrlCache.get((String) it.next());
            if (groupConsumeCtrlEntity != null) {
                arrayList.add(groupConsumeCtrlEntity);
            }
        }
        return arrayList;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public GroupConsumeCtrlEntity getConsumeCtrlByGroupAndTopic(String str, String str2) {
        return this.grpConsumeCtrlCache.get(KeyBuilderUtils.buildGroupTopicRecKey(str, str2));
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public Map<String, List<GroupConsumeCtrlEntity>> getConsumeCtrlInfoMap(Set<String> set, Set<String> set2, GroupConsumeCtrlEntity groupConsumeCtrlEntity) {
        HashMap hashMap = new HashMap();
        Set<String> matchedRecords = getMatchedRecords(set, set2);
        if (matchedRecords == null) {
            for (GroupConsumeCtrlEntity groupConsumeCtrlEntity2 : this.grpConsumeCtrlCache.values()) {
                if (groupConsumeCtrlEntity2 != null && (groupConsumeCtrlEntity == null || groupConsumeCtrlEntity2.isMatched(groupConsumeCtrlEntity))) {
                    ((List) hashMap.computeIfAbsent(groupConsumeCtrlEntity2.getGroupName(), str -> {
                        return new ArrayList();
                    })).add(groupConsumeCtrlEntity2);
                }
            }
        } else {
            Iterator<String> it = matchedRecords.iterator();
            while (it.hasNext()) {
                GroupConsumeCtrlEntity groupConsumeCtrlEntity3 = this.grpConsumeCtrlCache.get(it.next());
                if (groupConsumeCtrlEntity3 != null && (groupConsumeCtrlEntity == null || groupConsumeCtrlEntity3.isMatched(groupConsumeCtrlEntity))) {
                    ((List) hashMap.computeIfAbsent(groupConsumeCtrlEntity3.getGroupName(), str2 -> {
                        return new ArrayList();
                    })).add(groupConsumeCtrlEntity3);
                }
            }
        }
        return hashMap;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public List<GroupConsumeCtrlEntity> getGroupConsumeCtrlConf(GroupConsumeCtrlEntity groupConsumeCtrlEntity) {
        ArrayList arrayList = new ArrayList();
        if (groupConsumeCtrlEntity == null) {
            arrayList.addAll(this.grpConsumeCtrlCache.values());
        } else {
            for (GroupConsumeCtrlEntity groupConsumeCtrlEntity2 : this.grpConsumeCtrlCache.values()) {
                if (groupConsumeCtrlEntity2 != null && groupConsumeCtrlEntity2.isMatched(groupConsumeCtrlEntity)) {
                    arrayList.add(groupConsumeCtrlEntity2);
                }
            }
        }
        return arrayList;
    }

    @Override // org.apache.inlong.tubemq.server.master.metamanage.metastore.dao.mapper.GroupConsumeCtrlMapper
    public Set<String> getMatchedRecords(Set<String> set, Set<String> set2) {
        HashSet<String> hashSet = null;
        HashSet hashSet2 = null;
        HashSet hashSet3 = null;
        if (set != null && !set.isEmpty()) {
            hashSet = new HashSet();
            Iterator<String> it = set.iterator();
            while (it.hasNext()) {
                ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlGroupCache.get(it.next());
                if (concurrentHashSet != null && !concurrentHashSet.isEmpty()) {
                    hashSet.addAll(concurrentHashSet);
                }
            }
            if (hashSet.isEmpty()) {
                return Collections.emptySet();
            }
        }
        if (set2 != null && !set2.isEmpty()) {
            hashSet2 = new HashSet();
            Iterator<String> it2 = set2.iterator();
            while (it2.hasNext()) {
                ConcurrentHashSet<String> concurrentHashSet2 = this.grpConsumeCtrlTopicCache.get(it2.next());
                if (concurrentHashSet2 != null && !concurrentHashSet2.isEmpty()) {
                    hashSet2.addAll(concurrentHashSet2);
                }
            }
            if (hashSet2.isEmpty()) {
                return Collections.emptySet();
            }
        }
        if (hashSet != null || hashSet2 != null) {
            if (hashSet == null) {
                hashSet3 = new HashSet(hashSet2);
            } else if (hashSet2 == null) {
                hashSet3 = new HashSet(hashSet);
            } else {
                hashSet3 = new HashSet();
                for (String str : hashSet) {
                    if (hashSet2.contains(str)) {
                        hashSet3.add(str);
                    }
                }
            }
        }
        return hashSet3;
    }

    private boolean putGroupConsumeCtrlConfig2Bdb(GroupConsumeCtrlEntity groupConsumeCtrlEntity, ProcessResult processResult) {
        try {
            processResult.setSuccResult(null);
            return processResult.isSuccess();
        } catch (Throwable th) {
            logger.error("[BDB Impl] put consume configure failure ", th);
            processResult.setFailResult(DataOpErrCode.DERR_STORE_ABNORMAL.getCode(), new StringBuilder(512).append("Put filter configure failure: ").append(th.getMessage()).toString());
            return processResult.isSuccess();
        }
    }

    private boolean delGroupConsumeCtrlConfigFromBdb(String str) {
        try {
            this.groupConsumeIndex.delete(str);
            return true;
        } catch (Throwable th) {
            logger.error("[BDB Impl] delete consume configure failure ", th);
            return false;
        }
    }

    private void delCacheRecord(String str) {
        GroupConsumeCtrlEntity remove = this.grpConsumeCtrlCache.remove(str);
        if (remove == null) {
            return;
        }
        ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlTopicCache.get(remove.getTopicName());
        if (concurrentHashSet != null) {
            concurrentHashSet.remove(str);
            if (concurrentHashSet.isEmpty()) {
                this.grpConsumeCtrlTopicCache.remove(remove.getTopicName());
            }
        }
        ConcurrentHashSet<String> concurrentHashSet2 = this.grpConsumeCtrlGroupCache.get(remove.getGroupName());
        if (concurrentHashSet2 != null) {
            concurrentHashSet2.remove(str);
            if (concurrentHashSet2.isEmpty()) {
                this.grpConsumeCtrlGroupCache.remove(remove.getGroupName());
            }
        }
    }

    private void addOrUpdCacheRecord(GroupConsumeCtrlEntity groupConsumeCtrlEntity) {
        this.grpConsumeCtrlCache.put(groupConsumeCtrlEntity.getRecordKey(), groupConsumeCtrlEntity);
        ConcurrentHashSet<String> concurrentHashSet = this.grpConsumeCtrlTopicCache.get(groupConsumeCtrlEntity.getTopicName());
        if (concurrentHashSet == null) {
            ConcurrentHashSet<String> concurrentHashSet2 = new ConcurrentHashSet<>();
            concurrentHashSet = this.grpConsumeCtrlTopicCache.putIfAbsent(groupConsumeCtrlEntity.getTopicName(), concurrentHashSet2);
            if (concurrentHashSet == null) {
                concurrentHashSet = concurrentHashSet2;
            }
        }
        concurrentHashSet.add(groupConsumeCtrlEntity.getRecordKey());
        ConcurrentHashSet<String> concurrentHashSet3 = this.grpConsumeCtrlGroupCache.get(groupConsumeCtrlEntity.getGroupName());
        if (concurrentHashSet3 == null) {
            ConcurrentHashSet<String> concurrentHashSet4 = new ConcurrentHashSet<>();
            concurrentHashSet3 = this.grpConsumeCtrlGroupCache.putIfAbsent(groupConsumeCtrlEntity.getGroupName(), concurrentHashSet4);
            if (concurrentHashSet3 == null) {
                concurrentHashSet3 = concurrentHashSet4;
            }
        }
        concurrentHashSet3.add(groupConsumeCtrlEntity.getRecordKey());
    }

    private void clearCacheData() {
        this.grpConsumeCtrlTopicCache.clear();
        this.grpConsumeCtrlGroupCache.clear();
        this.grpConsumeCtrlCache.clear();
    }
}
