package org.apache.rocketmq.streams.window.model;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.topology.model.IWindow;
import org.apache.rocketmq.streams.common.utils.CompressUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel;

/* loaded from: input_file:org/apache/rocketmq/streams/window/model/WindowCache.class */
/**
 * Abstract sink that groups incoming messages by shuffle key and forwards them to a
 * {@link ShuffleChannel}.
 *
 * <p>For every non-system message, {@link #translateToShuffleMap(List)} stamps the message body
 * with its origin metadata (offset, queue id, header, trace id, shuffle key) and buckets it under
 * {@code shuffleChannel.hash(shuffleKey)}. {@link #batchInsert(List)} then wraps each bucket into
 * a shuffle envelope and places it in {@link ShuffleMsgCache}, whose flush callback merges the
 * cached envelopes, gzips them and sends them through the shuffle channel's producer.
 *
 * <p>Subclasses supply the shuffle key via {@link #generateShuffleKey(IMessage)} and may add
 * extra body properties via {@link #addPropertyToMessage(IMessage, JSONObject)}.
 */
public abstract class WindowCache extends AbstractSink implements IWindow.IWindowCheckpoint {
    private static final Log LOG = LogFactory.getLog(WindowCache.class);

    /** Envelope key whose value {@code true} marks the envelope as carrying a compressed payload. */
    public static final String IS_COMPRESSION_MSG = "_is_compress_msg";
    /** Envelope key holding the gzip output of the merged shuffle message. */
    public static final String COMPRESSION_MSG_DATA = "_compress_msg";
    /** Body key holding the offset read from the original message header. */
    public static final String ORIGIN_OFFSET = "origin_offset";
    /** Body key holding the queue id read from the original message header. */
    public static final String ORIGIN_QUEUE_ID = "origin_queue_id";
    /** Body key holding {@code getMessageOffset().isLongOfMainOffset()} of the original header. */
    public static final String ORIGIN_QUEUE_IS_LONG = "origin_offset_is_LONG";
    /** Body key holding the original message header serialized as a JSON string. */
    public static final String ORIGIN_MESSAGE_HEADER = "origin_message_header";
    /** Body key for the origin source name; not written anywhere in this class. */
    public static final String ORIGIN_SOURCE_NAME = "origin_offset_name";
    /** Body key holding the shuffle key the message was bucketed by. */
    public static final String SHUFFLE_KEY = "SHUFFLE_KEY";
    /** Body key holding the trace id read from the original message header. */
    public static final String ORIGIN_MESSAGE_TRACE_ID = "origin_request_id";

    /** Channel used to hash keys, resolve splits, build envelopes and obtain the producer. */
    protected transient ShuffleChannel shuffleChannel;
    /** Test-mode switch, loaded from the {@code window.fire.isTest} property in {@link #initConfigurable()}. */
    protected transient boolean isWindowTest = false;
    /** Running total of messages passed to {@link #batchInsert(List)} while test mode is on. */
    protected transient AtomicLong COUNT = new AtomicLong(0);
    /** Per-split cache of shuffle envelopes; replaced with a configured instance in {@link #initConfigurable()}. */
    protected transient ShuffleMsgCache shuffleMsgCache = new ShuffleMsgCache();

    /**
     * Cache of {@code (split, shuffle envelope)} pairs keyed by the split's queue id.
     *
     * <p>Its flush callback collapses a list of cached envelopes into one compressed message and
     * sends it to the split of the first pair.
     */
    protected class ShuffleMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, JSONObject>> {
        public ShuffleMsgCache() {
            super(new IMessageFlushCallBack<Pair<ISplit, JSONObject>>() {
                /**
                 * Merges every envelope in {@code list} into the first one, gzips the result and
                 * sends it to the first pair's split.
                 *
                 * @param list cached pairs to flush; presumably all for the same split, since the
                 *             cache is keyed by queue id (see {@code createSplitId}) - TODO confirm
                 * @return always {@code true}
                 */
                public boolean flushMessage(List<Pair<ISplit, JSONObject>> list) {
                    if (list == null || list.isEmpty()) {
                        return true;
                    }
                    Pair<ISplit, JSONObject> first = list.get(0);
                    ISplit targetSplit = first.getLeft();
                    JSONObject mergedEnvelope = first.getRight();
                    // NOTE(review): assumes getMsgs returns the live, non-null array stored inside
                    // the envelope, so the additions below are visible when the envelope is
                    // serialized - confirm against ShuffleChannel.
                    JSONArray mergedMsgs = WindowCache.this.shuffleChannel.getMsgs(mergedEnvelope);
                    for (int i = 1; i < list.size(); i++) {
                        JSONArray extraMsgs = WindowCache.this.shuffleChannel.getMsgs(list.get(i).getRight());
                        if (extraMsgs != null) {
                            mergedMsgs.addAll(extraMsgs);
                        }
                    }
                    JSONObject compressed = new JSONObject();
                    compressed.put(WindowCache.COMPRESSION_MSG_DATA, CompressUtil.gZip(mergedEnvelope.toJSONString()));
                    compressed.put(WindowCache.IS_COMPRESSION_MSG, true);
                    WindowCache.this.shuffleChannel.getProducer().batchAdd(new Message(compressed), targetSplit);
                    // Flush only the queue that was just written to.
                    WindowCache.this.shuffleChannel.getProducer().flush(new String[]{targetSplit.getQueueId()});
                    return true;
                }
            });
        }

        /**
         * Returns the cache key for a pair: the queue id of its split.
         *
         * @param pair cached {@code (split, envelope)} pair
         * @return {@code pair.getLeft().getQueueId()}
         */
        public String createSplitId(Pair<ISplit, JSONObject> pair) {
            return pair.getLeft().getQueueId();
        }
    }

    /**
     * Creates and configures the shuffle message cache, starts its auto flush, reads the
     * {@code window.fire.isTest} property and then delegates to the superclass.
     *
     * @return the result of {@code super.initConfigurable()}
     */
    protected boolean initConfigurable() {
        this.shuffleMsgCache = new ShuffleMsgCache();
        this.shuffleMsgCache.setBatchSize(1000);
        this.shuffleMsgCache.setAutoFlushSize(100);
        this.shuffleMsgCache.setAutoFlushTimeGap(1000);
        this.shuffleMsgCache.openAutoFlush();
        this.isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
        return super.initConfigurable();
    }

    /**
     * Buckets the messages by shuffle hash and caches one shuffle envelope per bucket.
     *
     * <p>When the window's debug writer is open, the bucketed messages are also written to it.
     * In test mode the running message count is printed and the cache is flushed immediately.
     *
     * @param list messages to insert; {@code null} is treated as an empty batch
     * @return always {@code true}
     */
    protected boolean batchInsert(List<IMessage> list) {
        Map<Integer, JSONArray> shuffleMap = translateToShuffleMap(list);
        if (shuffleMap != null && !shuffleMap.isEmpty()) {
            for (Map.Entry<Integer, JSONArray> entry : shuffleMap.entrySet()) {
                JSONArray bucket = entry.getValue();
                ISplit split = this.shuffleChannel.getSplit(entry.getKey());
                JSONObject shuffleMsg = this.shuffleChannel.createMsg(bucket, split);
                this.shuffleMsgCache.addCache(new MutablePair<>(split, shuffleMsg));

                DebugWriter debugWriter = DebugWriter.getDebugWriter(this.shuffleChannel.getWindow().getConfigureName());
                if (debugWriter.isOpenDebug()) {
                    // Rebuild messages with their origin queue id/offset only when debugging,
                    // so the normal path allocates nothing extra.
                    List<IMessage> debugMessages = new ArrayList<>(bucket.size());
                    for (int i = 0; i < bucket.size(); i++) {
                        JSONObject body = bucket.getJSONObject(i);
                        Message message = new Message(body);
                        message.getHeader().setQueueId(body.getString(ORIGIN_QUEUE_ID));
                        message.getHeader().setOffset(body.getLong(ORIGIN_OFFSET));
                        debugMessages.add(message);
                    }
                    debugWriter.writeWindowCache(this.shuffleChannel.getWindow(), debugMessages, split.getQueueId());
                }
            }
        }
        if (!this.isWindowTest) {
            return true;
        }
        // Test mode: report progress on stdout and push the cache out synchronously.
        int batchSize = list == null ? 0 : list.size();
        System.out.println(this.shuffleChannel.getWindow().getConfigureName() + " send shuffle msg count is " + this.COUNT.addAndGet(batchSize));
        this.shuffleMsgCache.flush();
        return true;
    }

    /**
     * Broadcasts a batch-finish message to every queue of the shuffle channel.
     *
     * <p>The producer is flushed before the broadcast and again after it. Does nothing when the
     * shuffle channel or its producer is not available.
     *
     * @param batchFinishMessage wrapper whose message is deep-copied once per target queue
     */
    public void finishBatchMsg(BatchFinishMessage batchFinishMessage) {
        if (this.shuffleChannel == null || this.shuffleChannel.getProducer() == null) {
            return;
        }
        this.shuffleChannel.getProducer().flush();
        for (ISplit iSplit : this.shuffleChannel.getQueueList()) {
            // Each queue gets its own copy so the added body field does not leak across queues.
            IMessage deepCopy = batchFinishMessage.getMsg().deepCopy();
            deepCopy.getMessageBody().put(ORIGIN_QUEUE_ID, deepCopy.getHeader().getQueueId());
            this.shuffleChannel.getProducer().batchAdd(deepCopy, iSplit);
        }
        this.shuffleChannel.getProducer().flush();
    }

    /**
     * Groups the bodies of all non-system messages by the hash of their shuffle key.
     *
     * <p>Each body is mutated in place: the origin offset, queue id, offset-type flag, serialized
     * header, trace id and shuffle key are written into it, followed by whatever
     * {@link #addPropertyToMessage(IMessage, JSONObject)} adds. A message with an empty shuffle
     * key is bucketed under the literal key {@code "<null>"}.
     *
     * @param list messages to bucket; {@code null} yields an empty map
     * @return map from {@code shuffleChannel.hash(shuffleKey)} to the bodies in that bucket, never {@code null}
     */
    protected Map<Integer, JSONArray> translateToShuffleMap(List<IMessage> list) {
        Map<Integer, JSONArray> shuffleMap = new HashMap<>();
        if (list == null) {
            return shuffleMap;
        }
        for (IMessage iMessage : list) {
            if (iMessage.getHeader().isSystemMessage()) {
                continue;
            }
            String shuffleKey = generateShuffleKey(iMessage);
            if (StringUtil.isEmpty(shuffleKey)) {
                shuffleKey = "<null>";
                // Guarded so the body is not serialized when debug logging is off.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("there is no group by value in message! " + iMessage.getMessageBody().toString());
                }
            }
            Integer bucketKey = Integer.valueOf(this.shuffleChannel.hash(shuffleKey));
            JSONObject messageBody = iMessage.getMessageBody();
            String offset = iMessage.getHeader().getOffset();
            String queueId = iMessage.getHeader().getQueueId();
            messageBody.put(ORIGIN_OFFSET, offset);
            messageBody.put(ORIGIN_QUEUE_ID, queueId);
            messageBody.put(ORIGIN_QUEUE_IS_LONG, Boolean.valueOf(iMessage.getHeader().getMessageOffset().isLongOfMainOffset()));
            messageBody.put(ORIGIN_MESSAGE_HEADER, JSONObject.toJSONString(iMessage.getHeader()));
            messageBody.put(ORIGIN_MESSAGE_TRACE_ID, iMessage.getHeader().getTraceId());
            messageBody.put(SHUFFLE_KEY, shuffleKey);
            addPropertyToMessage(iMessage, messageBody);

            JSONArray bucket = shuffleMap.get(bucketKey);
            if (bucket == null) {
                bucket = new JSONArray();
                shuffleMap.put(bucketKey, bucket);
            }
            bucket.add(messageBody);
        }
        return shuffleMap;
    }

    /**
     * Produces the key used to pick the shuffle bucket for a message.
     *
     * @param iMessage message being routed
     * @return the shuffle key; an empty result is replaced by {@code "<null>"} by the caller
     */
    protected abstract String generateShuffleKey(IMessage iMessage);

    /**
     * Flushes this sink and the shuffle message cache for the given ids.
     *
     * @param set ids passed unchanged to both {@code flush} calls
     * @return always {@code true}
     */
    public boolean checkpoint(Set<String> set) {
        flush(set);
        this.shuffleMsgCache.flush(set);
        return true;
    }

    /**
     * Hook for subclasses to add extra fields to the outgoing body; the default does nothing.
     *
     * @param iMessage   original message
     * @param jSONObject body of {@code iMessage}, already stamped with the origin fields
     */
    protected void addPropertyToMessage(IMessage iMessage, JSONObject jSONObject) {
    }

    /** @return the shuffle channel currently in use, possibly {@code null} if none was set */
    public ShuffleChannel getShuffleChannel() {
        return this.shuffleChannel;
    }

    /** @return the cache holding shuffle envelopes that have not been flushed yet */
    public ShuffleMsgCache getShuffleMsgCache() {
        return this.shuffleMsgCache;
    }

    /** @param shuffleChannel channel to use for hashing, split lookup and sending */
    public void setShuffleChannel(ShuffleChannel shuffleChannel) {
        this.shuffleChannel = shuffleChannel;
    }
}
