package org.apache.rocketmq.streams.window.operator.join;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.component.ComponentCreator;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.context.MessageHeader;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.dim.model.AbstractDim;
import org.apache.rocketmq.streams.window.model.WindowCache;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow;
import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel;
import org.apache.rocketmq.streams.window.state.WindowBaseValue;
import org.apache.rocketmq.streams.window.state.impl.JoinLeftState;
import org.apache.rocketmq.streams.window.state.impl.JoinRightState;
import org.apache.rocketmq.streams.window.state.impl.JoinState;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;

/* loaded from: input_file:org/apache/rocketmq/streams/window/operator/join/JoinWindow.class */
public class JoinWindow extends AbstractShuffleWindow {
    public static final String JOIN_KEY = "JOIN_KEY";
    public static final String LABEL_LEFT = "left";
    public static final String LABEL_RIGHT = "right";
    protected List<String> leftJoinFieldNames;
    protected List<String> rightJoinFieldNames;
    protected String rightAsName;
    protected String joinType;
    protected String expression;
    protected int retainWindowCount = 4;
    protected transient DBOperator joinOperator = new DBOperator();
    protected transient AtomicInteger count = new AtomicInteger(0);

    @Override // org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow
    protected int fireWindowInstance(WindowInstance windowInstance, String str, Map<String, String> map) {
        clearFire(windowInstance);
        return 0;
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow
    public void clearCache(String str) {
        getStorage().clearCache(this.shuffleChannel.getChannelQueue(str), getWindowBaseValueClass());
        ShufflePartitionManager.getInstance().clearSplit(str);
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow
    public void shuffleCalculate(List<IMessage> list, WindowInstance windowInstance, String str) {
        for (IMessage iMessage : list) {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            MessageHeader messageHeader = (MessageHeader) JSONObject.parseObject(iMessage.getMessageBody().getString(WindowCache.ORIGIN_MESSAGE_HEADER), MessageHeader.class);
            iMessage.setHeader(messageHeader);
            String msgRouteFromLable = messageHeader.getMsgRouteFromLable();
            String createStoreKey = createStoreKey(iMessage, msgRouteFromLable, windowInstance);
            JoinState createJoinState = createJoinState(iMessage, windowInstance, msgRouteFromLable);
            if (LABEL_LEFT.equalsIgnoreCase(msgRouteFromLable)) {
                hashMap.put(createStoreKey, createJoinState);
            } else if (LABEL_RIGHT.equalsIgnoreCase(msgRouteFromLable)) {
                hashMap2.put(createStoreKey, createJoinState);
            }
            if (hashMap.size() > 0) {
                this.storage.multiPut(hashMap);
            }
            if (hashMap2.size() > 0) {
                this.storage.multiPut(hashMap2);
            }
            String msgRouteFromLable2 = iMessage.getHeader().getMsgRouteFromLable();
            Iterator<WindowBaseValue> it = null;
            if (LABEL_LEFT.equalsIgnoreCase(msgRouteFromLable2)) {
                it = getMessageIterator(str, windowInstance, iMessage, createStoreKeyPrefix(iMessage, LABEL_RIGHT, windowInstance), JoinRightState.class);
            } else if (LABEL_RIGHT.equalsIgnoreCase(msgRouteFromLable2)) {
                it = getMessageIterator(str, windowInstance, iMessage, createStoreKeyPrefix(iMessage, LABEL_LEFT, windowInstance), JoinLeftState.class);
            }
            ArrayList arrayList = new ArrayList();
            int i = 0;
            while (it.hasNext()) {
                WindowBaseValue next = it.next();
                if (next != null) {
                    arrayList.add(next);
                    i++;
                    if (i == 100) {
                        sendMessage(iMessage, arrayList);
                        arrayList.clear();
                        i = 0;
                    }
                }
            }
            sendMessage(iMessage, arrayList);
        }
    }

    private Iterator<WindowBaseValue> getMessageIterator(String str, WindowInstance windowInstance, IMessage iMessage, final String str2, final Class<? extends WindowBaseValue> cls) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, WindowInstance> entry : this.windowInstanceMap.entrySet()) {
            if (str.equalsIgnoreCase(entry.getValue().getSplitId())) {
                arrayList.add(entry.getValue());
            }
        }
        final Iterator it = arrayList.iterator();
        return new Iterator<WindowBaseValue>() { // from class: org.apache.rocketmq.streams.window.operator.join.JoinWindow.1
            private Iterator<WindowBaseValue> iterator = null;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.iterator != null && this.iterator.hasNext()) {
                    return true;
                }
                if (!it.hasNext()) {
                    return false;
                }
                this.iterator = JoinWindow.this.storage.loadWindowInstanceSplitData(null, null, ((WindowInstance) it.next()).createWindowInstanceId(), str2, cls);
                return this.iterator != null && this.iterator.hasNext();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public WindowBaseValue next() {
                return this.iterator.next();
            }
        };
    }

    private Iterator<WindowBaseValue> getIterator(final String str, final String str2, WindowInstance windowInstance, final Class<? extends WindowBaseValue> cls) {
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<String, WindowInstance>> it = this.windowInstanceMap.entrySet().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getValue());
        }
        final Iterator it2 = arrayList.iterator();
        return new Iterator<WindowBaseValue>() { // from class: org.apache.rocketmq.streams.window.operator.join.JoinWindow.2
            private Iterator<WindowBaseValue> iterator = null;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.iterator != null && this.iterator.hasNext()) {
                    return true;
                }
                if (!it2.hasNext()) {
                    return false;
                }
                this.iterator = JoinWindow.this.storage.loadWindowInstanceSplitData(null, str, ((WindowInstance) it2.next()).createWindowInstanceId(), str2, cls);
                return this.iterator != null && this.iterator.hasNext();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public WindowBaseValue next() {
                return this.iterator.next();
            }
        };
    }

    public List<JSONObject> connectJoin(IMessage iMessage, List<Map<String, Object>> list, String str, String str2) {
        List<JSONObject> arrayList = new ArrayList();
        if ("inner".equalsIgnoreCase(str)) {
            if (list.size() <= 0) {
                return arrayList;
            }
            arrayList = connectInnerJoin(iMessage, list, str2);
        } else if (LABEL_LEFT.equalsIgnoreCase(str)) {
            arrayList = connectLeftJoin(iMessage, list, str2);
        }
        return arrayList;
    }

    private List<JSONObject> connectLeftJoin(IMessage iMessage, List<Map<String, Object>> list, String str) {
        ArrayList arrayList = new ArrayList();
        String msgRouteFromLable = iMessage.getHeader().getMsgRouteFromLable();
        JSONObject messageBody = iMessage.getMessageBody();
        String traceId = iMessage.getHeader().getTraceId();
        int i = 1;
        if (LABEL_LEFT.equalsIgnoreCase(msgRouteFromLable) && list.size() > 0) {
            for (Map<String, Object> map : list) {
                JSONObject jSONObject = (JSONObject) messageBody.clone();
                jSONObject.fluentPutAll(addAsName(map, str));
                jSONObject.put("traceId", traceId + "-" + i);
                i++;
                arrayList.add(jSONObject);
            }
        } else if (LABEL_LEFT.equalsIgnoreCase(msgRouteFromLable) && list.size() <= 0) {
            JSONObject jSONObject2 = (JSONObject) messageBody.clone();
            jSONObject2.put("traceId", traceId + "-1");
            arrayList.add(jSONObject2);
        } else if (LABEL_RIGHT.equalsIgnoreCase(msgRouteFromLable) && list.size() > 0) {
            JSONObject addAsName = addAsName(messageBody, str);
            for (Map<String, Object> map2 : list) {
                JSONObject jSONObject3 = (JSONObject) addAsName.clone();
                jSONObject3.fluentPutAll(map2);
                jSONObject3.put("traceId", traceId + "-" + i);
                i++;
                arrayList.add(jSONObject3);
            }
        }
        return arrayList;
    }

    public List<JSONObject> connectInnerJoin(IMessage iMessage, List<Map<String, Object>> list, String str) {
        ArrayList arrayList = new ArrayList();
        String msgRouteFromLable = iMessage.getHeader().getMsgRouteFromLable();
        String traceId = iMessage.getHeader().getTraceId();
        int i = 1;
        if (LABEL_LEFT.equalsIgnoreCase(msgRouteFromLable)) {
            JSONObject messageBody = iMessage.getMessageBody();
            for (Map<String, Object> map : list) {
                JSONObject jSONObject = (JSONObject) messageBody.clone();
                jSONObject.fluentPutAll(addAsName(map, str));
                jSONObject.put("traceId", traceId + "-" + i);
                i++;
                arrayList.add(jSONObject);
            }
        } else {
            JSONObject addAsName = addAsName(iMessage.getMessageBody(), str);
            for (Map<String, Object> map2 : list) {
                JSONObject jSONObject2 = (JSONObject) addAsName.clone();
                jSONObject2.fluentPutAll(map2);
                jSONObject2.put("traceId", traceId + "-" + i);
                i++;
                arrayList.add(jSONObject2);
            }
        }
        return arrayList;
    }

    private JSONObject addAsName(Map<String, Object> map, String str) {
        JSONObject jSONObject = new JSONObject();
        if (StringUtil.isEmpty(str)) {
            return jSONObject.fluentPutAll(map);
        }
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            jSONObject.put(str + "." + entry.getKey(), entry.getValue());
        }
        return jSONObject;
    }

    protected String createStoreKey(IMessage iMessage, String str, WindowInstance windowInstance) {
        return MapKeyUtil.createKey(new String[]{windowInstance.createWindowInstanceId(), iMessage.getMessageBody().getString(WindowCache.SHUFFLE_KEY), str, iMessage.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID), iMessage.getMessageBody().getString(WindowCache.ORIGIN_OFFSET)});
    }

    protected String createStoreKeyPrefix(IMessage iMessage, String str, WindowInstance windowInstance) {
        return MapKeyUtil.createKey(new String[]{iMessage.getMessageBody().getString(WindowCache.SHUFFLE_KEY), str});
    }

    private JoinState createJoinState(IMessage iMessage, WindowInstance windowInstance, String str) {
        MessageHeader header = iMessage.getHeader();
        String str2 = System.nanoTime() + "";
        String queueId = header.getQueueId() != null ? header.getQueueId() : "_Dipper";
        if (header.isEmptyOffset()) {
            header.setOffset(str2);
            str2 = header.getOffset();
        }
        String str3 = getNameSpace() + "_" + getConfigureName() + "_" + queueId + "_" + str2;
        String generateKey = generateKey(iMessage.getMessageBody(), str, this.leftJoinFieldNames, this.rightJoinFieldNames);
        JSONObject jSONObject = (JSONObject) iMessage.getMessageBody().clone();
        jSONObject.remove("WindowInstance");
        jSONObject.remove("AbstractWindow");
        jSONObject.remove(WindowCache.ORIGIN_MESSAGE_HEADER);
        jSONObject.remove("MessageHeader");
        JoinState joinState = null;
        if (LABEL_LEFT.equalsIgnoreCase(str)) {
            joinState = new JoinLeftState();
        } else if (LABEL_RIGHT.equalsIgnoreCase(str)) {
            joinState = new JoinRightState();
        }
        joinState.setGmtCreate(new Date());
        joinState.setGmtModified(new Date());
        joinState.setWindowName(getConfigureName());
        joinState.setWindowNameSpace(getNameSpace());
        joinState.setMessageId(str3);
        joinState.setMessageKey(generateKey);
        joinState.setMessageTime(new Date());
        joinState.setMessageBody(jSONObject.toJSONString());
        joinState.setMsgKey(createStoreKey(iMessage, str, windowInstance));
        String queueId2 = this.shuffleChannel.getChannelQueue(iMessage.getMessageBody().getString(WindowCache.SHUFFLE_KEY)).getQueueId();
        joinState.setPartition(queueId2);
        joinState.setWindowInstanceId(windowInstance.getWindowInstanceKey());
        joinState.setPartitionNum(incrementAndGetSplitNumber(windowInstance, queueId2));
        joinState.setWindowInstancePartitionId(windowInstance.getWindowInstanceKey());
        return joinState;
    }

    public static String generateKey(JSONObject jSONObject, String str, List<String> list, List<String> list2) {
        StringBuffer stringBuffer = new StringBuffer();
        if (LABEL_LEFT.equalsIgnoreCase(str)) {
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                stringBuffer.append(jSONObject.getString(it.next())).append("_");
            }
        } else {
            for (String str2 : list2) {
                String[] split = str2.split("\\.");
                if (split.length > 1) {
                    str2 = split[1];
                }
                stringBuffer.append(jSONObject.getString(str2)).append("_");
            }
        }
        return StringUtil.createMD5Str(stringBuffer.toString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public String generateShuffleKey(IMessage iMessage) {
        return generateKey(iMessage.getMessageBody(), iMessage.getHeader().getMsgRouteFromLable(), this.leftJoinFieldNames, this.rightJoinFieldNames);
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public Class getWindowBaseValueClass() {
        return JoinState.class;
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public synchronized void clearFireWindowInstance(WindowInstance windowInstance) {
        ArrayList<WindowInstance> arrayList = new ArrayList();
        Date addSecond = DateUtil.addSecond(DateUtil.parse(windowInstance.getStartTime()), (-this.sizeInterval) * (this.retainWindowCount - 1) * 60);
        Iterator it = this.windowInstanceMap.keySet().iterator();
        while (it.hasNext()) {
            WindowInstance windowInstance2 = this.windowInstanceMap.get(it.next());
            if (DateUtil.dateDiff(addSecond, DateUtil.parse(windowInstance2.getStartTime())) >= 0) {
                arrayList.add(windowInstance2);
                it.remove();
            }
        }
        for (WindowInstance windowInstance3 : arrayList) {
            this.windowMaxValueManager.deleteSplitNum(windowInstance3, windowInstance3.getSplitId());
            ShufflePartitionManager.getInstance().clearWindowInstance(windowInstance3.createWindowInstanceId());
            this.storage.delete(windowInstance3.createWindowInstanceId(), null, WindowBaseValue.class, this.sqlCache);
            if (!this.isLocalStorageOnly) {
                WindowInstance.clearInstance(windowInstance3, this.sqlCache);
                this.joinOperator.cleanMessage(windowInstance3.getWindowNameSpace(), windowInstance3.getWindowName(), getRetainWindowCount(), getSizeInterval(), windowInstance.getStartTime());
            }
        }
    }

    protected List<Map<String, Object>> matchRows(JSONObject jSONObject, List<Map<String, Object>> list) {
        return AbstractDim.matchExpressionByLoop(list.iterator(), this.expression, jSONObject, true);
    }

    private List<Map<String, Object>> converToMapFromList(List<WindowBaseValue> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<WindowBaseValue> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(Message.parseObject(((JoinState) it.next()).getMessageBody()));
        }
        return arrayList;
    }

    protected void sendMessage(JSONObject jSONObject, boolean z) {
        Message message = new Message(jSONObject);
        cleanMessage(message);
        if (z) {
            message.getHeader().setNeedFlush(true);
        }
        Context context = new Context(message);
        if (ComponentCreator.getPropertyBooleanValue("window.fire.isTest")) {
            System.out.println(getConfigureName() + " result send count is " + this.count.incrementAndGet());
        }
        getFireReceiver().doMessage(message, context);
    }

    protected void sendMessage(IMessage iMessage, List<WindowBaseValue> list) {
        if ("inner".equalsIgnoreCase(this.joinType) && (list == null || list.size() == 0)) {
            return;
        }
        List<JSONObject> connectJoin = this.expression == null ? connectJoin(iMessage, converToMapFromList(list), this.joinType, this.rightAsName) : connectJoin(iMessage, matchRows(iMessage.getMessageBody(), converToMapFromList(list)), this.joinType, this.rightAsName);
        for (int i = 0; i < connectJoin.size(); i++) {
            if (i == connectJoin.size() - 1) {
                sendMessage(connectJoin.get(i), true);
            } else {
                sendMessage(connectJoin.get(i), false);
            }
        }
    }

    protected void cleanMessage(Message message) {
        JSONObject messageBody = message.getMessageBody();
        messageBody.remove("WindowInstance");
        messageBody.remove("AbstractWindow");
        messageBody.remove(WindowCache.ORIGIN_MESSAGE_HEADER);
        messageBody.remove("MessageHeader");
        messageBody.remove(ShuffleChannel.SHUFFLE_OFFSET);
        messageBody.remove("HIT_WINDOW_INSTANCE_ID");
        messageBody.remove("traceId");
        messageBody.remove(WindowCache.ORIGIN_QUEUE_ID);
        messageBody.remove(WindowCache.SHUFFLE_KEY);
        messageBody.remove(WindowCache.ORIGIN_MESSAGE_TRACE_ID);
        messageBody.remove(WindowCache.ORIGIN_OFFSET);
        messageBody.remove(WindowCache.ORIGIN_QUEUE_IS_LONG);
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public void removeInstanceFromMap(WindowInstance windowInstance) {
        String createKey = MapKeyUtil.createKey(new String[]{windowInstance.getWindowNameSpace(), windowInstance.getWindowName(), DateUtil.getBeforeMinutesTime(windowInstance.getStartTime(), (this.retainWindowCount - 1) * this.sizeInterval)});
        for (Map.Entry<String, WindowInstance> entry : this.windowInstanceMap.entrySet()) {
            if (entry.getKey().compareToIgnoreCase(createKey) <= 0) {
                this.windowInstanceMap.remove(entry);
            }
        }
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    protected Long queryWindowInstanceMaxSplitNum(WindowInstance windowInstance) {
        Long maxSplitNum = this.storage.getMaxSplitNum(windowInstance, JoinLeftState.class);
        Long maxSplitNum2 = this.storage.getMaxSplitNum(windowInstance, JoinRightState.class);
        if (maxSplitNum == null) {
            return maxSplitNum2;
        }
        if (maxSplitNum2 != null && maxSplitNum.longValue() < maxSplitNum2.longValue()) {
            if (maxSplitNum.longValue() < maxSplitNum2.longValue()) {
                return maxSplitNum2;
            }
            return null;
        }
        return maxSplitNum;
    }

    @Override // org.apache.rocketmq.streams.window.operator.AbstractWindow
    public boolean supportBatchMsgFinish() {
        return false;
    }

    public int getRetainWindowCount() {
        return this.retainWindowCount;
    }

    public void setRetainWindowCount(int i) {
        this.retainWindowCount = i;
    }

    public List<String> getLeftJoinFieldNames() {
        return this.leftJoinFieldNames;
    }

    public void setLeftJoinFieldNames(List<String> list) {
        this.leftJoinFieldNames = list;
    }

    public List<String> getRightJoinFieldNames() {
        return this.rightJoinFieldNames;
    }

    public void setRightJoinFieldNames(List<String> list) {
        this.rightJoinFieldNames = list;
    }

    public String getRightAsName() {
        return this.rightAsName;
    }

    public void setRightAsName(String str) {
        this.rightAsName = str;
    }

    public String getJoinType() {
        return this.joinType;
    }

    public void setJoinType(String str) {
        this.joinType = str;
    }

    public String getExpression() {
        return this.expression;
    }

    public void setExpression(String str) {
        this.expression = str;
    }
}
