package org.apache.rocketmq.streams.common.channel.sinkcache.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
import org.apache.rocketmq.streams.common.schedule.ScheduleTask;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.class */
public class MessageCache<R> implements IMessageCache<R> {
    protected IMessageFlushCallBack<R> flushCallBack;
    protected transient DataSourceAutoFlushTask autoFlushTask;
    protected ExecutorService autoFlushExecutorService;
    protected volatile AtomicInteger messageCount = new AtomicInteger(0);
    protected int batchSize = 1000;
    protected volatile transient ConcurrentLinkedQueue<R> dataQueue = new ConcurrentLinkedQueue<>();
    protected AtomicBoolean openAutoFlushLock = new AtomicBoolean(false);
    protected volatile int autoFlushSize = 300;
    protected volatile int autoFlushTimeGap = 1000;

    public MessageCache(IMessageFlushCallBack<R> iMessageFlushCallBack) {
        this.flushCallBack = iMessageFlushCallBack;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public int addCache(R r) {
        offerQueue(r);
        int incrementAndGet = this.messageCount.incrementAndGet();
        if (this.batchSize > 0 && incrementAndGet >= this.batchSize) {
            flush();
        }
        return incrementAndGet;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public void openAutoFlush() {
        if (this.openAutoFlushLock.compareAndSet(false, true)) {
            this.autoFlushTask = new DataSourceAutoFlushTask(true, this);
            this.autoFlushTask.setAutoFlushSize(this.autoFlushSize);
            this.autoFlushTask.setAutoFlushTimeGap(this.autoFlushTimeGap);
            ScheduleTask scheduleTask = new ScheduleTask(this.autoFlushTask, this.autoFlushTask);
            scheduleTask.setExecutorService(this.autoFlushExecutorService);
            ScheduleManager.getInstance().regist(scheduleTask);
        }
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public void closeAutoFlush() {
        if (this.autoFlushTask != null) {
            this.autoFlushTask.setAutoFlush(false);
            this.openAutoFlushLock.set(false);
        }
    }

    protected void offerQueue(R r) {
        this.dataQueue.offer(r);
    }

    protected List<R> getMessagesFromQueue(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(this.dataQueue.poll());
        }
        return arrayList;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public Integer getMessageCount() {
        return Integer.valueOf(this.messageCount.get());
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public int flush() {
        if (getMessageCount().intValue() == 0) {
            return 0;
        }
        synchronized (this) {
            if (getMessageCount().intValue() == 0) {
                return 0;
            }
            int size = this.dataQueue.size();
            this.messageCount = new AtomicInteger(0);
            List<R> messagesFromQueue = getMessagesFromQueue(size);
            this.flushCallBack.flushMessage(messagesFromQueue);
            return messagesFromQueue.size();
        }
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public int flush(Set<String> set) {
        return flush();
    }

    public int getAutoFlushSize() {
        return this.autoFlushSize;
    }

    public void setAutoFlushSize(int i) {
        this.autoFlushSize = i;
    }

    public int getAutoFlushTimeGap() {
        return this.autoFlushTimeGap;
    }

    public void setAutoFlushTimeGap(int i) {
        this.autoFlushTimeGap = i;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public int getBatchSize() {
        return this.batchSize;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    public IMessageFlushCallBack<R> getFlushCallBack() {
        return this.flushCallBack;
    }

    public ExecutorService getAutoFlushExecutorService() {
        return this.autoFlushExecutorService;
    }

    public void setAutoFlushExecutorService(ExecutorService executorService) {
        this.autoFlushExecutorService = executorService;
    }
}
