package org.apache.inlong.dataproxy.sink.mq;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Context;
import org.apache.flume.event.SimpleEvent;
import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/BatchPackManager.class */
public class BatchPackManager {
    private static final Logger logger = LoggerFactory.getLogger(BatchPackManager.class);
    public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
    public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
    public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
    public static final long DEFAULT_DISPATCH_TIMEOUT = 2000;
    public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256;
    public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
    public static final long MINUTE_MS = 60000;
    private final long dispatchTimeout;
    private final long maxPackCount;
    private final long maxPackSize;
    private final MessageQueueZoneSink mqZoneSink;
    private final ConcurrentHashMap<String, PackProfile> profileCache = new ConcurrentHashMap<>();
    private final AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
    private final AtomicLong inCounter = new AtomicLong(0);
    private final AtomicLong outCounter = new AtomicLong(0);

    public BatchPackManager(MessageQueueZoneSink messageQueueZoneSink, Context context) {
        this.mqZoneSink = messageQueueZoneSink;
        this.dispatchTimeout = context.getLong("dispatchTimeout", 2000L).longValue();
        this.maxPackCount = context.getLong("dispatchMaxPackCount", 256L).longValue();
        this.maxPackSize = context.getLong("dispatchMaxPackSize", 327680L).longValue();
    }

    public void addEvent(ProxyEvent proxyEvent) {
        String uid = proxyEvent.getUid();
        long msgTime = proxyEvent.getMsgTime() - (proxyEvent.getMsgTime() % 60000);
        String str = uid + "." + msgTime;
        PackProfile packProfile = this.profileCache.get(str);
        if (packProfile == null) {
            packProfile = new BatchPackProfile(uid, proxyEvent.getInlongGroupId(), proxyEvent.getInlongStreamId(), msgTime);
            this.profileCache.put(str, packProfile);
        }
        if (!packProfile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize)) {
            BatchPackProfile batchPackProfile = new BatchPackProfile(uid, proxyEvent.getInlongGroupId(), proxyEvent.getInlongStreamId(), msgTime);
            PackProfile put = this.profileCache.put(str, batchPackProfile);
            if (put != null) {
                this.mqZoneSink.acquireAndOfferDispatchedRecord(put);
            }
            this.outCounter.addAndGet(packProfile.getCount());
            batchPackProfile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize);
        }
        this.inCounter.incrementAndGet();
    }

    public void addPackEvent(ProxyPackEvent proxyPackEvent) {
        String uid = proxyPackEvent.getUid();
        long msgTime = proxyPackEvent.getMsgTime() - (proxyPackEvent.getMsgTime() % 60000);
        BatchPackProfile batchPackProfile = new BatchPackProfile(uid, proxyPackEvent.getInlongGroupId(), proxyPackEvent.getInlongStreamId(), msgTime);
        BatchPackProfileCallback batchPackProfileCallback = new BatchPackProfileCallback(proxyPackEvent.getEvents().size(), proxyPackEvent.getCallback());
        batchPackProfile.setCallback(batchPackProfileCallback);
        for (ProxyEvent proxyEvent : proxyPackEvent.getEvents()) {
            if (!batchPackProfile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize)) {
                this.outCounter.addAndGet(batchPackProfile.getCount());
                this.mqZoneSink.acquireAndOfferDispatchedRecord(batchPackProfile);
                batchPackProfile = new BatchPackProfile(uid, proxyEvent.getInlongGroupId(), proxyEvent.getInlongStreamId(), msgTime);
                batchPackProfile.setCallback(batchPackProfileCallback);
                batchPackProfile.addEvent(proxyEvent, this.maxPackCount, this.maxPackSize);
            }
            this.inCounter.incrementAndGet();
        }
        if (batchPackProfile.getEvents().size() > 0) {
            this.outCounter.addAndGet(batchPackProfile.getCount());
            this.mqZoneSink.acquireAndOfferDispatchedRecord(batchPackProfile);
        }
    }

    public void addSimpleEvent(SimpleEvent simpleEvent) {
        Map headers = simpleEvent.getHeaders();
        String str = (String) headers.get(HttpAttrConst.KEY_GROUP_ID);
        String str2 = (String) headers.get(HttpAttrConst.KEY_STREAM_ID);
        String generateUid = InlongId.generateUid(str, str2);
        long j = NumberUtils.toLong((String) headers.get(HttpAttrConst.KEY_DATA_TIME), System.currentTimeMillis());
        SimplePackProfile simplePackProfile = new SimplePackProfile(generateUid, str, str2, j - (j % 60000));
        simplePackProfile.addEvent(simpleEvent, this.maxPackCount, this.maxPackSize);
        this.mqZoneSink.acquireAndOfferDispatchedRecord(simplePackProfile);
        this.outCounter.addAndGet(simplePackProfile.getCount());
        this.inCounter.incrementAndGet();
    }

    public void outputOvertimeData() {
        if (this.needOutputOvertimeData.getAndSet(false)) {
            int size = this.profileCache.size();
            int dispatchQueueSize = this.mqZoneSink.getDispatchQueueSize();
            long currentTimeMillis = System.currentTimeMillis() - this.dispatchTimeout;
            ArrayList arrayList = new ArrayList();
            long j = 0;
            for (Map.Entry<String, PackProfile> entry : this.profileCache.entrySet()) {
                PackProfile value = entry.getValue();
                j += value.getCount();
                if (value.isTimeout(currentTimeMillis)) {
                    arrayList.add(entry.getKey());
                }
            }
            arrayList.forEach(str -> {
                PackProfile remove = this.profileCache.remove(str);
                if (remove != null) {
                    this.mqZoneSink.acquireAndOfferDispatchedRecord(remove);
                    this.outCounter.addAndGet(remove.getCount());
                }
            });
            long andSet = this.inCounter.getAndSet(0L);
            long andSet2 = this.outCounter.getAndSet(0L);
            if (arrayList.isEmpty()) {
                return;
            }
            logger.info("{} output overtime data, profileCacheSize: before={}, after={}, dispatchQueueSize: before={}, after={}, eventCount: {}, inCounter: {}, outCounter: {}", new Object[]{this.mqZoneSink.getName(), Integer.valueOf(size), Integer.valueOf(this.profileCache.size()), Integer.valueOf(dispatchQueueSize), Integer.valueOf(this.mqZoneSink.getDispatchQueueSize()), Long.valueOf(j), Long.valueOf(andSet), Long.valueOf(andSet2)});
        }
    }

    public long getDispatchTimeout() {
        return this.dispatchTimeout;
    }

    public long getMaxPackCount() {
        return this.maxPackCount;
    }

    public long getMaxPackSize() {
        return this.maxPackSize;
    }

    public void setNeedOutputOvertimeData() {
        this.needOutputOvertimeData.getAndSet(true);
    }
}
