package org.apache.inlong.dataproxy.sink.mq;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Context;
import org.apache.flume.event.SimpleEvent;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/BatchPackManager.class */
/**
 * Collects incoming events into {@link BatchPackProfile} batches and hands finished batches to the
 * dispatch queue supplied at construction time.
 *
 * <p>Single events are cached per "uid.minute" key until the cached batch refuses another event or
 * {@link #outputOvertimeData()} finds it timed out. Pack events and simple events are forwarded
 * without going through the cache.
 *
 * <p>NOTE(review): the class keeps its state in a {@link ConcurrentHashMap} and atomic counters, which
 * suggests it is called from more than one thread (e.g. a producer plus a timer) - confirm against
 * the callers. The individual {@link BatchPackProfile} instances are not protected by this class.
 */
public class BatchPackManager {
    public static final Logger LOG = LoggerFactory.getLogger(BatchPackManager.class);
    public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
    public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
    public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
    public static final long DEFAULT_DISPATCH_TIMEOUT = 2000;
    public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256;
    public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
    public static final long MINUTE_MS = 60000;
    private final long dispatchTimeout;
    private final long maxPackCount;
    private final long maxPackSize;
    private final BufferQueue<BatchPackProfile> dispatchQueue;
    /** Open batches, keyed by {@code uid + "." + minuteAlignedMsgTime}. */
    private final ConcurrentHashMap<String, BatchPackProfile> profileCache = new ConcurrentHashMap<>();
    /** Set by {@link #setNeedOutputOvertimeData()}, consumed by {@link #outputOvertimeData()}. */
    private final AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
    /** Events received since the last completed {@link #outputOvertimeData()} run (debug statistics). */
    private final AtomicLong inCounter = new AtomicLong(0);
    /** Events handed to the dispatch queue since the last completed run (debug statistics). */
    private final AtomicLong outCounter = new AtomicLong(0);

    /**
     * Creates a manager that reads its limits from {@code context} and dispatches into
     * {@code bufferQueue}.
     *
     * @param context configuration; the keys {@link #KEY_DISPATCH_TIMEOUT},
     *        {@link #KEY_DISPATCH_MAX_PACKCOUNT} and {@link #KEY_DISPATCH_MAX_PACKSIZE} are read,
     *        falling back to the matching {@code DEFAULT_*} constants
     * @param bufferQueue queue that receives finished batches
     */
    public BatchPackManager(Context context, BufferQueue<BatchPackProfile> bufferQueue) {
        this.dispatchQueue = bufferQueue;
        this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
        this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
        this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
    }

    /**
     * Adds a single event to the cached batch for its uid and minute-aligned message time.
     *
     * <p>If the cached batch refuses the event, a fresh batch replaces it in the cache, the replaced
     * batch is dispatched and the event is added to the fresh batch.
     *
     * @param proxyEvent the event to batch
     */
    public void addEvent(ProxyEvent proxyEvent) {
        String uid = proxyEvent.getUid();
        // Align the message time down to the start of its minute.
        long dispatchTime = proxyEvent.getMsgTime() - (proxyEvent.getMsgTime() % MINUTE_MS);
        String dispatchKey = uid + "." + dispatchTime;
        // Atomic create-if-missing, so two callers can never install competing batches for one key.
        BatchPackProfile profile = this.profileCache.computeIfAbsent(dispatchKey,
                key -> new BatchPackProfile(uid, proxyEvent.getInlongGroupId(), proxyEvent.getInlongStreamId(),
                        dispatchTime));
        if (!profile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize)) {
            BatchPackProfile freshProfile = new BatchPackProfile(uid, proxyEvent.getInlongGroupId(),
                    proxyEvent.getInlongStreamId(), dispatchTime);
            BatchPackProfile replacedProfile = this.profileCache.put(dispatchKey, freshProfile);
            // put() returns null when the entry was removed in the meantime (outputOvertimeData()
            // removes and dispatches timed-out entries); in that case there is nothing to dispatch.
            if (replacedProfile != null) {
                dispatch(replacedProfile);
            }
            // NOTE(review): the result is ignored as before; presumably an empty batch always
            // accepts its first event - confirm in BatchPackProfile.addEvent.
            freshProfile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize);
        }
        this.inCounter.incrementAndGet();
    }

    /**
     * Splits a pack event into one or more batches and dispatches all of them immediately, bypassing
     * the cache. Every batch created here shares one {@link BatchPackProfileCallback} built from the
     * pack's event count and callback.
     *
     * @param proxyPackEvent the pack of events to dispatch
     */
    public void addPackEvent(ProxyPackEvent proxyPackEvent) {
        String uid = proxyPackEvent.getUid();
        long dispatchTime = proxyPackEvent.getMsgTime() - (proxyPackEvent.getMsgTime() % MINUTE_MS);
        BatchPackProfile profile = new BatchPackProfile(uid, proxyPackEvent.getInlongGroupId(),
                proxyPackEvent.getInlongStreamId(), dispatchTime);
        BatchPackProfileCallback callback = new BatchPackProfileCallback(proxyPackEvent.getEvents().size(),
                proxyPackEvent.getCallback());
        profile.setCallback(callback);
        for (ProxyEvent proxyEvent : proxyPackEvent.getEvents()) {
            this.inCounter.incrementAndGet();
            if (!profile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize)) {
                // Current batch refused the event: ship it and continue with a new one.
                dispatch(profile);
                profile = new BatchPackProfile(uid, proxyEvent.getInlongGroupId(), proxyEvent.getInlongStreamId(),
                        dispatchTime);
                profile.setCallback(callback);
                profile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize);
            }
        }
        // Ship the trailing batch unless the pack contained no events at all.
        if (profile.getEvents().size() > 0) {
            dispatch(profile);
        }
    }

    /**
     * Wraps a simple event in a {@link SimpleBatchPackProfileV0} and dispatches it immediately.
     *
     * @param simpleEvent the event to dispatch
     */
    public void addSimpleEvent(SimpleEvent simpleEvent) {
        dispatch(SimpleBatchPackProfileV0.create(simpleEvent));
        this.inCounter.incrementAndGet();
    }

    /**
     * Moves timed-out batches from the cache to the dispatch queue.
     *
     * <p>Does nothing unless {@link #setNeedOutputOvertimeData()} was called since the previous run.
     * The in/out counters are reset at the end of every run, whether or not debug logging is enabled.
     */
    public void outputOvertimeData() {
        if (!this.needOutputOvertimeData.getAndSet(false)) {
            return;
        }
        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                this.profileCache.size(), this.dispatchQueue.size());
        // Threshold handed to BatchPackProfile.isTimeout: "now" minus the configured timeout.
        long timeoutThreshold = System.currentTimeMillis() - this.dispatchTimeout;
        ArrayList<String> timedOutKeys = new ArrayList<>();
        long cachedEventCount = 0;
        for (Map.Entry<String, BatchPackProfile> entry : this.profileCache.entrySet()) {
            BatchPackProfile cached = entry.getValue();
            cachedEventCount += cached.getCount();
            if (cached.isTimeout(timeoutThreshold)) {
                timedOutKeys.add(entry.getKey());
            }
        }
        for (String key : timedOutKeys) {
            BatchPackProfile removed = this.profileCache.remove(key);
            // The entry may already be gone if another caller replaced or removed it.
            if (removed != null) {
                dispatch(removed);
            }
        }
        long received = this.inCounter.getAndSet(0L);
        long dispatched = this.outCounter.getAndSet(0L);
        LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},inCounter:{},outCounter:{}",
                this.profileCache.size(), this.dispatchQueue.size(), cachedEventCount, received, dispatched);
    }

    /**
     * Returns the configured dispatch timeout.
     *
     * @return the timeout subtracted from the current time in {@link #outputOvertimeData()}
     */
    public long getDispatchTimeout() {
        return this.dispatchTimeout;
    }

    /**
     * Returns the configured maximum pack count.
     *
     * @return the count limit passed to {@code BatchPackProfile.addEvent}
     */
    public long getMaxPackCount() {
        return this.maxPackCount;
    }

    /**
     * Returns the configured maximum pack size.
     *
     * @return the size limit passed to {@code BatchPackProfile.addEvent}
     */
    public long getMaxPackSize() {
        return this.maxPackSize;
    }

    /** Requests that the next {@link #outputOvertimeData()} call actually scans the cache. */
    public void setNeedOutputOvertimeData() {
        this.needOutputOvertimeData.set(true);
    }

    /**
     * Acquires queue capacity for the batch's size, offers the batch to the dispatch queue and adds
     * its event count to the out counter.
     */
    private void dispatch(BatchPackProfile profile) {
        this.dispatchQueue.acquire(profile.getSize());
        this.dispatchQueue.offer(profile);
        this.outCounter.addAndGet(profile.getCount());
    }
}
