package org.apache.inlong.audit.sink;

import com.github.benmanes.caffeine.cache.Cache;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.audit.channel.DataQueue;
import org.apache.inlong.audit.config.ConfigConstants;
import org.apache.inlong.audit.config.Configuration;
import org.apache.inlong.audit.entities.StatData;
import org.apache.inlong.audit.utils.CacheUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/sink/CacheSink.class */
/**
 * Moves {@link StatData} entries from a {@link DataQueue} into a {@link Cache}.
 *
 * <p>The work runs on a dedicated single-threaded scheduled executor that is created
 * together with the instance; {@link #start()} schedules the periodic drain and
 * {@link #destroy()} shuts the executor down.
 */
public class CacheSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheSink.class);

    /** Queue the entries are pulled from. */
    private final DataQueue dataQueue;

    /** Destination cache; keys are produced by {@code CacheUtils.buildCacheKey}. */
    private final Cache<String, StatData> cache;

    /** Single worker thread that executes {@link #process()}. */
    private final ScheduledExecutorService sinkTimer = Executors.newSingleThreadScheduledExecutor();

    /**
     * Timeout handed to every {@code dataQueue.pull} call, in milliseconds. Read once from
     * {@code ConfigConstants.KEY_QUEUE_PULL_TIMEOUT}, with 1000 passed as the fallback argument.
     */
    private final int pullTimeOut = Configuration.getInstance().get(ConfigConstants.KEY_QUEUE_PULL_TIMEOUT, 1000);

    /**
     * Creates a sink that is not yet running; call {@link #start()} to begin draining.
     *
     * @param dataQueue queue to pull entries from
     * @param cache cache the pulled entries are written to
     */
    public CacheSink(DataQueue dataQueue, Cache<String, StatData> cache) {
        this.dataQueue = dataQueue;
        this.cache = cache;
    }

    /**
     * Schedules {@link #process()} with no initial delay and a fixed delay between runs.
     *
     * <p>The delay is read from {@code ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL}
     * (100 passed as the fallback argument) and is interpreted as milliseconds.
     */
    public void start() {
        this.sinkTimer.scheduleWithFixedDelay(this::process, 0L, Configuration.getInstance().get(ConfigConstants.KEY_SOURCE_DB_SINK_INTERVAL, 100), TimeUnit.MILLISECONDS);
    }

    /**
     * Pulls entries from the queue until a pull returns {@code null}, storing each one in the
     * cache under the key built from its log timestamp, group id, stream id, audit id and
     * audit tag.
     *
     * <p>Any exception ends the current run and is logged rather than propagated: a task that
     * throws would have its subsequent executions suppressed by the scheduled executor.
     */
    private void process() {
        try {
            StatData pull = this.dataQueue.pull(this.pullTimeOut, TimeUnit.MILLISECONDS);
            while (pull != null) {
                this.cache.put(CacheUtils.buildCacheKey(pull.getLogTs(), pull.getInlongGroupId(), pull.getInlongStreamId(), pull.getAuditId(), pull.getAuditTag()), pull);
                pull = this.dataQueue.pull(this.pullTimeOut, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching the exception clears the interrupt status; restore it so the
                // interruption stays visible to the code running this task.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("Process exception! ", e);
        }
    }

    /**
     * Shuts the executor down so that no further runs are started. A run that is already in
     * progress is not interrupted by {@code shutdown()} and is allowed to finish.
     */
    public void destroy() {
        this.sinkTimer.shutdown();
    }
}
