package org.apache.tubemq.server.broker.offset;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tubemq.corebase.daemon.AbstractDaemonService;
import org.apache.tubemq.corebase.utils.MixedUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corebase.utils.Tuple3;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.common.TStatusConstants;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorage;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
import org.apache.tubemq.server.common.offsetstorage.ZkOffsetStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/server/broker/offset/DefaultOffsetManager.class */
public class DefaultOffsetManager extends AbstractDaemonService implements OffsetService {
    private static final Logger logger = LoggerFactory.getLogger(DefaultOffsetManager.class);
    private final BrokerConfig brokerConfig;
    private final OffsetStorage zkOffsetStorage;
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, OffsetStorageInfo>> cfmOffsetMap;
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> tmpOffsetMap;

    public DefaultOffsetManager(BrokerConfig brokerConfig) {
        super("[Offset Manager]", brokerConfig.getZkConfig().getZkCommitPeriodMs());
        this.cfmOffsetMap = new ConcurrentHashMap<>();
        this.tmpOffsetMap = new ConcurrentHashMap<>();
        this.brokerConfig = brokerConfig;
        this.zkOffsetStorage = new ZkOffsetStorage(brokerConfig.getZkConfig(), true, brokerConfig.getBrokerId());
        super.start();
    }

    protected void loopProcess(long j) {
        while (!super.isStopped()) {
            try {
                Thread.sleep(j);
                commitCfmOffsets(false);
            } catch (InterruptedException e) {
                logger.warn("[Offset Manager] Daemon commit thread has been interrupted");
                return;
            } catch (Throwable th) {
                logger.error("[Offset Manager] Daemon commit thread throw error ", th);
            }
        }
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public void close(long j) {
        if (super.stop()) {
            return;
        }
        logger.info("[Offset Manager] begin reserve temporary Offset.....");
        commitTmpOffsets();
        logger.info("[Offset Manager] begin reserve final Offset.....");
        commitCfmOffsets(true);
        this.zkOffsetStorage.close();
        logger.info("[Offset Manager] Offset Manager service stopped!");
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public OffsetStorageInfo loadOffset(MessageStore messageStore, String str, String str2, int i, int i2, long j, StringBuilder sb) {
        long indexMaxOffset = messageStore.getIndexMaxOffset();
        long indexMinOffset = messageStore.getIndexMinOffset();
        long j2 = i2 == 0 ? indexMinOffset : indexMaxOffset;
        String offsetCacheKey = getOffsetCacheKey(str2, i);
        OffsetStorageInfo loadOrCreateOffset = loadOrCreateOffset(str, str2, i, offsetCacheKey, j2);
        getAndResetTmpOffset(str, offsetCacheKey);
        long offset = loadOrCreateOffset.getOffset();
        boolean isFirstCreate = loadOrCreateOffset.isFirstCreate();
        if (j >= 0 || i2 == 2) {
            long j3 = indexMaxOffset;
            if (i2 != 2) {
                j3 = MixedUtils.mid(j, indexMinOffset, indexMaxOffset);
            }
            loadOrCreateOffset.getAndSetOffset(j3);
        }
        sb.append("[Offset Manager]");
        switch (i2) {
            case 1:
                sb.append(" Consume From Max Offset");
                break;
            case TStatusConstants.STATUS_TOPIC_SOFT_REMOVE /* 2 */:
                sb.append(" Consume From Max Offset Always");
                break;
            default:
                sb.append(" Normal Offset");
                break;
        }
        if (!isFirstCreate) {
            sb.append(",Continue");
        }
        logger.info(sb.append(",loaded offset=").append(offset).append(",required offset=").append(j).append(",current offset=").append(loadOrCreateOffset.getOffset()).append(",maxOffset=").append(indexMaxOffset).append(",offset delta=").append(indexMaxOffset - loadOrCreateOffset.getOffset()).append(",group=").append(str).append(",topic=").append(str2).append(",partitionId=").append(i).toString());
        sb.delete(0, sb.length());
        return loadOrCreateOffset;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public long getOffset(MessageStore messageStore, String str, String str2, int i, boolean z, boolean z2, StringBuilder sb) {
        String offsetCacheKey = getOffsetCacheKey(str2, i);
        long offset = loadOrCreateOffset(str, str2, i, offsetCacheKey, 0L).getOffset();
        if (z) {
            offset += getTmpOffset(str, str2, i);
        } else if (z2) {
            offset = commitOffset(str, str2, i, true);
        }
        long indexMaxOffset = messageStore.getIndexMaxOffset();
        long indexMinOffset = messageStore.getIndexMinOffset();
        if (offset < indexMaxOffset) {
            if (offset < indexMinOffset) {
                logger.warn(sb.append("[Offset Manager] Offset is lower than current min offset, reset! requestOffset=").append(offset).append(",minOffset=").append(indexMinOffset).append(",group=").append(str).append(",topic=").append(str2).append(",partitionId=").append(i).toString());
                sb.delete(0, sb.length());
                setTmpOffset(str, offsetCacheKey, indexMinOffset - offset);
                offset = commitOffset(str, str2, i, true);
            }
            return offset;
        }
        if (offset > indexMaxOffset && this.brokerConfig.isUpdateConsumerOffsets()) {
            logger.warn(sb.append("[Offset Manager] Offset is bigger than current max offset, reset! requestOffset=").append(offset).append(",maxOffset=").append(indexMaxOffset).append(",group=").append(str).append(",topic=").append(str2).append(",partitionId=").append(i).toString());
            sb.delete(0, sb.length());
            setTmpOffset(str, offsetCacheKey, indexMaxOffset - offset);
            if (!z) {
                offset = commitOffset(str, str2, i, true);
            }
        }
        return -offset;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public long getOffset(String str, String str2, int i) {
        return loadOrCreateOffset(str, str2, i, getOffsetCacheKey(str2, i), 0L).getOffset();
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public void bookOffset(String str, String str2, int i, int i2, boolean z, boolean z2, StringBuilder sb) {
        if (i2 == 0) {
            return;
        }
        String offsetCacheKey = getOffsetCacheKey(str2, i);
        if (z) {
            setTmpOffset(str, offsetCacheKey, i2 + getTmpOffset(str, str2, i));
        } else {
            setTmpOffset(str, offsetCacheKey, i2);
            if (z2) {
                commitOffset(str, str2, i, true);
            }
        }
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public long commitOffset(String str, String str2, int i, boolean z) {
        String offsetCacheKey = getOffsetCacheKey(str2, i);
        long andResetTmpOffset = getAndResetTmpOffset(str, offsetCacheKey);
        if (!z) {
            andResetTmpOffset = 0;
        }
        OffsetStorageInfo loadOrCreateOffset = loadOrCreateOffset(str, str2, i, offsetCacheKey, 0L);
        if (andResetTmpOffset == 0 && !loadOrCreateOffset.isFirstCreate()) {
            return loadOrCreateOffset.getOffset();
        }
        long addAndGetOffset = loadOrCreateOffset.addAndGetOffset(andResetTmpOffset);
        if (logger.isDebugEnabled()) {
            logger.debug(new StringBuilder(512).append("[Offset Manager] Update offset finished, offset=").append(addAndGetOffset).append(",group=").append(str).append(",topic=").append(str2).append(",partitionId=").append(i).toString());
        }
        return addAndGetOffset;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public long resetOffset(MessageStore messageStore, String str, String str2, int i, long j, String str3) {
        long j2 = -1;
        if (messageStore != null) {
            long mid = MixedUtils.mid(j, messageStore.getIndexMinOffset(), messageStore.getIndexMaxOffset());
            String offsetCacheKey = getOffsetCacheKey(str2, i);
            getAndResetTmpOffset(str, offsetCacheKey);
            OffsetStorageInfo loadOrCreateOffset = loadOrCreateOffset(str, str2, i, offsetCacheKey, 0L);
            j2 = loadOrCreateOffset.getAndSetOffset(mid);
            logger.info(new StringBuilder(512).append("[Offset Manager] Manual update offset by modifier=").append(str3).append(",reset offset=").append(mid).append(",old offset=").append(j2).append(",updated offset=").append(loadOrCreateOffset.getOffset()).append(",group=").append(str).append(",topic=").append(str2).append(",partitionId=").append(i).toString());
        }
        return j2;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public long getTmpOffset(String str, String str2, int i) {
        Long l;
        String offsetCacheKey = getOffsetCacheKey(str2, i);
        ConcurrentHashMap<String, Long> concurrentHashMap = this.tmpOffsetMap.get(str);
        if (concurrentHashMap == null || (l = concurrentHashMap.get(offsetCacheKey)) == null) {
            return 0L;
        }
        return l.longValue() - (l.longValue() % 28);
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public Set<String> getBookedGroups() {
        HashSet hashSet = new HashSet();
        hashSet.addAll(this.cfmOffsetMap.keySet());
        hashSet.addAll(this.zkOffsetStorage.queryZkAllGroupTopicInfos().keySet());
        return hashSet;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public Set<String> getInMemoryGroups() {
        HashSet hashSet = new HashSet();
        hashSet.addAll(this.cfmOffsetMap.keySet());
        return hashSet;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public Set<String> getUnusedGroupInfo() {
        HashSet hashSet = new HashSet();
        for (String str : this.zkOffsetStorage.queryZkAllGroupTopicInfos().keySet()) {
            if (!this.cfmOffsetMap.containsKey(str)) {
                hashSet.add(str);
            }
        }
        return hashSet;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v25, types: [java.util.Set] */
    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public Set<String> getGroupSubInfo(String str) {
        HashSet hashSet = new HashSet();
        ConcurrentHashMap<String, OffsetStorageInfo> concurrentHashMap = this.cfmOffsetMap.get(str);
        if (concurrentHashMap == null) {
            ArrayList arrayList = new ArrayList(1);
            arrayList.add(str);
            hashSet = (Set) this.zkOffsetStorage.queryZKGroupTopicInfo(arrayList).get(str);
        } else {
            Iterator<OffsetStorageInfo> it = concurrentHashMap.values().iterator();
            while (it.hasNext()) {
                hashSet.add(it.next().getTopic());
            }
        }
        return hashSet;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(String str, Map<String, Set<Integer>> map) {
        HashMap hashMap = new HashMap();
        ConcurrentHashMap<String, OffsetStorageInfo> concurrentHashMap = this.cfmOffsetMap.get(str);
        if (concurrentHashMap == null) {
            for (Map.Entry<String, Set<Integer>> entry : map.entrySet()) {
                if (entry != null && entry.getKey() != null && entry.getValue() != null) {
                    Map<Integer, Long> queryGroupOffsetInfo = this.zkOffsetStorage.queryGroupOffsetInfo(str, entry.getKey(), entry.getValue());
                    HashMap hashMap2 = new HashMap();
                    for (Map.Entry<Integer, Long> entry2 : queryGroupOffsetInfo.entrySet()) {
                        if (entry2 != null && entry2.getKey() != null && entry2.getValue() != null) {
                            hashMap2.put(entry2.getKey(), new Tuple2(entry2.getValue(), 0L));
                        }
                    }
                    if (!hashMap2.isEmpty()) {
                        hashMap.put(entry.getKey(), hashMap2);
                    }
                }
            }
        } else {
            ConcurrentHashMap<String, Long> concurrentHashMap2 = this.tmpOffsetMap.get(str);
            for (Map.Entry<String, Set<Integer>> entry3 : map.entrySet()) {
                HashMap hashMap3 = new HashMap();
                for (Integer num : entry3.getValue()) {
                    String offsetCacheKey = getOffsetCacheKey(entry3.getKey(), num.intValue());
                    OffsetStorageInfo offsetStorageInfo = concurrentHashMap.get(offsetCacheKey);
                    Long l = concurrentHashMap2.get(offsetCacheKey);
                    if (l == null) {
                        l = 0L;
                    }
                    if (offsetStorageInfo != null) {
                        hashMap3.put(num, new Tuple2(Long.valueOf(offsetStorageInfo.getOffset()), l));
                    }
                }
                if (!hashMap3.isEmpty()) {
                    hashMap.put(entry3.getKey(), hashMap3);
                }
            }
        }
        return hashMap;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public boolean modifyGroupOffset(Set<String> set, List<Tuple3<String, Integer, Long>> list, String str) {
        boolean z = false;
        StringBuilder sb = new StringBuilder(512);
        for (String str2 : set) {
            for (Tuple3<String, Integer, Long> tuple3 : list) {
                if (tuple3 != null && tuple3.getF0() != null && tuple3.getF1() != null && tuple3.getF2() != null) {
                    String offsetCacheKey = getOffsetCacheKey((String) tuple3.getF0(), ((Integer) tuple3.getF1()).intValue());
                    getAndResetTmpOffset(str2, offsetCacheKey);
                    OffsetStorageInfo loadOrCreateOffset = loadOrCreateOffset(str2, (String) tuple3.getF0(), ((Integer) tuple3.getF1()).intValue(), offsetCacheKey, 0L);
                    z = true;
                    logger.info(sb.append("[Offset Manager] Update offset by modifier=").append(str).append(",reset offset=").append(tuple3.getF2()).append(",old offset=").append(loadOrCreateOffset.getAndSetOffset(((Long) tuple3.getF2()).longValue())).append(",updated offset=").append(loadOrCreateOffset.getOffset()).append(",group=").append(str2).append(",topic-partId=").append(offsetCacheKey).toString());
                    sb.delete(0, sb.length());
                }
            }
        }
        return z;
    }

    @Override // org.apache.tubemq.server.broker.offset.OffsetService
    public void deleteGroupOffset(boolean z, Map<String, Map<String, Set<Integer>>> map, String str) {
        String sb;
        StringBuilder sb2 = new StringBuilder(512);
        for (Map.Entry<String, Map<String, Set<Integer>>> entry : map.entrySet()) {
            if (entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty()) {
                rmvOffset(entry.getKey(), entry.getValue());
            }
        }
        if (z) {
            sb = sb2.append("[Offset Manager] delete offset from memory by modifier=").append(str).toString();
        } else {
            this.zkOffsetStorage.deleteGroupOffsetInfo(map);
            sb = sb2.append("[Offset Manager] delete offset from memory and zk by modifier=").append(str).toString();
        }
        sb2.delete(0, sb2.length());
        for (Map.Entry<String, Map<String, Set<Integer>>> entry2 : map.entrySet()) {
            if (entry2.getKey() != null && entry2.getValue() != null && !entry2.getValue().isEmpty()) {
                logger.info(sb2.append(sb).append(",group=").append(entry2.getKey()).append(",topic-partId-map=").append(entry2.getValue()).toString());
                sb2.delete(0, sb2.length());
            }
        }
    }

    private long setTmpOffset(String str, String str2, long j) {
        long j2 = j - (j % 28);
        ConcurrentHashMap<String, Long> concurrentHashMap = this.tmpOffsetMap.get(str);
        if (concurrentHashMap == null) {
            ConcurrentHashMap<String, Long> concurrentHashMap2 = new ConcurrentHashMap<>();
            concurrentHashMap = this.tmpOffsetMap.putIfAbsent(str, concurrentHashMap2);
            if (concurrentHashMap == null) {
                concurrentHashMap = concurrentHashMap2;
            }
        }
        Long put = concurrentHashMap.put(str2, Long.valueOf(j2));
        if (put == null) {
            return 0L;
        }
        return put.longValue() - (put.longValue() % 28);
    }

    private long getAndResetTmpOffset(String str, String str2) {
        ConcurrentHashMap<String, Long> concurrentHashMap = this.tmpOffsetMap.get(str);
        if (concurrentHashMap == null) {
            ConcurrentHashMap<String, Long> concurrentHashMap2 = new ConcurrentHashMap<>();
            concurrentHashMap = this.tmpOffsetMap.putIfAbsent(str, concurrentHashMap2);
            if (concurrentHashMap == null) {
                concurrentHashMap = concurrentHashMap2;
            }
        }
        Long put = concurrentHashMap.put(str2, 0L);
        if (put == null) {
            return 0L;
        }
        return put.longValue() - (put.longValue() % 28);
    }

    private void commitTmpOffsets() {
        for (Map.Entry<String, ConcurrentHashMap<String, Long>> entry : this.tmpOffsetMap.entrySet()) {
            if (!TStringUtils.isBlank(entry.getKey()) && entry.getValue() != null && !entry.getValue().isEmpty()) {
                for (Map.Entry<String, Long> entry2 : entry.getValue().entrySet()) {
                    if (!TStringUtils.isBlank(entry2.getKey())) {
                        String[] split = entry2.getKey().split("-");
                        try {
                            commitOffset(entry.getKey(), split[0], Integer.parseInt(split[1]), true);
                        } catch (Exception e) {
                            logger.warn("[Offset Manager] Commit tmp offset error!", e);
                        }
                    }
                }
            }
        }
    }

    private void commitCfmOffsets(boolean z) {
        for (Map.Entry<String, ConcurrentHashMap<String, OffsetStorageInfo>> entry : this.cfmOffsetMap.entrySet()) {
            if (!TStringUtils.isBlank(entry.getKey()) && entry.getValue() != null && !entry.getValue().isEmpty()) {
                this.zkOffsetStorage.commitOffset(entry.getKey(), entry.getValue().values(), z);
            }
        }
    }

    private OffsetStorageInfo loadOrCreateOffset(String str, String str2, int i, String str3, long j) {
        ConcurrentHashMap<String, OffsetStorageInfo> concurrentHashMap = this.cfmOffsetMap.get(str);
        if (concurrentHashMap == null) {
            ConcurrentHashMap<String, OffsetStorageInfo> concurrentHashMap2 = new ConcurrentHashMap<>();
            concurrentHashMap = this.cfmOffsetMap.putIfAbsent(str, concurrentHashMap2);
            if (concurrentHashMap == null) {
                concurrentHashMap = concurrentHashMap2;
            }
        }
        OffsetStorageInfo offsetStorageInfo = concurrentHashMap.get(str3);
        if (offsetStorageInfo == null) {
            OffsetStorageInfo loadOffset = this.zkOffsetStorage.loadOffset(str, str2, i);
            if (loadOffset == null) {
                loadOffset = new OffsetStorageInfo(str2, this.brokerConfig.getBrokerId(), i, j, 0L);
            }
            offsetStorageInfo = concurrentHashMap.putIfAbsent(str3, loadOffset);
            if (offsetStorageInfo == null) {
                offsetStorageInfo = loadOffset;
            }
        }
        return offsetStorageInfo;
    }

    private void rmvOffset(String str, Map<String, Set<Integer>> map) {
        if (str == null || map == null || map.isEmpty()) {
            return;
        }
        ConcurrentHashMap<String, OffsetStorageInfo> concurrentHashMap = this.cfmOffsetMap.get(str);
        if (concurrentHashMap != null) {
            for (Map.Entry<String, Set<Integer>> entry : map.entrySet()) {
                if (entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty()) {
                    Iterator<Integer> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        concurrentHashMap.remove(getOffsetCacheKey(entry.getKey(), it.next().intValue()));
                    }
                }
            }
            if (concurrentHashMap.isEmpty()) {
                this.cfmOffsetMap.remove(str);
            }
        }
        ConcurrentHashMap<String, Long> concurrentHashMap2 = this.tmpOffsetMap.get(str);
        if (concurrentHashMap2 != null) {
            for (Map.Entry<String, Set<Integer>> entry2 : map.entrySet()) {
                if (entry2.getKey() != null && entry2.getValue() != null && !entry2.getValue().isEmpty()) {
                    Iterator<Integer> it2 = entry2.getValue().iterator();
                    while (it2.hasNext()) {
                        concurrentHashMap2.remove(getOffsetCacheKey(entry2.getKey(), it2.next().intValue()));
                    }
                }
            }
            if (concurrentHashMap2.isEmpty()) {
                this.tmpOffsetMap.remove(str);
            }
        }
    }

    private String getOffsetCacheKey(String str, int i) {
        return new StringBuilder(256).append(str).append("-").append(i).toString();
    }
}
