package org.apache.inlong.sort.standalone.dispatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Context;
import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/dispatch/DispatchManager.class */
/**
 * Groups incoming {@link ProfileEvent}s into {@link DispatchProfile} batches keyed by event uid and
 * hands finished batches to a dispatch queue.
 *
 * <p>A batch is moved to the queue either when {@link DispatchProfile#addEvent} refuses a new event
 * (the batch is then replaced by a fresh one), or when {@link #outputOvertimeData()} finds that the
 * batch reports itself as timed out.
 */
public class DispatchManager {
    public static final Logger LOG = InlongLoggerFactory.getLogger(DispatchManager.class);
    public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
    public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
    public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
    public static final long DEFAULT_DISPATCH_TIMEOUT = 2000;
    public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256;
    public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;

    /** Destination for batches that are full or timed out. */
    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
    private final long dispatchTimeout;
    private final long maxPackCount;
    private final long maxPackSize;
    /** Batches currently being filled, keyed by event uid. */
    private final ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
    /** Set by {@link #setNeedOutputOvertimeData()}; consumed by the next {@link #addEvent} call. */
    private final AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);

    /**
     * Creates a manager that reads its limits from {@code context} and emits batches to the given queue.
     *
     * @param context             configuration source; missing keys fall back to the {@code DEFAULT_*} constants
     * @param linkedBlockingQueue queue that receives finished batches
     */
    public DispatchManager(Context context, LinkedBlockingQueue<DispatchProfile> linkedBlockingQueue) {
        this.dispatchQueue = linkedBlockingQueue;
        this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
        this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
        this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
    }

    /**
     * Adds an event to the batch for its uid, rolling over to a new batch when the current one
     * rejects the event.
     *
     * <p>If a timeout flush has been requested via {@link #setNeedOutputOvertimeData()}, it is
     * performed first.
     *
     * @param profileEvent the event to batch
     */
    public void addEvent(ProfileEvent profileEvent) {
        // Clear the flag atomically *before* flushing so a request raised while the flush is
        // running is kept for the next call, and concurrent callers do not flush twice.
        if (this.needOutputOvertimeData.compareAndSet(true, false)) {
            outputOvertimeData();
        }
        String uid = profileEvent.getUid();
        // Atomic get-or-create: avoids two callers each installing their own batch for the same uid.
        DispatchProfile current = this.profileCache.computeIfAbsent(uid, key -> newProfile(key, profileEvent));
        if (current.addEvent(profileEvent, this.maxPackCount, this.maxPackSize)) {
            return;
        }
        // The current batch refused the event: swap in a fresh batch and dispatch whatever it displaced.
        DispatchProfile fresh = newProfile(uid, profileEvent);
        enqueue(this.profileCache.put(uid, fresh));
        fresh.addEvent(profileEvent, this.maxPackCount, this.maxPackSize);
    }

    /**
     * Moves every cached batch that reports itself as timed out to the dispatch queue and logs
     * cache/queue sizes before and after.
     */
    public void outputOvertimeData() {
        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                this.profileCache.size(), this.dispatchQueue.size());
        // Threshold handed to DispatchProfile.isTimeout: "now" minus the configured timeout.
        long timeoutThreshold = System.currentTimeMillis() - this.dispatchTimeout;
        List<String> timeoutKeys = new ArrayList<>();
        long eventCount = 0;
        for (Map.Entry<String, DispatchProfile> entry : this.profileCache.entrySet()) {
            DispatchProfile profile = entry.getValue();
            eventCount += profile.getCount();
            if (profile.isTimeout(timeoutThreshold)) {
                timeoutKeys.add(entry.getKey());
            }
        }
        for (String key : timeoutKeys) {
            // remove() may return null if the entry vanished in the meantime; enqueue() tolerates that.
            enqueue(this.profileCache.remove(key));
        }
        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
                this.profileCache.size(), this.dispatchQueue.size(), eventCount);
    }

    /** @return the dispatch timeout read from the context (or its default) */
    public long getDispatchTimeout() {
        return this.dispatchTimeout;
    }

    /** @return the maximum pack count passed to {@link DispatchProfile#addEvent} */
    public long getMaxPackCount() {
        return this.maxPackCount;
    }

    /** @return the maximum pack size passed to {@link DispatchProfile#addEvent} */
    public long getMaxPackSize() {
        return this.maxPackSize;
    }

    /** Requests that the next {@link #addEvent} call flushes timed-out batches first. */
    public void setNeedOutputOvertimeData() {
        this.needOutputOvertimeData.set(true);
    }

    /** Builds an empty batch for {@code uid} using the group and stream ids of the given event. */
    private static DispatchProfile newProfile(String uid, ProfileEvent profileEvent) {
        return new DispatchProfile(uid, profileEvent.getInlongGroupId(), profileEvent.getInlongStreamId());
    }

    /** Offers a batch to the dispatch queue, ignoring {@code null} and logging when the queue rejects it. */
    private void enqueue(DispatchProfile profile) {
        if (profile == null) {
            // LinkedBlockingQueue.offer(null) throws NullPointerException; nothing to dispatch anyway.
            return;
        }
        if (!this.dispatchQueue.offer(profile)) {
            LOG.error("failed to offer dispatch profile to dispatchQueue, dispatchQueueSize:{}",
                    this.dispatchQueue.size());
        }
    }
}
