package org.apache.rocketmq.streams.common.channel.sink;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MultiSplitMessageCache;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.SourceState;
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ILifeCycle;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.common.utils.StringUtil;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/sink/AbstractSink.class */
public abstract class AbstractSink extends BasedConfigurable implements ISink<AbstractSink>, ILifeCycle {
    private static final Log logger = LogFactory.getLog(AbstractSink.class);
    public static String TARGET_QUEUE = "target_queue";
    public static final int DEFAULT_BATCH_SIZE = 3000;
    protected transient IMessageCache<IMessage> messageCache;
    protected volatile int batchSize = DEFAULT_BATCH_SIZE;
    protected volatile transient Map<String, SourceState> sourceName2State = new HashMap();
    protected volatile int autoFlushSize = 300;
    protected volatile int autoFlushTimeGap = 1000;

    public AbstractSink() {
        setType(ISink.TYPE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.rocketmq.streams.common.configurable.AbstractConfigurable
    public boolean initConfigurable() {
        this.messageCache = new MultiSplitMessageCache(this);
        ((MessageCache) this.messageCache).setAutoFlushTimeGap(this.autoFlushTimeGap);
        ((MessageCache) this.messageCache).setAutoFlushSize(this.autoFlushSize);
        this.messageCache.openAutoFlush();
        this.sourceName2State = new HashMap();
        return super.initConfigurable();
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean batchAdd(IMessage iMessage, ISplit iSplit) {
        iMessage.getMessageBody().put(TARGET_QUEUE, iSplit);
        return batchAdd(iMessage);
    }

    public ISplit getSplit(IMessage iMessage) {
        return (ISplit) iMessage.getMessageBody().get(TARGET_QUEUE);
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean batchAdd(IMessage iMessage) {
        this.messageCache.addCache(iMessage);
        return true;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public void openAutoFlush() {
        this.messageCache.openAutoFlush();
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean batchSave(List<IMessage> list) {
        if (list == null || list.size() == 0) {
            return true;
        }
        int i = this.batchSize;
        if (i == -1) {
            i = 3000;
        }
        int size = list.size();
        if (size <= i) {
            batchInsert(list);
            return true;
        }
        int i2 = size / i;
        if (size % i > 0) {
            i2++;
        }
        int i3 = 0;
        int i4 = i;
        if (i4 > size) {
            i4 = size;
        }
        for (int i5 = 0; i5 < i2; i5++) {
            batchInsert(list.subList(i3, i4));
            i3 = i4;
            i4 += i;
            if (i4 > size) {
                i4 = size;
            }
        }
        return true;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean flush(Set<String> set) {
        return this.messageCache.flush(set) > 0;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean flush(String... strArr) {
        if (strArr == null) {
            return true;
        }
        HashSet hashSet = new HashSet();
        for (String str : strArr) {
            hashSet.add(str);
        }
        return flush(hashSet);
    }

    protected abstract boolean batchInsert(List<IMessage> list);

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public void closeAutoFlush() {
        this.messageCache.closeAutoFlush();
    }

    @Override // org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack
    public boolean flushMessage(List<IMessage> list) {
        boolean batchSave = batchSave(list);
        for (IMessage iMessage : list) {
            String queueId = iMessage.getHeader().getQueueId();
            MessageOffset messageOffset = iMessage.getHeader().getMessageOffset();
            ISource source = iMessage.getHeader().getSource();
            if (source != null) {
                String createSourceName = CheckPointManager.createSourceName(source, iMessage.getHeader().getPiplineName());
                SourceState sourceState = this.sourceName2State.get(createSourceName);
                if (sourceState == null) {
                    sourceState = new SourceState();
                    sourceState.setSourceName(createSourceName);
                }
                sourceState.getQueueId2Offsets().put(queueId, messageOffset);
            }
        }
        return batchSave;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean checkpoint(Set<String> set) {
        return flush(set);
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean checkpoint(String... strArr) {
        if (strArr == null) {
            return false;
        }
        HashSet hashSet = new HashSet();
        for (String str : strArr) {
            hashSet.add(str);
        }
        return checkpoint(hashSet);
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public boolean flush() {
        String configureName = getConfigureName();
        if (StringUtil.isEmpty(configureName)) {
            configureName = getClass().getName();
        }
        int flush = this.messageCache.flush();
        if (flush <= 0) {
            return true;
        }
        logger.debug(String.format("%s finished flush data %d", configureName, Integer.valueOf(flush)));
        return true;
    }

    protected List<JSONObject> convertJsonObjectFromMessage(List<IMessage> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<IMessage> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getMessageBody());
        }
        return arrayList;
    }

    @Override // org.apache.rocketmq.streams.common.topology.builder.IStageBuilder
    public AbstractSink createStageChain(PipelineBuilder pipelineBuilder) {
        return this;
    }

    @Override // org.apache.rocketmq.streams.common.topology.builder.IStageBuilder
    public void addConfigurables(PipelineBuilder pipelineBuilder) {
        pipelineBuilder.addConfigurables(this);
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public int getBatchSize() {
        return this.batchSize;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    public IMessageCache<IMessage> getMessageCache() {
        return this.messageCache;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public Map<String, MessageOffset> getFinishedQueueIdAndOffsets(CheckPointMessage checkPointMessage) {
        String str = null;
        if (checkPointMessage.getStreamOperator() instanceof IConfigurableIdentification) {
            str = ((IConfigurableIdentification) checkPointMessage.getStreamOperator()).getConfigureName();
        }
        SourceState sourceState = this.sourceName2State.get(CheckPointManager.createSourceName(checkPointMessage.getSource(), str));
        return sourceState != null ? sourceState.getQueueId2Offsets() : new HashMap();
    }

    public void setMessageCache(IMessageCache<IMessage> iMessageCache) {
        this.messageCache = iMessageCache;
    }

    @Override // org.apache.rocketmq.streams.common.channel.sink.ISink
    public void atomicSink(ISystemMessage iSystemMessage) {
    }

    @Override // org.apache.rocketmq.streams.common.interfaces.ILifeCycle
    public void finish() throws Exception {
        closeAutoFlush();
    }

    @Override // org.apache.rocketmq.streams.common.interfaces.ILifeCycle
    public boolean isFinished() throws Exception {
        return false;
    }

    public int getAutoFlushSize() {
        return this.autoFlushSize;
    }

    public void setAutoFlushSize(int i) {
        this.autoFlushSize = i;
    }

    public int getAutoFlushTimeGap() {
        return this.autoFlushTimeGap;
    }

    public void setAutoFlushTimeGap(int i) {
        this.autoFlushTimeGap = i;
    }
}
