package org.apache.inlong.agent.message.filecollect;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.DateTransUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/message/filecollect/ProxyMessageCache.class */
/**
 * Per-instance cache of {@link ProxyMessage}s, kept in one queue per inlong stream id.
 *
 * <p>Producers call {@link #add(ProxyMessage)}; a consumer drains a stream's queue into size-bounded
 * batches with {@link #fetchSenderMessage(String, LinkedBlockingQueue)}. The total number of cached
 * body bytes is tracked in {@link #getCacheSize()}.
 */
public class ProxyMessageCache {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyMessageCache.class);
    private final String taskId;
    private final String instanceId;
    /** Upper bound, in body bytes, of a single batch built by {@link #fetchSenderMessage}. */
    private final int maxPackSize;
    /** Configured per-stream queue limit; see {@link #queueIsFull} for the exact threshold. */
    private final int maxQueueNumber;
    private final String groupId;
    // Read from the profile in the constructor; not used anywhere else in this class.
    private final int cacheTimeout;
    // Stays 0 when the source data time cannot be parsed in the constructor.
    private long dataTime;
    private boolean isRealTime;
    /** Sum of the body lengths of all messages currently queued. */
    private final AtomicLong cacheSize = new AtomicLong(0);
    private long lastPrintTime = 0;
    private final Map<String, String> extraMap = new HashMap<>();
    private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> messageQueueMap = new ConcurrentHashMap<>();

    /**
     * Builds a cache configured from the given instance profile.
     *
     * @param instanceProfile source of task id, instance id, size/queue/timeout limits, cycle unit,
     *                        source data time and predefined fields
     * @param str             the group id attached to every {@link SenderMessage} built by this cache
     * @param str2            not used by this constructor
     */
    public ProxyMessageCache(InstanceProfile instanceProfile, String str, String str2) {
        this.isRealTime = false;
        this.taskId = instanceProfile.getTaskId();
        this.instanceId = instanceProfile.getInstanceId();
        this.groupId = str;
        this.maxPackSize = instanceProfile.getInt(CommonConstants.PROXY_PACKAGE_MAX_SIZE, CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE);
        this.maxQueueNumber = instanceProfile.getInt(CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER, 10000);
        this.cacheTimeout = instanceProfile.getInt(CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS, CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
        try {
            String cycleUnit = instanceProfile.get(TaskConstants.TASK_CYCLE_UNIT);
            if (cycleUnit.equalsIgnoreCase(CycleUnitType.REAL_TIME)) {
                this.isRealTime = true;
                // Real-time tasks have their source data time parsed with the "h" unit.
                cycleUnit = "h";
            }
            this.dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), cycleUnit);
        } catch (ParseException e) {
            // dataTime keeps its default value (0) in this case.
            LOGGER.info("trans dataTime error", e);
        }
        this.extraMap.put("syncSend", "false");
        this.extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
    }

    /**
     * Stores the given value under the {@code "partitionKey"} entry of the extra map.
     *
     * @param str the partition key value
     */
    public void generateExtraMap(String str) {
        this.extraMap.put("partitionKey", str);
    }

    /** Returns {@code true} once the queue holds at least {@code maxQueueNumber - 1} messages. */
    private boolean queueIsFull(LinkedBlockingQueue<ProxyMessage> linkedBlockingQueue) {
        return linkedBlockingQueue.size() >= this.maxQueueNumber - 1;
    }

    /**
     * Appends a message to the queue of its inlong stream id, creating the queue if needed.
     *
     * @param proxyMessage the message to cache
     * @return {@code true} if the message was queued; {@code false} if the stream's queue is full,
     *         the calling thread was interrupted, or any other exception occurred while queuing
     */
    public boolean add(ProxyMessage proxyMessage) {
        LinkedBlockingQueue<ProxyMessage> queue = makeSureQueueExist(proxyMessage.getInlongStreamId());
        try {
            if (queueIsFull(queue)) {
                printQueueFull();
                return false;
            }
            // Read the size before queuing so a failure here cannot leave a queued message
            // that is reported as rejected and missing from cacheSize.
            int bodySize = proxyMessage.getBody().length;
            queue.put(proxyMessage);
            this.cacheSize.addAndGet(bodySize);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("exception caught", e);
            return false;
        } catch (Exception e) {
            LOGGER.error("exception caught", e);
            return false;
        }
    }

    /**
     * Logs a "queue full" warning, throttled to one log line per second
     * (assuming {@code AgentUtils.getCurrentTime()} reports milliseconds).
     */
    private void printQueueFull() {
        long now = AgentUtils.getCurrentTime();
        if (now - this.lastPrintTime > TimeUnit.SECONDS.toMillis(1L)) {
            this.lastPrintTime = now;
            LOGGER.warn("message queue is greater than {}, stop adding message, maybe proxy get stuck", this.maxQueueNumber);
        }
    }

    /**
     * @return the live map of stream id to message queue (not a copy)
     */
    public ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> getMessageQueueMap() {
        return this.messageQueueMap;
    }

    /**
     * Returns the queue registered for the given stream id, registering a new one atomically if
     * none exists, so concurrent callers always end up sharing a single queue per stream id.
     */
    private LinkedBlockingQueue<ProxyMessage> makeSureQueueExist(String str) {
        LinkedBlockingQueue<ProxyMessage> queue = this.messageQueueMap.get(str);
        if (queue == null) {
            LinkedBlockingQueue<ProxyMessage> created = new LinkedBlockingQueue<>();
            LinkedBlockingQueue<ProxyMessage> raced = this.messageQueueMap.putIfAbsent(str, created);
            queue = raced != null ? raced : created;
        }
        return queue;
    }

    /**
     * Drains messages from the head of the given queue into one batch whose total body size does
     * not exceed {@code maxPackSize}.
     *
     * <p>A single message whose body alone is larger than {@code maxPackSize} can never be sent; it
     * is removed from the queue, logged and dropped so that it does not block the messages behind it.
     *
     * @param str                 the inlong stream id recorded in the resulting message
     * @param linkedBlockingQueue the queue to drain
     * @return the batch, or {@code null} if no message could be taken
     */
    public SenderMessage fetchSenderMessage(String str, LinkedBlockingQueue<ProxyMessage> linkedBlockingQueue) {
        int batchSize = 0;
        // Element types expected by SenderMessage are not visible from this file, so the lists
        // are kept raw exactly as before.
        ArrayList bodies = new ArrayList();
        ArrayList ackInfos = new ArrayList();
        ProxyMessage head;
        while ((head = linkedBlockingQueue.peek()) != null) {
            int headSize = head.getBody().length;
            boolean headOversized = headSize > this.maxPackSize;
            if (!headOversized && batchSize + headSize > this.maxPackSize) {
                // The head fits in a batch, just not this one: leave it for the next call.
                break;
            }
            ProxyMessage message = linkedBlockingQueue.poll();
            if (message == null) {
                break;
            }
            int bodySize = message.getBody().length;
            this.cacheSize.addAndGet(-bodySize);
            if (bodySize > this.maxPackSize) {
                // NOTE(review): the dropped message's ack info is discarded with it - confirm
                // whether the offset needs to be acknowledged elsewhere.
                LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", bodySize, this.maxPackSize);
                continue;
            }
            batchSize += bodySize;
            bodies.add(message.getBody());
            ackInfos.add(message.getAckInfo());
        }
        if (bodies.isEmpty()) {
            return null;
        }
        // Real-time tasks stamp the batch with the current time, others with the parsed data time.
        long messageTime = this.isRealTime ? AgentUtils.getCurrentTime() : this.dataTime;
        return new SenderMessage(this.taskId, this.instanceId, this.groupId, str, bodies, messageTime, this.extraMap, ackInfos);
    }

    /**
     * @return the live extra-attribute map (not a copy) that is passed to every built batch
     */
    public Map<String, String> getExtraMap() {
        return this.extraMap;
    }

    /**
     * @return the sum of the body lengths of all currently cached messages
     */
    public long getCacheSize() {
        return this.cacheSize.get();
    }
}
