package org.apache.rocketmq.streams.window.operator.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.MessageOffset;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
import org.apache.rocketmq.streams.window.debug.DebugWriter;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
import org.apache.rocketmq.streams.window.sqlcache.ISQLElement;
import org.apache.rocketmq.streams.window.sqlcache.impl.FiredNotifySQLElement;
import org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.state.impl.WindowValue;
import org.apache.rocketmq.streams.window.storage.IWindowStorage;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
import org.apache.rocketmq.streams.window.storage.WindowStorage;

/* loaded from: input_file:org/apache/rocketmq/streams/window/operator/impl/WindowOperator.class */
public class WindowOperator extends AbstractShuffleWindow {
    /** Prefix returned by {@link #getOrderBypPrefix()}; it is prepended to a split id when secondary store keys are built. */
    private static final String ORDER_BY_SPLIT_NUM = "_order_by_split_num_";
    /** Counter adjusted in {@code shuffleCalculate} by the change of each window value's "total" column. */
    protected transient AtomicInteger shuffleCount = new AtomicInteger(0);
    /** Sum of the "total" column of every window value read in {@code fireWindowInstance}. */
    protected transient AtomicInteger fireCountAccumulator = new AtomicInteger(0);
    /** Not read or written in the visible part of this class. NOTE(review): purpose unknown from here - confirm against subclasses. */
    protected transient Map<String, Integer> shuffleWindowInstanceId2MsgCount = new HashMap();
    /** Incremented in {@code shuffleCalculate} each time a window value is created because storage returned none. Plain int, not atomic. */
    protected transient int windowvaluecount = 0;

    /* loaded from: input_file:org/apache/rocketmq/streams/window/operator/impl/WindowOperator$WindowRowOperator.class */
    /**
     * Row callback that converts a row into a {@link WindowValue} and writes it to the
     * window's local storage unless local storage already holds an equally new or newer version.
     */
    public static class WindowRowOperator implements IRowOperator {
        /** Window instance used when building store keys. */
        protected WindowInstance windowInstance;
        /** Split id used as the first component of the primary store key. */
        protected String spiltId;
        /** Window whose local storage is read and written. */
        protected AbstractWindow window;

        /**
         * @param windowInstance window instance the processed rows belong to
         * @param str            split id used for the primary store key
         * @param abstractWindow window that owns the target storage
         */
        public WindowRowOperator(WindowInstance windowInstance, String str, AbstractWindow abstractWindow) {
            this.windowInstance = windowInstance;
            this.spiltId = str;
            this.window = abstractWindow;
        }

        /**
         * Stores the row under both its primary key and its order-by key.
         *
         * <p>The value is written when local storage has no entry for the primary key, or when
         * the row's update version is strictly greater than the stored one. Otherwise nothing
         * is written.
         *
         * @param map row to convert with {@code ORMUtil.convert}
         */
        public synchronized void doProcess(Map<String, Object> map) {
            WindowValue windowValue = (WindowValue) ORMUtil.convert(map, WindowValue.class);
            String storeKey = WindowOperator.createStoreKey(this.spiltId, windowValue.getGroupBy(), this.windowInstance);
            String orderKey = WindowOperator.createStoreKey(
                WindowOperator.getOrderBypPrefix() + windowValue.getPartition(),
                MapKeyUtil.createKey(new String[]{WindowOperator.getOrderBypFieldName(windowValue), windowValue.getGroupBy()}),
                this.windowInstance);

            List<String> lookupKeys = new ArrayList<>(1);
            lookupKeys.add(storeKey);
            Map<String, ? extends WindowBaseValue> stored =
                this.window.getStorage().getLocalStorage().multiGet(WindowValue.class, lookupKeys);

            if (!CollectionUtil.isEmpty(stored)) {
                WindowBaseValue existing = stored.values().iterator().next();
                // A null stored value is treated like a missing one instead of failing with an NPE.
                if (existing != null && windowValue.getUpdateVersion() <= existing.getUpdateVersion()) {
                    return;
                }
            }

            // Single write path: both keys always point at the same value.
            Map<String, WindowBaseValue> toStore = new HashMap<>(4);
            toStore.put(storeKey, windowValue);
            toStore.put(orderKey, windowValue);
            this.window.getStorage().getLocalStorage().multiPut(toStore);
        }
    }

    /** Creates an operator without assigning any window settings in this constructor. */
    public WindowOperator() {
    }

    /**
     * Creates an operator with a time field and a window size.
     *
     * @param str name assigned to {@code timeFieldName}
     * @param i   value assigned to {@code sizeInterval}
     */
    @Deprecated
    public WindowOperator(String str, int i) {
        this.sizeInterval = i;
        this.timeFieldName = str;
    }

    /**
     * Creates an operator with a time field and a window size; the third argument is ignored.
     *
     * @param str  name assigned to {@code timeFieldName}
     * @param i    value assigned to {@code sizeInterval}
     * @param str2 unused
     */
    @Deprecated
    public WindowOperator(String str, int i, String str2) {
        this(str, i);
    }

    /**
     * Creates an operator whose size and slide intervals are both {@code i}.
     *
     * @param i   value assigned to both {@code sizeInterval} and {@code slideInterval}
     * @param str value assigned to {@code groupByFieldName}
     * @param map select map handed to {@code setSelectMap}
     */
    public WindowOperator(int i, String str, Map<String, String> map) {
        this.groupByFieldName = str;
        this.slideInterval = i;
        this.sizeInterval = i;
        setSelectMap(map);
    }

    /**
     * Reads every stored value of one split of a window instance, sends them downstream in
     * batches, then clears the fired instance and queues a fired notification.
     *
     * @param windowInstance window instance being fired
     * @param str            split (queue) id whose values are loaded
     * @param map            optional map looked up by {@code str}; a non-empty entry is parsed
     *                       as a long and passed to the iterator's {@code setPartitionNum}. May be null.
     * @return number of window values passed to {@code sendFireMessage}
     * @throws NumberFormatException if the entry found in {@code map} is not a valid long
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow
    public int fireWindowInstance(WindowInstance windowInstance, String str, Map<String, String> map) {
        WindowStorage.WindowBaseValueIterator splitData = this.storage.loadWindowInstanceSplitData(getOrderBypPrefix(), str, windowInstance.createWindowInstanceId(), null, getWindowBaseValueClass());
        if (map != null) {
            String partitionNum = map.get(str);
            if (StringUtil.isNotEmpty(partitionNum)) {
                splitData.setPartitionNum(Long.parseLong(partitionNum));
            }
        }

        int firedCount = 0;
        List<WindowValue> batch = new ArrayList<>();
        while (splitData.hasNext()) {
            WindowBaseValue next = (WindowBaseValue) splitData.next();
            if (next == null) {
                continue;
            }
            WindowValue windowValue = (WindowValue) next;
            this.fireCountAccumulator.addAndGet(getValue(windowValue, "total").intValue());
            batch.add(windowValue);
            if (batch.size() >= this.windowCache.getBatchSize()) {
                sendFireMessage(batch, str);
                firedCount += batch.size();
                // Start a fresh list rather than clearing: the sent list may still be referenced downstream.
                batch = new ArrayList<>();
            }
        }
        // Flush the final, partially filled batch.
        if (!batch.isEmpty()) {
            sendFireMessage(batch, str);
            firedCount += batch.size();
        }

        clearFire(windowInstance);
        this.sqlCache.addCache((ISQLElement) new FiredNotifySQLElement(str, windowInstance.createWindowInstanceId()));
        return firedCount;
    }

    /**
     * Applies a batch of shuffled messages to the window values of one window instance and
     * persists the result through {@code saveStorage}.
     *
     * <p>Messages are bucketed by {@code groupByGroupName}. For each group, in the order that
     * method reports, the stored window value is reused or a new one is created, its update
     * version is incremented, and every message of the group is folded in.
     *
     * @param list           messages to aggregate
     * @param windowInstance window instance the messages belong to
     * @param str            split (queue) id used in store keys
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow
    public void shuffleCalculate(List<IMessage> list, WindowInstance windowInstance, String str) {
        DebugWriter.getDebugWriter(getConfigureName()).writeShuffleCalcultateReceveMessage(windowInstance, list, str);

        List<String> orderedGroupNames = new ArrayList<>();
        Map<String, List<IMessage>> messagesByGroup = groupByGroupName(list, orderedGroupNames);

        List<String> storeKeys = new ArrayList<>();
        for (String groupName : messagesByGroup.keySet()) {
            storeKeys.add(createStoreKey(str, groupName, windowInstance));
        }

        Map<String, WindowBaseValue> touchedValues = new HashMap<>();
        Map<String, WindowBaseValue> storedValues = this.storage.multiGet(getWindowBaseValueClass(), storeKeys, windowInstance.createWindowInstanceId(), str);

        for (String groupName : orderedGroupNames) {
            List<IMessage> groupMessages = messagesByGroup.get(groupName);
            String storeKey = createStoreKey(str, groupName, windowInstance);
            WindowValue windowValue = (WindowValue) storedValues.get(storeKey);
            if (windowValue == null) {
                this.windowvaluecount++;
                windowValue = createWindowValue(str, groupName, windowInstance);
            }
            touchedValues.put(storeKey, windowValue);
            windowValue.incrementUpdateVersion();

            Integer totalBefore = getValue(windowValue, "total");
            if (groupMessages != null) {
                for (IMessage message : groupMessages) {
                    calculateWindowValue(windowValue, message);
                }
            }
            Integer totalAfter = getValue(windowValue, "total");

            // Net effect: shuffleCount grows by the change of the "total" column.
            this.shuffleCount.addAndGet(-totalBefore.intValue());
            this.shuffleCount.addAndGet(totalAfter.intValue());
        }

        if (DebugWriter.getDebugWriter(getConfigureName()).isOpenDebug()) {
            DebugWriter.getDebugWriter(getConfigureName()).writeWindowCalculate(this, new ArrayList(touchedValues.values()), str);
        }
        saveStorage(touchedValues, windowInstance, str);
    }

    /**
     * Reads a computed column of a window value as an integer.
     *
     * @param windowValue value whose computed column is read
     * @param str         name of the computed column
     * @return 0 when the column is absent; otherwise the column converted to an integer.
     *         Non-Integer numbers are narrowed with {@link Number#intValue()}.
     * @throws NumberFormatException if the column is a String that is not a valid integer
     * @throws ClassCastException    if the column is neither a Number nor a String
     */
    private Integer getValue(WindowValue windowValue, String str) {
        Object columnValue = windowValue.getComputedColumnResultByKey(str);
        if (columnValue == null) {
            return 0;
        }
        if (columnValue instanceof Integer) {
            return (Integer) columnValue;
        }
        // Accept any numeric result (e.g. Long) instead of rejecting it as "not a number".
        if (columnValue instanceof Number) {
            return ((Number) columnValue).intValue();
        }
        if (columnValue instanceof String) {
            return Integer.valueOf((String) columnValue);
        }
        throw new ClassCastException("value:[" + columnValue + "] of fieldName:[" + str + "] can not change to number.");
    }

    /**
     * Persists the given window values, then writes each of them to local storage again under
     * a secondary key built from the order-by prefix, the partition number and the group-by value.
     *
     * @param map            window values keyed by their primary store key
     * @param windowInstance window instance the values belong to
     * @param str            split (queue) id
     */
    protected void saveStorage(Map<String, WindowBaseValue> map, WindowInstance windowInstance, String str) {
        this.storage.multiPut(map, windowInstance.createWindowInstanceId(), str, this.sqlCache);

        final String orderedSplitId = getOrderBypPrefix() + str;
        Map<String, WindowBaseValue> valuesByOrderKey = new HashMap<>();
        for (WindowBaseValue baseValue : map.values()) {
            WindowValue windowValue = (WindowValue) baseValue;
            String orderField = MapKeyUtil.createKey(new String[]{getOrderBypFieldName(windowValue), windowValue.getGroupBy()});
            valuesByOrderKey.put(createStoreKey(orderedSplitId, orderField, windowInstance), windowValue);
        }
        this.storage.getLocalStorage().multiPut(valuesByOrderKey);
    }

    /**
     * Returns the value class this operator stores and loads.
     *
     * @return always {@code WindowValue.class}
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public Class getWindowBaseValueClass() {
        return WindowValue.class;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Buckets messages by their shuffle key and reports the keys in offset order.
     *
     * <p>For every key one {@link MessageOffset} is remembered: the first one seen, replaced
     * whenever {@code greateThan} reports that the remembered offset is greater than the
     * offset of the current message. Keys are appended to {@code list2} sorted by that
     * remembered offset, using {@code equals} and {@code greateThan} for the comparison.
     * Messages with an empty shuffle key are grouped under {@code "<null>"}.
     *
     * @param list  messages to group; null or empty yields an empty map and leaves {@code list2} untouched
     * @param list2 output list that receives the sorted group keys
     * @return messages per shuffle key, in arrival order within each key
     */
    public Map<String, List<IMessage>> groupByGroupName(List<IMessage> list, List<String> list2) {
        if (list == null || list.isEmpty()) {
            return new HashMap<>();
        }

        Map<String, List<IMessage>> messagesByGroup = new HashMap<>();
        Map<String, MessageOffset> offsetByGroup = new HashMap<>();
        for (IMessage message : list) {
            String shuffleKey = generateShuffleKey(message);
            if (StringUtil.isEmpty(shuffleKey)) {
                shuffleKey = "<null>";
            }
            List<IMessage> bucket = messagesByGroup.computeIfAbsent(shuffleKey, key -> new ArrayList<>());

            MessageOffset remembered = offsetByGroup.get(shuffleKey);
            if (remembered == null || remembered.greateThan(message.getHeader().getOffset())) {
                remembered = message.getHeader().getMessageOffset();
            }
            offsetByGroup.put(shuffleKey, remembered);
            bucket.add(message);
        }

        List<Map.Entry<String, MessageOffset>> orderedGroups = new ArrayList<>(offsetByGroup.entrySet());
        orderedGroups.sort((left, right) -> {
            MessageOffset leftOffset = left.getValue();
            MessageOffset rightOffset = right.getValue();
            if (leftOffset.equals(rightOffset)) {
                return 0;
            }
            return leftOffset.greateThan(rightOffset.getOffsetStr()) ? 1 : -1;
        });
        for (Map.Entry<String, MessageOffset> group : orderedGroups) {
            list2.add(group.getKey());
        }
        return messagesByGroup;
    }

    /**
     * Asks the storage for the maximum split number recorded for a window instance.
     *
     * @param windowInstance window instance to query
     * @return whatever {@code storage.getMaxSplitNum} returns for this operator's value class
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    protected Long queryWindowInstanceMaxSplitNum(WindowInstance windowInstance) {
        return this.storage.getMaxSplitNum(windowInstance, getWindowBaseValueClass());
    }

    /**
     * Reports batch-message-finish support.
     *
     * @return always {@code true}
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public boolean supportBatchMsgFinish() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Folds one message into a window value by delegating to {@code WindowValue.calculate}.
     *
     * @param windowValue value to update
     * @param iMessage    message to apply
     */
    public void calculateWindowValue(WindowValue windowValue, IMessage iMessage) {
        windowValue.calculate(this, iMessage);
    }

    /**
     * Builds a fresh window value for one group of a window instance.
     *
     * <p>Times are copied from the window instance, the message key is an MD5 of split id,
     * window instance id and group name, the partition is the id of the shuffle queue selected
     * for the group name, and the partition number comes from {@code createPartitionNum}.
     *
     * @param str            split (queue) id
     * @param str2           group-by value; null is stored as an empty group-by string
     * @param windowInstance window instance the value belongs to
     * @return the initialised window value
     */
    protected WindowValue createWindowValue(String str, String str2, WindowInstance windowInstance) {
        WindowValue created = new WindowValue();
        created.setStartTime(windowInstance.getStartTime());
        created.setEndTime(windowInstance.getEndTime());
        created.setFireTime(windowInstance.getFireTime());

        String groupBy = "";
        if (str2 != null) {
            groupBy = str2;
        }
        created.setGroupBy(groupBy);

        String rawMsgKey = MapKeyUtil.createKey(new String[]{str, windowInstance.createWindowInstanceId(), str2});
        created.setMsgKey(StringUtil.createMD5Str(rawMsgKey));

        String shuffleQueueId = this.shuffleChannel.getChannelQueue(str2).getQueueId();
        // createPartitionNum receives the value built so far, so it stays after the setters above.
        long partitionNum = createPartitionNum(created, str, windowInstance);
        created.setPartitionNum(partitionNum);
        created.setPartition(shuffleQueueId);
        created.setWindowInstancePartitionId(windowInstance.getWindowInstanceKey());
        created.setWindowInstanceId(windowInstance.getWindowInstanceKey());
        return created;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Produces the partition number for a new window value.
     *
     * @param windowValue    value being created; not read by this implementation
     * @param str            split (queue) id
     * @param windowInstance window instance the value belongs to
     * @return result of {@code incrementAndGetSplitNumber(windowInstance, str)}
     */
    public long createPartitionNum(WindowValue windowValue, String str, WindowInstance windowInstance) {
        return incrementAndGetSplitNumber(windowInstance, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a store key from three components, in this order: {@code str}, the window
     * instance id, {@code str2}.
     *
     * @param str            first key component (callers pass a split id, optionally prefixed)
     * @param str2           last key component (callers pass a group-by or order-by value)
     * @param windowInstance window instance whose id becomes the middle component
     * @return key produced by {@code MapKeyUtil.createKey}
     */
    public static String createStoreKey(String str, String str2, WindowInstance windowInstance) {
        final String[] keyParts = {str, windowInstance.createWindowInstanceId(), str2};
        return MapKeyUtil.createKey(keyParts);
    }

    /**
     * Returns the prefix used for secondary (order-by) store keys.
     *
     * @return the constant {@code "_order_by_split_num_"}
     */
    protected static String getOrderBypPrefix() {
        return ORDER_BY_SPLIT_NUM;
    }

    /**
     * Returns the order-by component of a secondary store key.
     *
     * @param windowValue value whose partition number is used
     * @return the partition number rendered as a string
     */
    protected static String getOrderBypFieldName(WindowValue windowValue) {
        return String.valueOf(windowValue.getPartitionNum());
    }

    /**
     * Releases everything held for a fired window instance, if the instance allows it.
     *
     * <p>When {@code isCanClearResource()} is false nothing happens. Otherwise the instance is
     * logged out, its split number and shuffle partition bookkeeping are removed, and its
     * values are deleted from storage under both the plain and the order-by-prefixed split id.
     * Unless the operator is local-storage-only, {@code WindowInstance.clearInstance} is called last.
     *
     * @param windowInstance window instance to clean up
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public void clearFireWindowInstance(WindowInstance windowInstance) {
        final String orderedSplitId = getOrderBypPrefix() + windowInstance.getSplitId();
        if (!windowInstance.isCanClearResource()) {
            return;
        }

        logoutWindowInstance(windowInstance.createWindowInstanceTriggerId());
        this.windowMaxValueManager.deleteSplitNum(windowInstance, windowInstance.getSplitId());
        ShufflePartitionManager.getInstance().clearWindowInstance(windowInstance.createWindowInstanceId());
        this.storage.delete(windowInstance.createWindowInstanceId(), windowInstance.getSplitId(), getWindowBaseValueClass(), this.sqlCache);
        this.storage.getLocalStorage().delete(windowInstance.createWindowInstanceId(), orderedSplitId, getWindowBaseValueClass());

        if (!this.isLocalStorageOnly) {
            WindowInstance.clearInstance(windowInstance, this.sqlCache);
        }
    }

    /**
     * Clears cached state for one split: the storage cache of its shuffle queue, the storage
     * cache of the matching order-by queue, and the split's entry in {@code ShufflePartitionManager}.
     *
     * @param str split (queue) id to clear
     */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow
    public void clearCache(String str) {
        ISplit shuffleQueue = this.shuffleChannel.getChannelQueue(str);
        getStorage().clearCache(shuffleQueue, getWindowBaseValueClass());

        ISplit orderByQueue = getOrderByQueue(str, getOrderBypPrefix());
        getStorage().clearCache(orderByQueue, getWindowBaseValueClass());

        ShufflePartitionManager.getInstance().clearSplit(str);
    }

    /**
     * Wraps the shuffle queue selected by hashing {@code str} in a split whose queue id carries
     * the given prefix; every other operation is forwarded to the wrapped queue.
     *
     * @param str  value hashed by the shuffle channel to pick the queue
     * @param str2 prefix prepended to the wrapped queue's id
     * @return a delegating split with the prefixed queue id
     */
    public ISplit getOrderByQueue(String str, final String str2) {
        final int queueIndex = this.shuffleChannel.hash(str);
        final ISplit delegate = this.shuffleChannel.getQueueList().get(queueIndex);
        return new ISplit() { // from class: org.apache.rocketmq.streams.window.operator.impl.WindowOperator.1
            public String getQueueId() {
                final String wrappedId = delegate.getQueueId();
                return str2 + wrappedId;
            }

            public Object getQueue() {
                return delegate.getQueue();
            }

            public int compareTo(Object other) {
                return delegate.compareTo(other);
            }

            public String toJson() {
                return delegate.toJson();
            }

            public void toObject(String json) {
                delegate.toObject(json);
            }
        };
    }

    /**
     * Writes window values to a storage, skipping every value for which the storage already
     * holds an equally new or newer update version.
     *
     * <p>Each value is written under two keys: a primary key (partition, window instance id,
     * group-by) and a secondary key (partition, window instance id, partition number). When a
     * value is skipped as stale, both of its keys are skipped, so the two entries never
     * disagree. The whole operation runs while holding the storage's monitor.
     *
     * <p>NOTE(review): the secondary key here is built without the order-by prefix that
     * {@code saveStorage} uses - confirm whether that difference is intended.
     *
     * @param windowInstance window instance the values belong to
     * @param iWindowStorage target storage; null makes this a no-op
     * @param list           values to write; null makes this a no-op
     */
    public static void compareAndSet(WindowInstance windowInstance, IWindowStorage iWindowStorage, List<WindowValue> list) {
        if (list == null || iWindowStorage == null) {
            return;
        }
        synchronized (iWindowStorage) {
            List<String> storeKeys = new ArrayList<>(list.size());
            Map<String, WindowValue> pending = new HashMap<>();
            Map<String, String> orderKeyByStoreKey = new HashMap<>();
            for (WindowValue windowValue : list) {
                String storeKey = createStoreKey(windowValue.getPartition(), windowValue.getGroupBy(), windowInstance);
                String orderKey = createStoreKey(windowValue.getPartition(), windowValue.getPartitionNum() + "", windowInstance);
                storeKeys.add(storeKey);
                pending.put(storeKey, windowValue);
                pending.put(orderKey, windowValue);
                orderKeyByStoreKey.put(storeKey, orderKey);
            }

            Map<String, WindowBaseValue> stored = iWindowStorage.multiGet(WindowValue.class, storeKeys);
            if (stored == null || stored.isEmpty()) {
                iWindowStorage.multiPut(pending);
                return;
            }

            for (Map.Entry<String, WindowBaseValue> entry : stored.entrySet()) {
                String storeKey = entry.getKey();
                WindowBaseValue existing = entry.getValue();
                WindowValue candidate = pending.get(storeKey);
                if (candidate == null || existing == null) {
                    // Nothing to compare against; leave the pending entries untouched.
                    continue;
                }
                if (candidate.getUpdateVersion() <= existing.getUpdateVersion()) {
                    pending.remove(storeKey);
                    // Drop the secondary entry too, but only if it still refers to this stale value.
                    String orderKey = orderKeyByStoreKey.get(storeKey);
                    if (orderKey != null && pending.get(orderKey) == candidate) {
                        pending.remove(orderKey);
                    }
                }
            }
            if (CollectionUtil.isNotEmpty(pending)) {
                iWindowStorage.multiPut(pending);
            }
        }
    }
}
