package org.apache.inlong.dataproxy.dispatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Context;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Groups incoming {@link ProxyEvent}s into {@link DispatchProfile} batches and hands finished
 * batches over to a dispatch queue.
 *
 * <p>Events are batched per key {@code uid + "." + minuteStart}, where {@code minuteStart} is the
 * event's message time truncated to a whole minute. A batch is moved to the dispatch queue either
 * when it rejects a further event (see {@link #addEvent(ProxyEvent)}) or when it is reported as
 * timed out during {@link #outputOvertimeData()}.
 *
 * <p>NOTE(review): the individual map, queue and counter operations are thread-safe, but the
 * "look up, add, replace" sequence in {@link #addEvent(ProxyEvent)} is not atomic as a whole, and
 * the thread-safety of {@link DispatchProfile} itself is not visible from this class. Confirm the
 * threading model with the callers.
 */
public class DispatchManager {
    public static final Logger LOG = LoggerFactory.getLogger(DispatchManager.class);

    /** Configuration key of the batch timeout, see {@link #DEFAULT_DISPATCH_TIMEOUT}. */
    public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
    /** Configuration key of the per-batch event count limit. */
    public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
    /** Configuration key of the per-batch size limit. */
    public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";

    /** Default batch timeout; subtracted from the current time in milliseconds. */
    public static final long DEFAULT_DISPATCH_TIMEOUT = 2000;
    /** Default value of the limit passed to {@link DispatchProfile} as max pack count. */
    public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256;
    /** Default value of the limit passed to {@link DispatchProfile} as max pack size. */
    public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
    /** Number of milliseconds in one minute; the granularity used to bucket events. */
    public static final long MINUTE_MS = 60000;

    private final long dispatchTimeout;
    private final long maxPackCount;
    private final long maxPackSize;
    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
    /** Batches that are still being filled, keyed by {@code uid + "." + minuteStart}. */
    private final ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
    /** One-shot flag: when set, the next {@link #outputOvertimeData()} call performs a sweep. */
    private final AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
    /** Events accepted since the last sweep; reset by {@link #outputOvertimeData()}. */
    private final AtomicLong inCounter = new AtomicLong(0);
    /** Events handed to the dispatch queue since the last sweep; reset by the sweep. */
    private final AtomicLong outCounter = new AtomicLong(0);

    /**
     * Creates a manager that reads its limits from {@code context} and outputs finished batches
     * to {@code linkedBlockingQueue}.
     *
     * @param context configuration source; missing keys fall back to the {@code DEFAULT_*} values
     * @param linkedBlockingQueue queue that receives finished batches
     */
    public DispatchManager(Context context, LinkedBlockingQueue<DispatchProfile> linkedBlockingQueue) {
        this.dispatchQueue = linkedBlockingQueue;
        this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
        this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
        this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
    }

    /**
     * Adds an event to the batch of its uid and minute bucket.
     *
     * <p>If the current batch rejects the event ({@code DispatchProfile.addEvent} returns
     * {@code false}; presumably because a count or size limit would be exceeded), the batch is
     * replaced by a fresh one in the cache, the replaced batch is queued for dispatch and the
     * event is added to the fresh batch.
     *
     * @param proxyEvent the event to batch
     */
    public void addEvent(ProxyEvent proxyEvent) {
        final String uid = proxyEvent.getUid();
        final long msgTime = proxyEvent.getMsgTime();
        final long dispatchTime = msgTime - (msgTime % MINUTE_MS);
        final String dispatchKey = uid + "." + dispatchTime;

        // Atomic create-if-missing, so concurrent callers never overwrite each other's new batch.
        DispatchProfile current = this.profileCache.computeIfAbsent(dispatchKey,
                key -> newProfile(proxyEvent, uid, dispatchTime));

        if (!current.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize)) {
            DispatchProfile replacement = newProfile(proxyEvent, uid, dispatchTime);
            DispatchProfile replaced = this.profileCache.put(dispatchKey, replacement);
            // 'replaced' is null when the sweep removed (and already queued) the batch in the
            // meantime; LinkedBlockingQueue.offer(null) would throw NullPointerException.
            if (replaced != null) {
                enqueue(replaced);
            }
            // NOTE(review): the result is ignored as before; this assumes an empty batch always
            // accepts its first event - confirm against DispatchProfile.addEvent.
            replacement.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize);
        }
        this.inCounter.incrementAndGet();
    }

    /**
     * Moves timed-out batches from the cache to the dispatch queue and logs statistics.
     *
     * <p>Does nothing unless {@link #setNeedOutputOvertimeData()} was called since the previous
     * sweep. A batch is selected when {@code DispatchProfile.isTimeout} returns {@code true} for
     * {@code now - dispatchTimeout}. The in/out counters are reset to zero after being logged.
     */
    public void outputOvertimeData() {
        if (!this.needOutputOvertimeData.getAndSet(false)) {
            return;
        }
        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                this.profileCache.size(), this.dispatchQueue.size());

        final long timeoutThreshold = System.currentTimeMillis() - this.dispatchTimeout;
        final List<String> timedOutKeys = new ArrayList<>();
        long eventCount = 0;
        for (Map.Entry<String, DispatchProfile> entry : this.profileCache.entrySet()) {
            DispatchProfile profile = entry.getValue();
            // Counts the events of every cached batch, not only the timed-out ones.
            eventCount += profile.getCount();
            if (profile.isTimeout(timeoutThreshold)) {
                timedOutKeys.add(entry.getKey());
            }
        }

        for (String key : timedOutKeys) {
            DispatchProfile removed = this.profileCache.remove(key);
            // May be null if the entry vanished between the scan and the removal.
            if (removed != null) {
                enqueue(removed);
            }
        }

        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
                + "inCounter:{},outCounter:{}",
                this.profileCache.size(), this.dispatchQueue.size(), eventCount,
                this.inCounter.getAndSet(0L), this.outCounter.getAndSet(0L));
    }

    /**
     * @return the configured batch timeout
     */
    public long getDispatchTimeout() {
        return this.dispatchTimeout;
    }

    /**
     * @return the configured per-batch event count limit
     */
    public long getMaxPackCount() {
        return this.maxPackCount;
    }

    /**
     * @return the configured per-batch size limit
     */
    public long getMaxPackSize() {
        return this.maxPackSize;
    }

    /**
     * Requests that the next {@link #outputOvertimeData()} call performs a sweep.
     */
    public void setNeedOutputOvertimeData() {
        this.needOutputOvertimeData.set(true);
    }

    /** Creates an empty batch for the event's uid/group/stream and the given minute bucket. */
    private static DispatchProfile newProfile(ProxyEvent proxyEvent, String uid, long dispatchTime) {
        return new DispatchProfile(uid, proxyEvent.getInlongGroupId(), proxyEvent.getInlongStreamId(),
                dispatchTime);
    }

    /**
     * Offers a finished batch to the dispatch queue; counts its events as output only when the
     * queue accepted it and logs a warning otherwise instead of dropping it silently.
     */
    private void enqueue(DispatchProfile profile) {
        if (this.dispatchQueue.offer(profile)) {
            this.outCounter.addAndGet(profile.getCount());
        } else {
            LOG.warn("dispatchQueue rejected a profile, events dropped, eventCount:{},dispatchQueueSize:{}",
                    profile.getCount(), this.dispatchQueue.size());
        }
    }
}
