package org.apache.rocketmq.streams.window.shuffle;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage;
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointState;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessage;
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.CompressUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.window.operator.impl.WindowOperator;
import org.apache.rocketmq.streams.window.sqlcache.ISQLElement;
import org.apache.rocketmq.streams.window.sqlcache.impl.SQLElement;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;

/* loaded from: input_file:org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.class */
public class ShuffleChannel extends AbstractSystemChannel {
    /** Logger for this class. */
    protected static final Log LOG = LogFactory.getLog(ShuffleChannel.class);
    /** Body key under which {@code createMsg} stores the target split's queue id. */
    protected static final String SHUFFLE_QUEUE_ID = "SHUFFLE_QUEUE_ID";
    /** Body key that {@code doMessage} sets on every unpacked message to the offset of the enclosing shuffle message. */
    public static final String SHUFFLE_OFFSET = "SHUFFLE_OFFSET";
    /** Body key holding the JSON array of messages packed into one shuffle message. */
    protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
    /** Body key holding the trace id string written by {@code createMsg} and read back by {@code doMessage}. */
    private static final String SHUFFLE_TRACE_ID = "SHUFFLE_TRACE_ID";
    /** Cache that unpacked shuffle messages are added to; created, initialised and set to auto-flush in the constructor. */
    protected ShuffleCache shuffleCache;
    /** Splits of the shuffle channel; filled from {@code producer.getSplitList()} in {@code init()} when empty or null. */
    protected List<ISplit> queueList;
    /** Window this channel shuffles messages for. */
    protected AbstractShuffleWindow window;
    /** Split ids last reported by a new-split or remove-split system message; null until one is handled. */
    private Set<String> currentQueueIds;
    /** Body key carrying the owner tag; {@code filterNotOwnerMessage} compares it with {@code getDynamicPropertyValue()}. */
    protected String MSG_OWNER = "MSG_OWNER";
    /** Queue id to split lookup; rebuilt in {@code init()} together with {@code queueList}. */
    protected Map<String, ISplit> queueMap = new ConcurrentHashMap();
    /** Set in {@code init()} from the "window.fire.isTest" property; enables the receive-count print in {@code doMessage}. */
    protected transient boolean isWindowTest = false;
    /** Offsets loaded by {@code loadSplitProgress}; consulted by {@code isRepeateMessage} to detect already-processed messages. */
    protected transient Map<String, String> split2MaxOffsets = new HashMap();
    /** Ensures {@code startChannel()} delegates to the superclass only once. */
    protected transient AtomicBoolean hasStart = new AtomicBoolean(false);
    /** Running total of received shuffle messages; only updated when {@code isWindowTest} is true. */
    protected transient AtomicLong COUNT = new AtomicLong(0);

    public ShuffleChannel(AbstractShuffleWindow abstractShuffleWindow) {
        this.window = abstractShuffleWindow;
        this.channelConfig = new HashMap();
        this.channelConfig.put("CHANNEL_PROPERTY_KEY_PREFIX", "window.shuffle.channel.");
        this.channelConfig.put("CHANNEL_TYPE", "window.shuffle.channel.type");
        this.shuffleCache = new ShuffleCache(abstractShuffleWindow);
        this.shuffleCache.init();
        this.shuffleCache.openAutoFlush();
    }

    /**
     * Starts the channel at most once: only the first caller that flips {@code hasStart}
     * from false to true delegates to the superclass.
     */
    @Override
    public void startChannel() {
        boolean firstStart = hasStart.compareAndSet(false, true);
        if (!firstStart) {
            return;
        }
        super.startChannel();
    }

    /**
     * Creates the shuffle consumer and producer for the window and caches the producer's splits.
     *
     * <p>If either side could not be created, {@code autoCreateShuffleChannel} is invoked with the
     * pipeline of the window's fire receiver (presumably it fills in the missing side - confirm in
     * the superclass). When there is still no consumer the method returns without further setup.
     */
    public void init() {
        consumer = createSource(window.getNameSpace(), window.getConfigureName());
        producer = createSink(window.getNameSpace(), window.getConfigureName());

        boolean channelIncomplete = consumer == null || producer == null;
        if (channelIncomplete) {
            autoCreateShuffleChannel(window.getFireReceiver().getPipeline());
        }
        if (consumer == null) {
            return;
        }
        if (consumer instanceof AbstractSource) {
            consumer.setJsonData(true);
        }

        // Only load the split list once; an already populated list is left untouched.
        boolean splitsMissing = queueList == null || queueList.isEmpty();
        if (producer != null && splitsMissing) {
            queueList = producer.getSplitList();
            Map<String, ISplit> splitsByQueueId = new ConcurrentHashMap<>();
            for (ISplit split : queueList) {
                splitsByQueueId.put(split.getQueueId(), split);
            }
            queueMap = splitsByQueueId;
        }

        isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
    }

    /**
     * Handles one message read from the shuffle channel.
     *
     * <p>System messages are forwarded to {@code doSystemMessage}. For data messages the body is
     * un-gzipped when flagged as compressed, messages not owned by this window are dropped, and
     * every element of the {@code SHUFFLE_MESSAGES} array is turned into its own message, matched
     * against window instances and added to the shuffle cache.
     *
     * @param iMessage        the shuffle message (or system message) to process
     * @param abstractContext the processing context, passed on to system message handling
     * @return always {@code null}
     */
    public Object doMessage(IMessage iMessage, AbstractContext abstractContext) {
        if (iMessage.getHeader().isSystemMessage()) {
            doSystemMessage(iMessage, abstractContext);
            return null;
        }
        if (iMessage.getMessageBody().getBooleanValue(WindowCache.IS_COMPRESSION_MSG)) {
            byte[] compressed = iMessage.getMessageBody().getBytes(WindowCache.COMPRESSION_MSG_DATA);
            iMessage.setMessageBody(JSONObject.parseObject(CompressUtil.unGzip(compressed)));
        }
        if (filterNotOwnerMessage(iMessage)) {
            return null;
        }

        String queueId = iMessage.getHeader().getQueueId();
        JSONArray shuffledMessages = iMessage.getMessageBody().getJSONArray(SHUFFLE_MESSAGES);
        if (shuffledMessages == null) {
            return null;
        }

        String traceId = iMessage.getMessageBody().getString(SHUFFLE_TRACE_ID);
        if (!StringUtil.isEmpty(traceId)) {
            TraceUtil.debug(traceId, new String[]{"shuffle message in", "received message size:" + shuffledMessages.size()});
        }

        for (Object element : shuffledMessages) {
            IMessage message = new Message((JSONObject) element);
            message.getHeader().setQueueId(queueId);
            message.getMessageBody().put(SHUFFLE_OFFSET, iMessage.getHeader().getOffset());
            window.updateMaxEventTime(message);

            if (isRepeateMessage(message, queueId)) {
                continue;
            }

            List<WindowInstance> instances = window.queryOrCreateWindowInstance(message, queueId);
            if (instances == null || instances.size() == 0) {
                LOG.warn("the message is out of window instance, the message is discard");
                continue;
            }

            // Persist window instances seen for the first time and mark them as finished loading.
            for (WindowInstance instance : instances) {
                String instanceId = instance.createWindowInstanceId();
                if (instance.isNewWindowInstance().booleanValue()) {
                    window.getSqlCache().addCache((ISQLElement) new SQLElement(instance.getSplitId(), instanceId, ORMUtil.createBatchReplacetSQL(new Object[]{instance})));
                    instance.setNewWindowInstance(false);
                    ShufflePartitionManager.getInstance().setWindowInstanceFinished(instance.createWindowInstanceId());
                }
            }

            message.getMessageBody().put(WindowInstance.class.getSimpleName(), instances);
            message.getMessageBody().put(AbstractWindow.class.getSimpleName(), window);

            if (DebugWriter.getDebugWriter(window.getConfigureName()).isOpenDebug()) {
                ArrayList debugBatch = new ArrayList();
                debugBatch.add(message);
                DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window, debugBatch, queueId);
            }

            for (WindowInstance instance : instances) {
                window.getWindowFireSource().updateWindowInstanceLastUpdateTime(instance);
            }
            shuffleCache.batchAdd(message);
        }

        if (isWindowTest) {
            System.out.println(window.getConfigureName() + " receive shuffle msg count is " + COUNT.addAndGet(shuffledMessages.size()));
        }
        return null;
    }

    /**
     * Handles a new-split system message.
     *
     * <p>Records the current split ids, loads the stored offsets for the new splits, then queries
     * the window instances belonging to those splits. Each instance found is registered with the
     * window and its fire source and has its split data loaded into local storage. When no
     * instance is found, every new split is marked as finished in the
     * {@link ShufflePartitionManager}. Finally the message is forwarded to the window's fire
     * receiver.
     *
     * @param iMessage        the message carrying the system message
     * @param abstractContext the processing context, forwarded to the fire receiver
     * @param newSplitMessage the system message describing the added splits
     */
    public void addNewSplit(IMessage iMessage, AbstractContext abstractContext, NewSplitMessage newSplitMessage) {
        this.currentQueueIds = newSplitMessage.getCurrentSplitIds();
        loadSplitProgress(newSplitMessage);
        List<WindowInstance> restoredInstances = WindowInstance.queryAllWindowInstance(DateUtil.getCurrentTimeString(), this.window, newSplitMessage.getSplitIds());
        if (CollectionUtil.isNotEmpty(restoredInstances)) {
            for (WindowInstance windowInstance : restoredInstances) {
                windowInstance.setNewWindowInstance(false);
                this.window.registerWindowInstance(windowInstance);
                this.window.getWindowFireSource().registFireWindowInstanceIfNotExist(windowInstance, this.window);
                String splitId = windowInstance.getSplitId();
                this.window.getStorage().loadSplitData2Local(splitId, windowInstance.createWindowInstanceId(), this.window.getWindowBaseValueClass(), new WindowOperator.WindowRowOperator(windowInstance, splitId, this.window));
                this.window.initWindowInstanceMaxSplitNum(windowInstance);
            }
        } else {
            // Nothing to restore: the new splits can be flagged as finished straight away.
            for (Object splitId : newSplitMessage.getSplitIds()) {
                ShufflePartitionManager.getInstance().setSplitFinished((String) splitId);
            }
        }
        this.window.getFireReceiver().doMessage(iMessage, abstractContext);
    }

    /**
     * Loads the stored offsets of every split named in the message and merges them into
     * {@code split2MaxOffsets}. Splits for which no offsets are returned are skipped.
     *
     * @param newSplitMessage the system message listing the newly assigned splits
     */
    protected void loadSplitProgress(NewSplitMessage newSplitMessage) {
        for (Object splitId : newSplitMessage.getSplitIds()) {
            Map<String, String> storedOffsets = window.getWindowMaxValueManager().loadOffsets(window.getConfigureName(), (String) splitId);
            if (storedOffsets == null) {
                continue;
            }
            split2MaxOffsets.putAll(storedOffsets);
        }
    }

    /**
     * Handles a remove-split system message.
     *
     * <p>Records the current split ids and, when the message names removed splits, invalidates each
     * one in the {@link ShufflePartitionManager}, clears the window cache for it and drops the
     * matching entries from the window max-value manager's local cache. The message is then
     * forwarded to the window's fire receiver.
     *
     * @param iMessage           the message carrying the system message
     * @param abstractContext    the processing context, forwarded to the fire receiver
     * @param removeSplitMessage the system message describing the removed splits
     */
    public void removeSplit(IMessage iMessage, AbstractContext abstractContext, RemoveSplitMessage removeSplitMessage) {
        currentQueueIds = removeSplitMessage.getCurrentSplitIds();
        Set<String> removedSplitIds = removeSplitMessage.getSplitIds();
        if (removedSplitIds != null) {
            ShufflePartitionManager partitionManager;
            for (String removedSplitId : removedSplitIds) {
                partitionManager = ShufflePartitionManager.getInstance();
                partitionManager.setSplitInValidate(removedSplitId);
                window.clearCache(removedSplitId);
            }
            window.getWindowMaxValueManager().removeKeyPrefixFromLocalCache(removedSplitIds);
        }
        window.getFireReceiver().doMessage(iMessage, abstractContext);
    }

    /**
     * Handles a checkpoint system message.
     *
     * <p>When the message header requests a flush, the shuffle cache and the window's SQL cache are
     * flushed for the checkpoint queue ids. In every case the checkpoint is answered with the
     * queue-id/offset state reported by the shuffle cache.
     *
     * @param iMessage          the message carrying the system message
     * @param abstractContext   the processing context (not used here)
     * @param checkPointMessage the checkpoint request to reply to
     */
    public void checkpoint(IMessage iMessage, AbstractContext abstractContext, CheckPointMessage checkPointMessage) {
        boolean flushRequested = iMessage.getHeader().isNeedFlush();
        if (flushRequested) {
            flush(iMessage.getHeader().getCheckpointQueueIds());
            window.getSqlCache().flush(iMessage.getHeader().getCheckpointQueueIds());
        }

        CheckPointState state = new CheckPointState();
        state.setQueueIdAndOffset(shuffleCache.getFinishedQueueIdAndOffsets(checkPointMessage));
        checkPointMessage.reply(state);
    }

    /**
     * Dispatches a system message to its handler and then invokes {@code afterFlushCallback}.
     *
     * <p>Supported types are checkpoint, new-split, remove-split and batch-finish messages.
     *
     * @param iMessage        the message whose system message is dispatched
     * @param abstractContext the processing context, passed to the handler
     * @throws RuntimeException if the system message is missing or of an unsupported type
     */
    protected void doSystemMessage(IMessage iMessage, AbstractContext abstractContext) {
        ISystemMessage systemMessage = iMessage.getSystemMessage();
        if (systemMessage instanceof CheckPointMessage) {
            checkpoint(iMessage, abstractContext, (CheckPointMessage) systemMessage);
        } else if (systemMessage instanceof NewSplitMessage) {
            addNewSplit(iMessage, abstractContext, (NewSplitMessage) systemMessage);
        } else if (systemMessage instanceof RemoveSplitMessage) {
            removeSplit(iMessage, abstractContext, (RemoveSplitMessage) systemMessage);
        } else if (systemMessage instanceof BatchFinishMessage) {
            LOG.info("start fire window by fininsh flag " + iMessage.getHeader().getQueueId());
            batchMessageFinish(iMessage, abstractContext, (BatchFinishMessage) systemMessage);
        } else {
            // A null system message fails every instanceof test; describe it instead of
            // dereferencing it, so the caller sees the intended error rather than an NPE.
            String typeName = systemMessage == null ? "null" : systemMessage.getClass().getName();
            throw new RuntimeException("can not support this system message " + typeName);
        }
        afterFlushCallback(iMessage, abstractContext);
    }

    /**
     * Tells whether a shuffled message was already processed, judged by its origin offset.
     *
     * <p>The recorded maximum offset is looked up in {@code split2MaxOffsets} under the key built
     * from the window's configure name, the shuffle queue id and the message's origin queue id.
     * The message counts as repeated when such an offset exists and the message's origin offset is
     * not greater than it.
     *
     * <p>A message without the {@code ORIGIN_QUEUE_IS_LONG} flag is treated as having a non-long
     * offset instead of failing with a {@link NullPointerException}.
     *
     * @param iMessage the unpacked shuffle message
     * @param str      the shuffle queue id the message arrived on
     * @return {@code true} if the message is a repeat and must be discarded
     */
    protected boolean isRepeateMessage(IMessage iMessage, String str) {
        JSONObject body = iMessage.getMessageBody();
        // getBooleanValue yields false for an absent flag; getBoolean(...) would unbox null.
        boolean originOffsetIsLong = body.getBooleanValue(WindowCache.ORIGIN_QUEUE_IS_LONG);
        String originQueueId = body.getString(WindowCache.ORIGIN_QUEUE_ID);
        String originOffset = body.getString(WindowCache.ORIGIN_OFFSET);

        String offsetKey = MapKeyUtil.createKey(new String[]{this.window.getConfigureName(), str, originQueueId});
        String recordedMaxOffset = this.split2MaxOffsets.get(offsetKey);
        if (recordedMaxOffset == null || new MessageOffset(originOffset, originOffsetIsLong).greateThan(recordedMaxOffset)) {
            return false;
        }
        LOG.warn("the message offset is old, the message is discard ");
        return true;
    }

    /**
     * @return the channel configuration map populated in the constructor
     */
    @Override
    protected Map<String, String> getChannelConfig() {
        return channelConfig;
    }

    /**
     * Fills in {@code groupName} and {@code tags} with the dynamic property value, each only when
     * its name is not contained in {@code set}.
     *
     * @param set        property names that must not be overwritten
     * @param properties the properties to complete
     */
    @Override
    protected void putDynamicPropertyValue(Set<String> set, Properties properties) {
        boolean groupNamePresent = set.contains("groupName");
        if (!groupNamePresent) {
            properties.put("groupName", getDynamicPropertyValue());
        }
        boolean tagsPresent = set.contains("tags");
        if (!tagsPresent) {
            properties.put("tags", getDynamicPropertyValue());
        }
    }

    /**
     * Builds the shuffle topic name {@code shuffle_<str>_<namespace>_<pipeline name>}, with dots
     * in the namespace and dots and semicolons in the pipeline name replaced by underscores.
     *
     * @param str           the middle segment of the topic name, used verbatim
     * @param chainPipeline the pipeline supplying the source namespace and the configure name
     * @return the topic name
     */
    @Override
    protected String createShuffleTopic(String str, ChainPipeline chainPipeline) {
        String nameSpacePart = chainPipeline.getSource().getNameSpace().replaceAll("\\.", "_");
        String pipelinePart = chainPipeline.getConfigureName().replaceAll("\\.", "_").replaceAll(";", "_");
        return "shuffle_" + str + "_" + nameSpacePart + "_" + pipelinePart;
    }

    /**
     * @param chainPipeline ignored by this implementation
     * @return the dynamic property value, used as the shuffle channel name
     */
    @Override
    protected String createShuffleChannelName(ChainPipeline chainPipeline) {
        String channelName = getDynamicPropertyValue();
        return channelName;
    }

    /**
     * @param chainPipeline the pipeline whose source namespace is reused
     * @return the namespace of the pipeline's source
     */
    @Override
    protected String createShuffleChannelNameSpace(ChainPipeline chainPipeline) {
        String sourceNameSpace = chainPipeline.getSource().getNameSpace();
        return sourceNameSpace;
    }

    /**
     * @return the window's configure name with the suffix {@code _shuffle}
     */
    public String getConfigureName() {
        String windowName = window.getConfigureName();
        return windowName + "_shuffle";
    }

    /**
     * @return the namespace of the window this channel belongs to
     */
    public String getNameSpace() {
        return window.getNameSpace();
    }

    /**
     * @return the fixed type name {@code "pipeline"}
     */
    public String getType() {
        final String type = "pipeline";
        return type;
    }

    /**
     * Returns the split at the given position of the queue list.
     *
     * @param num zero-based index into the queue list; must not be {@code null}
     * @return the split stored at that index
     */
    public ISplit getSplit(Integer num) {
        int index = num.intValue();
        return queueList.get(index);
    }

    /**
     * Wraps a batch of messages into a single shuffle message addressed to the given split.
     *
     * <p>The result carries the split's queue id, the batch itself and this window's owner tag.
     * As a best effort, the origin trace ids of the batch are also attached under
     * {@code SHUFFLE_TRACE_ID} and a trace record is written; a failure in that step is logged and
     * does not prevent the message from being returned.
     *
     * @param jSONArray the messages to pack
     * @param iSplit    the target split
     * @return the shuffle message body
     */
    public JSONObject createMsg(JSONArray jSONArray, ISplit iSplit) {
        JSONObject shuffleMsg = new JSONObject();
        shuffleMsg.put(SHUFFLE_QUEUE_ID, iSplit.getQueueId());
        shuffleMsg.put(SHUFFLE_MESSAGES, jSONArray);
        shuffleMsg.put(this.MSG_OWNER, getDynamicPropertyValue());
        try {
            List<String> traceIds = new ArrayList<>();
            List<String> shuffleKeys = new ArrayList<>();
            for (int i = 0; i < jSONArray.size(); i++) {
                JSONObject element = jSONArray.getJSONObject(i);
                shuffleKeys.add(element.getString(WindowCache.SHUFFLE_KEY));
                traceIds.add(element.getString(WindowCache.ORIGIN_MESSAGE_TRACE_ID));
            }
            // Single-argument join treats the list as one element, i.e. yields its toString() form.
            String traceInfo = StringUtils.join(new List[]{traceIds});
            String shuffleKeyInfo = StringUtils.join(new List[]{shuffleKeys});
            shuffleMsg.put(SHUFFLE_TRACE_ID, traceInfo);
            TraceUtil.debug(traceInfo, new String[]{"origin message out", iSplit.getQueueId(), shuffleKeyInfo, getConfigureName()});
        } catch (Exception e) {
            // Tracing is optional: keep the message usable, but do not hide the failure.
            LOG.warn("failed to attach trace info to shuffle message", e);
        }
        return shuffleMsg;
    }

    /**
     * @param jSONObject a shuffle message body
     * @return the array stored under {@code SHUFFLE_MESSAGES}
     */
    public JSONArray getMsgs(JSONObject jSONObject) {
        JSONArray packedMessages = jSONObject.getJSONArray(SHUFFLE_MESSAGES);
        return packedMessages;
    }

    /**
     * Picks the split a key is routed to, using {@link #hash(Object)} as the index.
     *
     * @param str the routing key
     * @return the split selected for the key
     */
    public ISplit getChannelQueue(String str) {
        int index = hash(str);
        return queueList.get(index);
    }

    /**
     * Maps a key to an index into the queue list.
     *
     * <p>The index is the magnitude of the key's hash code modulo the number of splits; a
     * {@code null} key maps to 0. Taking the absolute value after the modulo keeps the result for
     * every hash code the same as negating a negative hash first, but also covers
     * {@code Integer.MIN_VALUE}, whose negation is still negative and used to produce a negative
     * index.
     *
     * @param obj the key to route, may be {@code null}
     * @return an index in {@code [0, queueList.size())}
     * @throws ArithmeticException if the queue list is empty
     */
    public int hash(Object obj) {
        int size = this.queueList.size();
        int hashCode = obj == null ? 0 : obj.hashCode();
        // |hashCode % size| < size, so Math.abs can never overflow here.
        return Math.abs(hashCode % size);
    }

    /**
     * Flushes the shuffle cache for the given queue ids.
     *
     * @param set the queue ids to flush
     */
    public void flush(Set<String> set) {
        shuffleCache.flush(set);
    }

    /**
     * Hook called at the end of {@code doSystemMessage}, after the system message was handled.
     * Does nothing in this class.
     *
     * @param iMessage        the message that carried the system message
     * @param abstractContext the processing context
     */
    protected void afterFlushCallback(IMessage iMessage, AbstractContext abstractContext) {
    }

    /**
     * Empty hook; it is not invoked anywhere in this class.
     * NOTE(review): presumably meant for subclasses to act before a message is added to the
     * shuffle cache - confirm against callers outside this file.
     *
     * @param iMessage  first message argument (role not evident from this class)
     * @param iMessage2 second message argument (role not evident from this class)
     */
    protected void beforeBatchAdd(IMessage iMessage, IMessage iMessage2) {
    }

    /**
     * Decides whether a shuffle message must be dropped because it was not produced for this
     * window, i.e. its owner tag is missing or differs from the dynamic property value.
     *
     * @param iMessage the received shuffle message
     * @return {@code true} if the message is not owned by this window
     */
    protected boolean filterNotOwnerMessage(IMessage iMessage) {
        String owner = iMessage.getMessageBody().getString(MSG_OWNER);
        if (owner == null) {
            return true;
        }
        boolean ownedByThisWindow = owner.equals(getDynamicPropertyValue());
        return !ownedByThisWindow;
    }

    /**
     * Builds the value identifying this window: a key made of the window's namespace, configure
     * name and update flag, with dots and semicolons replaced by underscores.
     *
     * @return the sanitized key
     */
    @Override
    protected String getDynamicPropertyValue() {
        String[] keyParts = new String[]{window.getNameSpace(), window.getConfigureName(), window.getUpdateFlag() + ""};
        String rawKey = MapKeyUtil.createKey(keyParts);
        return rawKey.replaceAll("\\.", "_").replaceAll(";", "_");
    }

    /**
     * @param abstractSupportShuffleSink the sink whose split number is consulted
     * @return the sink's split number when it is positive, otherwise 32
     */
    @Override
    protected int getShuffleSplitCount(AbstractSupportShuffleSink abstractSupportShuffleSink) {
        int configuredSplits = abstractSupportShuffleSink.getSplitNum();
        return configuredSplits > 0 ? configuredSplits : 32;
    }

    /**
     * @return the split ids recorded by the last new-split or remove-split message handled, or
     *         {@code null} if none has been handled yet
     */
    public Set<String> getCurrentQueueIds() {
        return currentQueueIds;
    }

    /**
     * @return the splits of the shuffle channel as currently held by this object
     */
    public List<ISplit> getQueueList() {
        return queueList;
    }

    /**
     * @return the window this channel shuffles messages for
     */
    public AbstractShuffleWindow getWindow() {
        return window;
    }

    /**
     * Handles a batch-finish system message; does nothing unless the window supports it.
     *
     * <p>For the queue the message arrived on, the shuffle cache and the SQL cache are flushed and
     * the window instances are fired. A copy of the batch-finish payload is then passed to the
     * window's fire receiver.
     *
     * @param iMessage           the message carrying the system message
     * @param abstractContext    the processing context, forwarded to the fire receiver
     * @param batchFinishMessage the batch-finish system message
     */
    public void batchMessageFinish(IMessage iMessage, AbstractContext abstractContext, BatchFinishMessage batchFinishMessage) {
        if (!window.supportBatchMsgFinish()) {
            return;
        }
        shuffleCache.flush(new String[]{iMessage.getHeader().getQueueId()});

        HashSet finishedQueueIds = new HashSet();
        finishedQueueIds.add(iMessage.getHeader().getQueueId());
        window.getSqlCache().flush(finishedQueueIds);

        window.getWindowFireSource().fireWindowInstance(iMessage.getHeader().getQueueId());
        window.getFireReceiver().doMessage(batchFinishMessage.getMsg().copy(), abstractContext);
    }
}
