package org.apache.rocketmq.streams.window.shuffle;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.sqlcache.ISQLElement;
import org.apache.rocketmq.streams.window.sqlcache.impl.SplitSQLElement;

/* loaded from: input_file:org/apache/rocketmq/streams/window/shuffle/ShuffleCache.class */
/**
 * {@link WindowCache} specialization used on the receiving side of a shuffle.
 *
 * <p>Each flushed batch is grouped by (queue id, window instance id); every group is handed
 * to the owning {@link AbstractShuffleWindow} for calculation, after which the origin-queue
 * offsets carried by the group's messages are recorded through the window's max-value
 * manager and SQL cache.
 */
public class ShuffleCache extends WindowCache {
    protected AbstractShuffleWindow window;

    /**
     * @param abstractShuffleWindow the window that performs the shuffle calculation and owns
     *                              the SQL cache / max-value manager used by this cache
     */
    public ShuffleCache(AbstractShuffleWindow abstractShuffleWindow) {
        this.window = abstractShuffleWindow;
    }

    /**
     * Groups the batch by (queue id, window instance id) and processes the groups in the
     * natural ordering of their {@link Pair} keys. For every group the debug writer is
     * notified, {@code shuffleCalculate} is invoked on the window, and the split progress
     * is saved.
     *
     * @param list the messages of the batch being flushed
     * @return always {@code true}
     */
    @Override
    protected boolean batchInsert(List<IMessage> list) {
        Map<Pair<String, String>, List<IMessage>> groupedMessages = new HashMap<>();
        Map<String, WindowInstance> instancesById = new HashMap<>();
        groupByWindowInstanceAndQueueId(list, groupedMessages, instancesById);

        // Sort the keys so groups are processed in a deterministic order.
        List<Pair<String, String>> sortedKeys = new ArrayList<>(groupedMessages.keySet());
        Collections.sort(sortedKeys);

        for (Pair<String, String> key : sortedKeys) {
            String queueId = key.getLeft();
            String windowInstanceId = key.getRight();
            List<IMessage> messages = groupedMessages.get(key);
            WindowInstance windowInstance = instancesById.get(windowInstanceId);
            DebugWriter.getDebugWriter(this.window.getConfigureName())
                .writeShuffleReceive(this.window, messages, windowInstance);
            this.window.shuffleCalculate(messages, windowInstance, queueId);
            saveSplitProgress(queueId, messages);
        }
        return true;
    }

    /**
     * Collects the origin offset of every origin queue found in {@code list}, passes them to
     * the window's max-value manager and queues the resulting batch-replace SQL in the
     * window's SQL cache.
     *
     * <p>When several messages share an origin queue id, the offset of the last one in
     * iteration order is kept. The "offset is long" flag is taken from the last message;
     * a missing flag is treated as {@code false}.
     *
     * @param str  the queue id the messages were received on
     * @param list the messages whose origin offsets should be recorded
     */
    protected void saveSplitProgress(String str, List<IMessage> list) {
        Map<String, String> originOffsets = new HashMap<>();
        boolean offsetIsLong = false;
        for (IMessage iMessage : list) {
            // getBoolean yields a boxed Boolean; guard against null so the later use as a
            // primitive cannot throw a NullPointerException.
            offsetIsLong = Boolean.TRUE.equals(
                iMessage.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG));
            originOffsets.put(
                iMessage.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID),
                iMessage.getMessageBody().getString(WindowCache.ORIGIN_OFFSET));
        }
        String replaceSql = ORMUtil.createBatchReplacetSQL(new ArrayList<>(
            this.window.getWindowMaxValueManager()
                .saveMaxOffset(offsetIsLong, this.window.getConfigureName(), str, originOffsets)
                .values()));
        // NOTE(review): explicit cast retained from the original call site in case addCache
        // is overloaded — confirm and drop if unnecessary.
        this.window.getSqlCache().addCache((ISQLElement) new SplitSQLElement(str, replaceSql));
    }

    /**
     * This implementation does not produce a shuffle key.
     *
     * @param iMessage ignored
     * @return always {@code null}
     */
    @Override
    protected String generateShuffleKey(IMessage iMessage) {
        return null;
    }

    /**
     * Distributes messages into groups keyed by (queue id, window instance id).
     *
     * <p>For every message the header's queue id, offset and "offset is long" flag are
     * overwritten with the origin values stored in the message body; the queue id used in
     * the group key is the header's queue id read <em>before</em> that overwrite. A deep
     * copy of the message, tagged with {@code HIT_WINDOW_INSTANCE_ID}, is added to the group
     * of every window instance listed in the body under
     * {@code WindowInstance.class.getSimpleName()}.
     *
     * @param list the messages to group
     * @param map  output: (queue id, window instance id) to the message copies of that group
     * @param map2 output: window instance id to window instance
     */
    protected void groupByWindowInstanceAndQueueId(List<IMessage> list, Map<Pair<String, String>, List<IMessage>> map, Map<String, WindowInstance> map2) {
        for (IMessage iMessage : list) {
            // Capture the receiving queue id before the header is rewritten below.
            String queueId = iMessage.getHeader().getQueueId();
            String originQueueId = iMessage.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
            String originOffset = iMessage.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
            Boolean originOffsetIsLong = iMessage.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG);
            iMessage.getHeader().setQueueId(originQueueId);
            iMessage.getHeader().setOffset(originOffset);
            iMessage.getHeader().setOffsetIsLong(originOffsetIsLong);

            // The body stores the hit window instances as an untyped value.
            @SuppressWarnings("unchecked")
            List<WindowInstance> windowInstances =
                (List<WindowInstance>) iMessage.getMessageBody().get(WindowInstance.class.getSimpleName());
            for (WindowInstance windowInstance : windowInstances) {
                String windowInstanceId = windowInstance.createWindowInstanceId();
                List<IMessage> group =
                    map.computeIfAbsent(Pair.of(queueId, windowInstanceId), key -> new ArrayList<>());
                // Each group gets its own copy so the tag below does not leak across groups.
                IMessage copy = iMessage.deepCopy();
                copy.getMessageBody().put("HIT_WINDOW_INSTANCE_ID", windowInstanceId);
                group.add(copy);
                map2.put(windowInstanceId, windowInstance);
            }
        }
    }
}
