package org.apache.inlong.tubemq.server.broker.stats;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.corebase.daemon.AbstractDaemonService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/tubemq/server/broker/stats/GroupCountService.class */
public class GroupCountService extends AbstractDaemonService implements CountService {
    private final Logger logger;
    private final String cntHdr;
    private final CountSet[] countSets;
    private AtomicInteger index;

    /* loaded from: input_file:org/apache/inlong/tubemq/server/broker/stats/GroupCountService$CountSet.class */
    private static class CountSet {
        public AtomicLong refCnt;
        public ConcurrentHashMap<String, CountItem> counterItem;

        private CountSet() {
            this.refCnt = new AtomicLong(0L);
            this.counterItem = new ConcurrentHashMap<>();
        }
    }

    public GroupCountService(String str, String str2, long j) {
        super(str, j);
        this.countSets = new CountSet[2];
        this.index = new AtomicInteger(0);
        this.cntHdr = str2;
        if (str == null) {
            this.logger = LoggerFactory.getLogger(GroupCountService.class);
        } else {
            this.logger = LoggerFactory.getLogger(str);
        }
        this.countSets[0] = new CountSet();
        this.countSets[1] = new CountSet();
        super.start();
    }

    protected void loopProcess(long j) {
        while (!super.isStopped()) {
            try {
                Thread.sleep(j);
                int i = this.index.get();
                if (this.index.compareAndSet(i, (i + 1) % 2)) {
                    AtomicLong atomicLong = this.countSets[i].refCnt;
                    do {
                        try {
                            Thread.sleep(10L);
                        } catch (InterruptedException e) {
                            return;
                        }
                    } while (atomicLong.get() > 0);
                    ConcurrentHashMap<String, CountItem> concurrentHashMap = this.countSets[i].counterItem;
                    if (concurrentHashMap != null) {
                        for (Map.Entry<String, CountItem> entry : concurrentHashMap.entrySet()) {
                            this.logger.info("{}#{}#{}#{}", new Object[]{this.cntHdr, entry.getKey(), Long.valueOf(entry.getValue().getMsgCount()), Long.valueOf(entry.getValue().getMsgSize())});
                        }
                        concurrentHashMap.clear();
                    }
                }
            } catch (InterruptedException e2) {
                return;
            } catch (Throwable th) {
            }
        }
    }

    @Override // org.apache.inlong.tubemq.server.broker.stats.CountService
    public void close(long j) {
        if (super.stop()) {
            return;
        }
        int i = this.index.get();
        for (int i2 = 0; i2 < this.countSets.length; i2++) {
            i++;
            ConcurrentHashMap<String, CountItem> concurrentHashMap = this.countSets[i % 2].counterItem;
            if (concurrentHashMap != null) {
                for (Map.Entry<String, CountItem> entry : concurrentHashMap.entrySet()) {
                    this.logger.info("{}#{}#{}#{}", new Object[]{this.cntHdr, entry.getKey(), Long.valueOf(entry.getValue().getMsgCount()), Long.valueOf(entry.getValue().getMsgSize())});
                }
                concurrentHashMap.clear();
            }
        }
    }

    @Override // org.apache.inlong.tubemq.server.broker.stats.CountService
    public void add(Map<String, CountItem> map) {
        CountSet countSet = this.countSets[this.index.get()];
        countSet.refCnt.incrementAndGet();
        ConcurrentHashMap<String, CountItem> concurrentHashMap = countSet.counterItem;
        for (Map.Entry<String, CountItem> entry : map.entrySet()) {
            CountItem countItem = concurrentHashMap.get(entry.getKey());
            if (countItem == null) {
                CountItem countItem2 = new CountItem(0L, 0L);
                countItem = concurrentHashMap.putIfAbsent(entry.getKey(), countItem2);
                if (countItem == null) {
                    countItem = countItem2;
                }
            }
            countItem.appendMsg(entry.getValue().getMsgCount(), entry.getValue().getMsgSize());
        }
        countSet.refCnt.decrementAndGet();
    }

    @Override // org.apache.inlong.tubemq.server.broker.stats.CountService
    public void add(String str, Long l, int i) {
        CountSet countSet = this.countSets[this.index.get()];
        countSet.refCnt.incrementAndGet();
        CountItem countItem = countSet.counterItem.get(str);
        if (countItem == null) {
            CountItem countItem2 = new CountItem(0L, 0L);
            countItem = countSet.counterItem.putIfAbsent(str, countItem2);
            if (countItem == null) {
                countItem = countItem2;
            }
        }
        countItem.appendMsg(l.longValue(), i);
        countSet.refCnt.decrementAndGet();
    }
}
