package org.apache.inlong.agent.plugin.sinks.filecollect;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.core.task.file.MemoryManager;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.filecollect.OffsetAckInfo;
import org.apache.inlong.agent.message.filecollect.ProxyMessage;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.task.filecollect.LogFileCollectTask;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.class */
/**
 * Sink that buffers incoming messages in the inherited cache and forwards them in batches
 * through a {@link SenderManager}.
 *
 * <p>{@link #init(InstanceProfile)} submits two background loops to a shared executor: one
 * drains the cache and sends batches, the other removes acknowledged entries from the head of
 * the ack list, releases their memory permits and stores the newest acknowledged offset in the
 * {@link OffsetManager}. {@link #destroy()} stops both loops and releases whatever permits are
 * still tracked.
 *
 * <p>Every access to {@code ackInfoList} is guarded by {@code packageAckInfoLock}.
 */
public class ProxySink extends AbstractSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);

    /** Shared executor that runs the flush-cache and flush-offset loops of every sink. */
    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), (ThreadFactory) new AgentThreadFactory("proxy-sink"));

    /** Key under which this sink acquires and releases permits from the {@link MemoryManager}. */
    private static final String WRITER_PERMIT = "agent.global.writer.permit";

    /** Pause between two attempts to put a message into the cache. */
    private static final int WRITE_FAILED_WAIT_TIME_MS = 10;

    /** Pause between two checks of the background loops while destroying the sink. */
    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;

    public final int SAVE_OFFSET_INTERVAL_MS = LogFileCollectTask.CORE_THREAD_SLEEP_TIME;

    // Nothing in this class assigns the filter, so stream extraction is skipped while it is null.
    private MessageFilter messageFilter;
    private SenderManager senderManager;
    private byte[] fieldSplitter;
    private OffsetManager offsetManager;

    /** Set once to ask the writer and both background loops to stop. */
    private volatile boolean shutdown = false;
    /** True while the flush-cache loop is alive. */
    private volatile boolean running = false;
    /** True once {@link #init(InstanceProfile)} completed successfully. */
    private volatile boolean inited = false;
    /** True while the flush-offset loop is alive. */
    private volatile boolean offsetRunning = false;

    /** Time of the last "send" log line; used to emit at most one such line per second. */
    private long lastPrintTime = 0;

    /** Ack records of cached messages, in insertion order. Guarded by {@link #packageAckInfoLock}. */
    private List<OffsetAckInfo> ackInfoList = new ArrayList<>();
    private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true);

    /**
     * Puts a message into the cache, retrying every {@value #WRITE_FAILED_WAIT_TIME_MS} ms
     * until it is accepted or the sink is shut down.
     *
     * @param message the message to write; {@code null} is accepted and ignored
     */
    public void write(Message message) {
        boolean written = false;
        while (!this.shutdown && !written) {
            written = putInCache(message);
            if (!written) {
                AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
            }
        }
    }

    /**
     * Tries once to move a message into the cache.
     *
     * @param message the message to cache
     * @return {@code true} when the message needs no further attempt (it was {@code null}, an
     *         {@link EndMessage}, or was added to the cache); {@code false} when the caller
     *         should retry
     */
    private boolean putInCache(Message message) {
        if (message == null) {
            return true;
        }
        // Tracks a permit that was acquired but is not yet owned by the ack list, so that it
        // can be handed back if anything below fails.
        boolean permitHeld = false;
        int permitSize = 0;
        try {
            extractStreamFromMessage(message, this.fieldSplitter);
            if (message instanceof EndMessage) {
                // NOTE(review): end markers are counted as sink failures here - confirm intended.
                this.sinkMetric.sinkFailCount.incrementAndGet();
                return true;
            }
            ProxyMessage proxyMessage = new ProxyMessage(message);
            permitSize = message.getBody().length;
            if (!MemoryManager.getInstance().tryAcquire(WRITER_PERMIT, permitSize)) {
                MemoryManager.getInstance().printDetail(WRITER_PERMIT, "proxy sink");
                return false;
            }
            permitHeld = true;
            this.cache.generateExtraMap(proxyMessage.getDataKey());
            boolean added = this.cache.add(proxyMessage);
            if (added) {
                // From here on the permit is released by doFlushOffset()/clearOffset().
                permitHeld = false;
                addAckInfo(proxyMessage.getAckInfo());
            } else {
                MemoryManager.getInstance().release(WRITER_PERMIT, permitSize);
                permitHeld = false;
                this.sinkMetric.sinkFailCount.incrementAndGet();
            }
            return added;
        } catch (Exception e) {
            LOGGER.error("write message to Proxy sink error", e);
            return false;
        } catch (Throwable th) {
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
            return false;
        } finally {
            if (permitHeld) {
                // An exception hit between acquire and hand-over; without this the retry in
                // write() would acquire a second permit for the same message.
                MemoryManager.getInstance().release(WRITER_PERMIT, permitSize);
            }
        }
    }

    /**
     * Stores the stream id computed by the message filter in the message header; does nothing
     * when no filter is set.
     *
     * @param message the message whose header is updated
     * @param bArr    the field splitter handed to the filter
     */
    private void extractStreamFromMessage(Message message, byte[] bArr) {
        if (this.messageFilter != null) {
            message.getHeader().put("inlongStreamId", this.messageFilter.filterStreamId(message, bArr));
        }
    }

    /**
     * Builds the flush-cache loop: sends cached messages every {@code batchFlushInterval} until
     * shutdown is requested.
     *
     * @return the task to submit to the executor
     */
    private Runnable coreThread() {
        return () -> {
            AgentThreadFactory.nameThread("flushCache-" + this.profile.getTaskId() + "-" + this.profile.getInstanceId());
            LOGGER.info("start flush cache {}:{}", this.inlongGroupId, this.sourceName);
            this.running = true;
            try {
                while (!this.shutdown) {
                    sendMessageFromCache();
                    AgentUtils.silenceSleepInMs(this.batchFlushInterval);
                }
                LOGGER.info("stop flush cache {}:{}", this.inlongGroupId, this.sourceName);
            } finally {
                // Reset even when the loop dies with an exception; destroy() waits on this flag.
                this.running = false;
            }
        };
    }

    /**
     * Fetches one batch per cache queue and hands every non-null batch to the sender manager,
     * logging a summary line at most once per second.
     */
    public void sendMessageFromCache() {
        for (Map.Entry entry : this.cache.getMessageQueueMap().entrySet()) {
            SenderMessage batch = this.cache.fetchSenderMessage((String) entry.getKey(), (LinkedBlockingQueue) entry.getValue());
            if (batch == null) {
                continue;
            }
            this.senderManager.sendBatch(batch);
            if (AgentUtils.getCurrentTime() - this.lastPrintTime > TimeUnit.SECONDS.toMillis(1L)) {
                this.lastPrintTime = AgentUtils.getCurrentTime();
                LOGGER.info("send groupId {}, streamId {}, message size {}, taskId {}, instanceId {} sendTime is {}",
                        this.inlongGroupId, this.inlongStreamId, batch.getDataList().size(),
                        this.profile.getTaskId(), this.profile.getInstanceId(), batch.getDataTime());
            }
        }
    }

    /**
     * Initializes the sink from the profile, starts the sender manager and submits both
     * background loops.
     *
     * @param instanceProfile the instance configuration
     * @throws IllegalStateException wrapping any failure raised while starting the sender or
     *                               submitting the loops
     */
    @Override // org.apache.inlong.agent.plugin.sinks.filecollect.AbstractSink
    public void init(InstanceProfile instanceProfile) {
        super.init(instanceProfile);
        this.fieldSplitter = instanceProfile.get("proxy.field.splitter", "|").getBytes(StandardCharsets.UTF_8);
        this.sourceName = instanceProfile.getInstanceId();
        this.offsetManager = OffsetManager.getInstance();
        this.senderManager = new SenderManager(instanceProfile, this.inlongGroupId, this.sourceName);
        try {
            this.senderManager.Start();
            EXECUTOR_SERVICE.execute(coreThread());
            EXECUTOR_SERVICE.execute(flushOffset());
            this.inited = true;
        } catch (Throwable th) {
            // Makes a loop that was already submitted exit on its next shutdown check.
            this.shutdown = true;
            // The throwable is passed last so the logger records its stack trace.
            LOGGER.error("error while init sender for group id {}", this.inlongGroupId, th);
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
            throw new IllegalStateException(th);
        }
    }

    /**
     * Stops the sink: requests shutdown, waits for both background loops to finish, stops the
     * sender manager and releases the permits of all remaining ack records. Returns immediately
     * if the sink was never initialized successfully.
     */
    public void destroy() {
        LOGGER.info("destroy sink {}", this.sourceName);
        if (!this.inited) {
            return;
        }
        this.shutdown = true;
        while (this.running || this.offsetRunning) {
            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
        }
        this.senderManager.Stop();
        clearOffset();
        LOGGER.info("destroy sink {} end", this.sourceName);
    }

    /**
     * Reports whether no ack records are pending.
     *
     * @return {@code true} when the ack list is empty
     */
    public boolean sinkFinish() {
        this.packageAckInfoLock.writeLock().lock();
        try {
            return this.ackInfoList.isEmpty();
        } finally {
            this.packageAckInfoLock.writeLock().unlock();
        }
    }

    /**
     * Appends an ack record to the ack list.
     *
     * @param offsetAckInfo the record to append
     */
    private void addAckInfo(OffsetAckInfo offsetAckInfo) {
        this.packageAckInfoLock.writeLock().lock();
        try {
            this.ackInfoList.add(offsetAckInfo);
        } finally {
            this.packageAckInfoLock.writeLock().unlock();
        }
    }

    /**
     * Builds the flush-offset loop: flushes acknowledged offsets once per second until shutdown
     * is requested.
     *
     * @return the task to submit to the executor
     */
    private Runnable flushOffset() {
        return () -> {
            AgentThreadFactory.nameThread("flushOffset-" + this.profile.getTaskId() + "-" + this.profile.getInstanceId());
            LOGGER.info("start flush offset {}:{}", this.inlongGroupId, this.sourceName);
            this.offsetRunning = true;
            try {
                while (!this.shutdown) {
                    doFlushOffset();
                    // NOTE(review): presumably meant to be SAVE_OFFSET_INTERVAL_MS - confirm
                    // that constant is 1000 before replacing the literal.
                    AgentUtils.silenceSleepInMs(1000L);
                }
                LOGGER.info("stop flush offset {}:{}", this.inlongGroupId, this.sourceName);
            } finally {
                // Reset even when the loop dies with an exception; destroy() waits on this flag.
                this.offsetRunning = false;
            }
        };
    }

    /**
     * Removes the leading run of acknowledged records from the ack list, releases their
     * permits, and stores the offset of the last removed record. Stops at the first record that
     * is not yet acknowledged.
     */
    private void doFlushOffset() {
        this.packageAckInfoLock.writeLock().lock();
        try {
            OffsetAckInfo lastAcked = null;
            while (!this.ackInfoList.isEmpty() && this.ackInfoList.get(0).getHasAck()) {
                lastAcked = this.ackInfoList.remove(0);
                MemoryManager.getInstance().release(WRITER_PERMIT, lastAcked.getLen());
            }
            if (lastAcked != null) {
                LOGGER.info("save offset {} taskId {} instanceId {}",
                        lastAcked.getOffset(), this.profile.getTaskId(), this.profile.getInstanceId());
                this.offsetManager.setOffset(new OffsetProfile(this.profile.getTaskId(), this.profile.getInstanceId(),
                        lastAcked.getOffset().longValue(), this.profile.get("inodeInfo")));
            }
        } finally {
            // Always unlock: a failure while saving the offset must not block writers forever.
            this.packageAckInfoLock.writeLock().unlock();
        }
    }

    /**
     * Empties the ack list, releasing the permit of every remaining record whether or not it
     * was acknowledged.
     */
    private void clearOffset() {
        this.packageAckInfoLock.writeLock().lock();
        try {
            while (!this.ackInfoList.isEmpty()) {
                MemoryManager.getInstance().release(WRITER_PERMIT, this.ackInfoList.remove(0).getLen());
            }
        } finally {
            this.packageAckInfoLock.writeLock().unlock();
        }
    }
}
