package org.apache.rocketmq.streams.window.operator.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.utils.Base64Utils;
import org.apache.rocketmq.streams.common.utils.CollectionUtil;
import org.apache.rocketmq.streams.common.utils.DateUtil;
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.window.model.WindowInstance;
import org.apache.rocketmq.streams.window.state.impl.WindowValue;
import org.apache.rocketmq.streams.window.storage.WindowStorage;

/* loaded from: input_file:org/apache/rocketmq/streams/window/operator/impl/SessionOperator.class */
/**
 * Window operator implementing session windows.
 *
 * <p>Every message produces a session interval that starts at the message's occur time and ends
 * {@link #sessionTimeOut} seconds later (see {@link #getSessionTime(IMessage)}). Intervals of the
 * same group that overlap are folded into a single {@link WindowValue}; a value is fired once its
 * fire time is earlier than the max event time known for the queue.
 *
 * <p>All session values of a queue live in one long-running {@link WindowInstance} spanning
 * {@link #SESSION_WINDOW_BEGIN_TIME} to {@link #SESSION_WINDOW_END_TIME}. Besides the regular
 * store keys, every value is also written to local storage under a key that starts with
 * {@link #getOrderBypPrefix()} and contains the value's fire time.
 *
 * <p>{@link #shuffleCalculate} and {@link #fireWindowInstance} are serialized on a private lock.
 */
public class SessionOperator extends WindowOperator {
    protected static final Log LOG = LogFactory.getLog(SessionOperator.class);

    public static final String SESSION_WINDOW_BEGIN_TIME = "1970-01-01";
    public static final String SESSION_WINDOW_END_TIME = "9999-01-01";

    private static final String SESSION_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    private static final String ORDER_BY_FIRE_TIME_PREFIX = "_order_by_fire_time_";

    /** Session timeout, in seconds, used when none is configured. */
    private static final int DEFAULT_SESSION_TIMEOUT = 600;

    /** Upper bound (ms) added to the instance fire time when searching for the next fire time. */
    private static final long NEXT_FIRE_TIME_LOOKAHEAD_MILLIS = 60000L;

    /** Split number above which the counter is reset through the max-value manager. */
    private static final long SPLIT_NUMBER_RESET_THRESHOLD = 900000000L;

    /** Length of the interval opened by a single message, in seconds. */
    protected int sessionTimeOut = DEFAULT_SESSION_TIMEOUT;

    /** Guards shuffle calculation and firing against each other. */
    private transient Object lock = new Object();

    /** Creates an operator with the default session timeout of 600 seconds. */
    public SessionOperator() {
    }

    /**
     * Creates an operator with the given session timeout.
     *
     * @param sessionTimeOut timeout in seconds; {@code null} keeps the default of 600 seconds
     */
    public SessionOperator(Integer sessionTimeOut) {
        if (sessionTimeOut != null) {
            this.sessionTimeOut = sessionTimeOut.intValue();
        }
    }

    /** @return the session timeout in seconds */
    public int getSessionTimeOut() {
        return this.sessionTimeOut;
    }

    /** @param sessionTimeOut the session timeout in seconds */
    public void setSessionTimeOut(int sessionTimeOut) {
        this.sessionTimeOut = sessionTimeOut;
    }

    /** Delegates to the superclass without additional initialization. */
    @Override
    public boolean initConfigurable() {
        return super.initConfigurable();
    }

    /**
     * Returns the session window instance for the queue, registering it first if it is unknown.
     *
     * <p>When the instance has to be created, its fire time is derived from the end of the
     * message's session interval. If that fire time is not later than the queue's max event time
     * (or the current time when no max event time is known), the message is treated as out of
     * date and an empty list is returned.
     *
     * @param message the incoming message
     * @param queueId identifier used to create the instance and to look up the max event time
     * @return an empty list for a discarded message, otherwise a single-element list holding the
     *         result of the instance lookup
     */
    @Override
    public List<WindowInstance> queryOrCreateWindowInstance(IMessage message, String queueId) {
        WindowInstance candidate = createWindowInstance(SESSION_WINDOW_BEGIN_TIME, SESSION_WINDOW_END_TIME, null, queueId);
        String windowInstanceId = candidate.createWindowInstanceId();
        if (searchWindowInstance(windowInstanceId) == null) {
            Date fireTime = toFireTime(getSessionTime(message).getRight());
            Long maxEventTime = getMaxEventTime(queueId);
            if (maxEventTime == null) {
                LOG.warn("use current time as max event time!");
                maxEventTime = Long.valueOf(System.currentTimeMillis());
            }
            if (fireTime.getTime() <= maxEventTime.longValue()) {
                LOG.warn("message is discarded as out of date! fire time: " + fireTime.getTime() + " max event time: " + maxEventTime);
                return new ArrayList<>();
            }
            candidate.setFireTime(DateUtil.format(fireTime, SESSION_DATETIME_PATTERN));
            registerWindowInstance(candidate);
        }
        // Look the instance up again so the registered object (not the local candidate) is returned.
        List<WindowInstance> instances = new ArrayList<>();
        instances.add(searchWindowInstance(windowInstanceId));
        return instances;
    }

    /**
     * Registers the instance under the id produced by
     * {@link WindowInstance#createWindowInstanceId()}.
     *
     * @param windowInstance the instance to register
     * @return whatever the superclass registration returns
     */
    @Override
    public WindowInstance registerWindowInstance(WindowInstance windowInstance) {
        return super.registerWindowInstance(windowInstance.createWindowInstanceId(), windowInstance);
    }

    /**
     * Folds a batch of shuffled messages into the stored session values and persists the result.
     *
     * <p>Messages are grouped by group name; for every group the stored values are loaded, each
     * message is assigned to (or creates) a session value and accumulated into it, and the
     * resulting values are merged where they overlap before being stored.
     *
     * @param messages       the shuffled messages
     * @param windowInstance the window instance the messages belong to
     * @param queueId        the queue the messages were shuffled to
     */
    @Override
    public void shuffleCalculate(List<IMessage> messages, WindowInstance windowInstance, String queueId) {
        synchronized (this.lock) {
            List<String> groupNames = new ArrayList<>();
            Map<String, List<IMessage>> messagesByGroup = groupByGroupName(messages, groupNames);
            int groupCount = groupNames.size();

            Map<String, String> storeKeyByGroup = new HashMap<>(groupCount);
            for (String groupName : groupNames) {
                storeKeyByGroup.put(groupName, createStoreKey(queueId, groupName, windowInstance));
            }

            // The storage API is used untyped here; the values were requested as WindowValue.
            @SuppressWarnings("unchecked")
            Map<String, List<WindowValue>> storedValues = (Map<String, List<WindowValue>>) (Map<?, ?>)
                this.storage.multiGetList(WindowValue.class, new ArrayList<>(storeKeyByGroup.values()));

            Map<String, List<WindowValue>> mergedByStoreKey = new HashMap<>(groupCount);
            for (Map.Entry<String, List<IMessage>> entry : messagesByGroup.entrySet()) {
                String groupName = entry.getKey();
                String storeKey = storeKeyByGroup.get(groupName);
                List<IMessage> groupMessages = entry.getValue();

                List<WindowValue> sessions = storedValues.getOrDefault(storeKey, new ArrayList<>());
                Map<Long, WindowValue> valueByPartitionNum = new HashMap<>(groupMessages.size());
                for (WindowValue session : sessions) {
                    valueByPartitionNum.put(Long.valueOf(session.getPartitionNum()), session);
                }

                for (IMessage message : groupMessages) {
                    WindowValue session = queryOrCreateWindowValue(windowInstance, queueId, groupName, message, sessions, storeKey);
                    session.calculate(this, message);
                    String traceId = message.getMessageBody().getString("SHUFFLE_TRACE_ID");
                    if (!StringUtil.isEmpty(traceId)) {
                        traceShuffle(traceId, groupName, session);
                    }
                    valueByPartitionNum.put(Long.valueOf(session.getPartitionNum()), session);
                }

                List<WindowValue> merged = mergeWindowValue(new ArrayList<>(valueByPartitionNum.values()), windowInstance, queueId);
                mergedByStoreKey.put(storeKey, merged);
            }
            store(mergedByStoreKey, windowInstance, queueId);
        }
    }

    /** Emits a debug trace for an accumulated message; failures never interrupt the calculation. */
    private void traceShuffle(String traceId, String groupName, WindowValue session) {
        try {
            TraceUtil.debug(traceId, new String[] {
                "shuffle message out " + groupName,
                String.valueOf(session.getPartitionNum()),
                session.getStartTime(),
                session.getEndTime(),
                new String(Base64Utils.decode(session.getComputedColumnResult()), "UTF-8")
            });
        } catch (Exception e) {
            // Tracing is best-effort, but the failure should still be discoverable.
            LOG.debug("failed to trace shuffle message, trace id: " + traceId, e);
        }
    }

    /**
     * Finds the stored session the message belongs to, widening it when needed, or creates a new
     * session and appends it to {@code sessions}.
     *
     * <p>The first session (in list order) matching one of these cases wins:
     * <ul>
     *   <li>the message interval does not end after the session: the session is reused, and its
     *       start is moved back if the message begins earlier;</li>
     *   <li>the message interval ends after the session and begins before the session's end: the
     *       session's end and fire time are moved forward (and its start moved back if the message
     *       begins earlier). The prefix key contains the fire time, so the old prefix key is
     *       removed from local storage before the fire time changes.</li>
     * </ul>
     */
    private WindowValue queryOrCreateWindowValue(WindowInstance windowInstance, String queueId, String groupName,
        IMessage message, List<WindowValue> sessions, String storeKey) {
        if (!sessions.isEmpty()) {
            // The message interval does not depend on the session, so compute it once.
            Pair<Date, Date> messageInterval = getSessionTime(message);
            Date messageBegin = messageInterval.getLeft();
            Date messageEnd = messageInterval.getRight();

            for (WindowValue session : sessions) {
                Date sessionBegin = DateUtil.parseTime(session.getStartTime());
                Date sessionEnd = DateUtil.parseTime(session.getEndTime());
                boolean beginsEarlier = messageBegin.compareTo(sessionBegin) < 0;
                boolean endsLater = messageEnd.compareTo(sessionEnd) > 0;

                if (!endsLater) {
                    // NOTE(review): this also matches a message interval lying entirely before the
                    // session (no overlap); confirm that absorbing it is intended.
                    if (beginsEarlier) {
                        session.setStartTime(DateUtil.format(messageBegin, SESSION_DATETIME_PATTERN));
                    }
                    return session;
                }

                if (beginsEarlier || messageBegin.compareTo(sessionEnd) < 0) {
                    List<String> stalePrefixKeys = new ArrayList<>();
                    stalePrefixKeys.add(createPrefixKey(session, windowInstance, queueId));
                    deletePrefixValue(stalePrefixKeys);
                    if (beginsEarlier) {
                        session.setStartTime(DateUtil.format(messageBegin, SESSION_DATETIME_PATTERN));
                    }
                    session.setEndTime(DateUtil.format(messageEnd, SESSION_DATETIME_PATTERN));
                    session.setFireTime(DateUtil.format(toFireTime(messageEnd), SESSION_DATETIME_PATTERN));
                    return session;
                }
                // The message begins at or after this session's end: try the next session.
            }
        }
        WindowValue created = createWindowValue(queueId, groupName, windowInstance, message, storeKey);
        sessions.add(created);
        return created;
    }

    /**
     * Merges session values whose intervals overlap.
     *
     * <p>The list is sorted in place by start time. Each value not yet absorbed becomes the head of
     * a group and absorbs every later value whose start time is not after the head's (growing) end
     * time; the head takes the later end time and fire time of the two. Times are compared as
     * strings. The prefix keys of absorbed values are removed from local storage.
     *
     * @return the input list when it has at most one element, otherwise the group heads carrying
     *         the merged computed/aggregated column results
     */
    private List<WindowValue> mergeWindowValue(List<WindowValue> values, WindowInstance windowInstance, String queueId) {
        if (values.size() <= 1) {
            return values;
        }
        Map<Integer, Integer> headByAbsorbedIndex = new HashMap<>(values.size());
        Map<Integer, List<Integer>> membersByHeadIndex = new HashMap<>(values.size());
        Collections.sort(values, Comparator.comparing(WindowValue::getStartTime));

        for (int headIndex = 0; headIndex < values.size(); headIndex++) {
            if (headByAbsorbedIndex.containsKey(Integer.valueOf(headIndex))) {
                continue;
            }
            List<Integer> members = new ArrayList<>();
            members.add(Integer.valueOf(headIndex));
            membersByHeadIndex.put(Integer.valueOf(headIndex), members);

            WindowValue head = values.get(headIndex);
            for (int index = headIndex + 1; index < values.size(); index++) {
                WindowValue candidate = values.get(index);
                if (candidate.getStartTime().compareTo(head.getEndTime()) <= 0) {
                    headByAbsorbedIndex.put(Integer.valueOf(index), Integer.valueOf(headIndex));
                    // NOTE(review): the head's fire time may change here while its old prefix key
                    // is not removed; verify no stale entry is left in local storage.
                    head.setEndTime(later(head.getEndTime(), candidate.getEndTime()));
                    head.setFireTime(later(head.getFireTime(), candidate.getFireTime()));
                    members.add(Integer.valueOf(index));
                }
            }
        }

        List<WindowValue> merged = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> group : membersByHeadIndex.entrySet()) {
            WindowValue head = values.get(group.getKey().intValue());
            List<WindowValue> members = group.getValue().stream()
                .map(index -> values.get(index.intValue()))
                .collect(Collectors.toList());
            WindowValue combined = WindowValue.mergeWindowValue(this, members);
            head.setComputedColumnResult(combined.getComputedColumnResult());
            head.setAggColumnResult(combined.getAggColumnResult());
            merged.add(head);
        }

        List<String> absorbedPrefixKeys = headByAbsorbedIndex.keySet().stream()
            .map(index -> createPrefixKey(values.get(index.intValue()), windowInstance, queueId))
            .collect(Collectors.toList());
        deletePrefixValue(absorbedPrefixKeys);
        return merged;
    }

    /** Returns {@code second} unless {@code first} sorts strictly after it. */
    private static String later(String first, String second) {
        return first.compareTo(second) <= 0 ? second : first;
    }

    /** Removes the given keys from local storage; does nothing for an empty list. */
    private void deletePrefixValue(List<String> prefixKeys) {
        if (CollectionUtil.isEmpty(prefixKeys)) {
            return;
        }
        this.storage.getLocalStorage().removeKeys(prefixKeys);
    }

    /**
     * Builds the local-storage key under which a session value is indexed by fire time.
     *
     * @param windowValue    supplies fire time, partition number and group
     * @param windowInstance supplies the window instance id
     * @param queueId        appended to {@link #getOrderBypPrefix()} as the first key part
     * @return the key composed of prefix + queue id, window instance id, fire time, partition
     *         number and group
     */
    public static String createPrefixKey(WindowValue windowValue, WindowInstance windowInstance, String queueId) {
        return MapKeyUtil.createKey(new String[] {
            getOrderBypPrefix() + queueId,
            windowInstance.createWindowInstanceId(),
            windowValue.getFireTime(),
            String.valueOf(windowValue.getPartitionNum()),
            windowValue.getGroupBy()
        });
    }

    /**
     * Computes the session interval opened by a message.
     *
     * @return (occur time, occur time + {@link #sessionTimeOut} seconds); the current time is used
     *         as occur time when it cannot be computed from the message
     */
    private Pair<Date, Date> getSessionTime(IMessage message) {
        Long occurTime = Long.valueOf(System.currentTimeMillis());
        try {
            occurTime = WindowInstance.getOccurTime(this, message);
        } catch (Exception e) {
            LOG.error("failed in computing occur time from the message!", e);
        }
        Date begin = new Date(occurTime.longValue());
        Date end = DateUtil.addDate(TimeUnit.SECONDS, begin, this.sessionTimeOut);
        return Pair.of(begin, end);
    }

    /** Fire time of a session: its end plus {@code waterMarkMinute * timeUnitAdjust} seconds. */
    private Date toFireTime(Date sessionEnd) {
        return DateUtil.addDate(TimeUnit.SECONDS, sessionEnd, this.waterMarkMinute * this.timeUnitAdjust);
    }

    /**
     * Persists session values: once as lists under their store keys, and once individually in
     * local storage under their prefix keys.
     *
     * @param map            session values by store key; nothing happens when empty
     * @param windowInstance the owning window instance
     * @param queueId        the queue the values belong to
     */
    protected void store(Map<String, List<WindowValue>> map, WindowInstance windowInstance, String queueId) {
        if (CollectionUtil.isEmpty(map)) {
            return;
        }
        this.storage.multiPutList(map, windowInstance.createWindowInstanceId(), queueId, this.sqlCache);

        Map<String, WindowValue> valueByPrefixKey = new HashMap<>();
        for (List<WindowValue> values : map.values()) {
            for (WindowValue value : values) {
                valueByPrefixKey.put(createPrefixKey(value, windowInstance, queueId), value);
            }
        }
        this.storage.getLocalStorage().multiPut(valueByPrefixKey);
    }

    /**
     * Creates a new session value for a message.
     *
     * <p>Start and end come from {@link #getSessionTime(IMessage)}; the fire time is the end plus
     * the watermark delay. With assertions enabled, the queue id resolved from the shuffle channel
     * for the group must equal {@code queueId} ignoring case.
     *
     * @param queueId        the queue the message was shuffled to
     * @param groupName      the group the value belongs to
     * @param windowInstance the owning window instance
     * @param message        the message opening the session
     * @param storeKey       the store key, whose MD5 string becomes the value's message key
     * @return the initialized value
     */
    protected WindowValue createWindowValue(String queueId, String groupName, WindowInstance windowInstance,
        IMessage message, String storeKey) {
        Pair<Date, Date> interval = getSessionTime(message);
        Date sessionEnd = interval.getRight();

        WindowValue value = new WindowValue();
        value.setStartTime(DateUtil.format(interval.getLeft(), SESSION_DATETIME_PATTERN));
        value.setEndTime(DateUtil.format(sessionEnd, SESSION_DATETIME_PATTERN));
        value.setFireTime(DateUtil.format(toFireTime(sessionEnd), SESSION_DATETIME_PATTERN));
        value.setGroupBy(groupName);
        value.setMsgKey(StringUtil.createMD5Str(storeKey));

        String channelQueueId = this.shuffleChannel.getChannelQueue(groupName).getQueueId();
        assert channelQueueId.equalsIgnoreCase(queueId)
            : "shuffle queue " + channelQueueId + " does not match " + queueId;

        value.setPartitionNum(createPartitionNum(value, queueId, windowInstance));
        value.setPartition(channelQueueId);
        value.setWindowInstancePartitionId(windowInstance.getWindowInstanceKey());
        value.setWindowInstanceId(windowInstance.getWindowInstanceKey());
        return value;
    }

    /** @return the key prefix of the fire-time index in local storage */
    protected static String getOrderBypPrefix() {
        return ORDER_BY_FIRE_TIME_PREFIX;
    }

    /**
     * Fires every session value of the queue that is due.
     *
     * <p>Values are read through the prefix-key iterator. A value is collected when
     * {@link #checkFire} accepts it. Iteration stops at the first value that is not due and whose
     * fire time lies strictly between the instance's fire time and that time plus 60 seconds; this
     * fire time becomes the next fire time handed to {@link #doFire}.
     *
     * @param windowInstance        the instance being fired
     * @param queueId               the queue to fire
     * @param queueIdToPartitionNum optional map; when it holds a non-empty value for
     *                              {@code queueId}, that value is parsed as a long and set as the
     *                              iterator's partition number
     * @return the number of values fired
     */
    @Override
    public int fireWindowInstance(WindowInstance windowInstance, String queueId, Map<String, String> queueIdToPartitionNum) {
        synchronized (this.lock) {
            WindowStorage.WindowBaseValueIterator iterator = this.storage.loadWindowInstanceSplitData(
                getOrderBypPrefix(), queueId, windowInstance.createWindowInstanceId(), null, getWindowBaseValueClass());
            if (queueIdToPartitionNum != null) {
                String partitionNum = queueIdToPartitionNum.get(queueId);
                if (StringUtil.isNotEmpty(partitionNum)) {
                    iterator.setPartitionNum(Long.parseLong(partitionNum));
                }
            }

            long currentFireTime = DateUtil.parse(windowInstance.getFireTime(), SESSION_DATETIME_PATTERN).getTime();
            long nextFireTime = currentFireTime + NEXT_FIRE_TIME_LOOKAHEAD_MILLIS;
            List<WindowValue> dueValues = new ArrayList<>();

            while (iterator.hasNext()) {
                WindowValue value = (WindowValue) iterator.next();
                if (value == null) {
                    continue;
                }
                if (checkFire(queueId, value)) {
                    TraceUtil.debug(String.valueOf(value.getPartitionNum()), new String[] {
                        "shuffle message fire", value.getStartTime(), value.getEndTime(), value.getComputedColumnResult()
                    });
                    dueValues.add(value);
                    continue;
                }
                long valueFireTime = DateUtil.parse(value.getFireTime(), SESSION_DATETIME_PATTERN).getTime();
                if (valueFireTime > currentFireTime && valueFireTime < nextFireTime) {
                    nextFireTime = valueFireTime;
                    break;
                }
            }

            doFire(queueId, windowInstance, dueValues, currentFireTime, nextFireTime);
            return dueValues.size();
        }
    }

    /**
     * Tells whether a value's fire time is strictly before the queue's max event time (the current
     * time when no max event time is known).
     */
    private boolean checkFire(String queueId, WindowValue value) {
        Long maxEventTime = getMaxEventTime(queueId);
        long reference = maxEventTime == null ? System.currentTimeMillis() : maxEventTime.longValue();
        long fireTime = DateUtil.parse(value.getFireTime(), SESSION_DATETIME_PATTERN).getTime();
        return fireTime < reference;
    }

    /**
     * Sends the due values ordered by partition number, clears them from storage and, when the
     * next fire time differs from the current one, reschedules the window instance.
     */
    private void doFire(String queueId, WindowInstance windowInstance, List<WindowValue> dueValues,
        long currentFireTime, long nextFireTime) {
        if (CollectionUtil.isEmpty(dueValues)) {
            return;
        }
        dueValues.sort(Comparator.comparingLong(WindowValue::getPartitionNum));
        sendFireMessage(dueValues, queueId);
        clearWindowValues(dueValues, queueId, windowInstance);
        if (nextFireTime == currentFireTime) {
            return;
        }
        WindowInstance registered = searchWindowInstance(windowInstance.createWindowInstanceId());
        if (registered == null) {
            LOG.error("window instance lost, queueId: " + queueId + " ,fire time" + windowInstance.getFireTime());
            return;
        }
        registered.setFireTime(DateUtil.format(new Date(nextFireTime)));
        // NOTE(review): the instance passed in (not `registered`) is handed to the fire source.
        this.windowFireSource.registFireWindowInstanceIfNotExist(windowInstance, this);
    }

    /**
     * Removes fired values from storage.
     *
     * <p>The lists stored under the affected store keys are reloaded, values whose partition
     * number was fired are filtered out, the fired prefix keys are removed from local storage and
     * the remaining values are stored again.
     *
     * @param list           the fired values; nothing happens when empty
     * @param queueId        the queue the values belong to
     * @param windowInstance the owning window instance
     */
    protected void clearWindowValues(List<WindowValue> list, String queueId, WindowInstance windowInstance) {
        if (CollectionUtil.isEmpty(list)) {
            return;
        }
        HashSet<String> storeKeys = new HashSet<>(list.size());
        HashSet<Long> firedPartitionNums = new HashSet<>(list.size());
        HashSet<String> firedPrefixKeys = new HashSet<>(list.size());
        for (WindowValue value : list) {
            storeKeys.add(createStoreKey(queueId, value.getGroupBy(), windowInstance));
            firedPartitionNums.add(Long.valueOf(value.getPartitionNum()));
            firedPrefixKeys.add(createPrefixKey(value, windowInstance, queueId));
        }

        // The storage API is used untyped here; the values were requested as WindowValue.
        @SuppressWarnings("unchecked")
        Map<String, List<WindowValue>> storedValues = (Map<String, List<WindowValue>>) (Map<?, ?>)
            this.storage.multiGetList(WindowValue.class, new ArrayList<>(storeKeys));

        Map<String, List<WindowValue>> remaining = new HashMap<>(storedValues.size());
        for (Map.Entry<String, List<WindowValue>> entry : storedValues.entrySet()) {
            List<WindowValue> kept = entry.getValue().stream()
                .filter(value -> !firedPartitionNums.contains(Long.valueOf(value.getPartitionNum())))
                .collect(Collectors.toList());
            remaining.put(entry.getKey(), kept);
        }
        this.storage.getLocalStorage().removeKeys(firedPrefixKeys);
        store(remaining, windowInstance, queueId);
    }

    /**
     * Increments the split number through the superclass and resets the counter once the returned
     * number exceeds 900000000.
     *
     * @param windowInstance the window instance, forwarded unchanged
     * @param splitId        identifier forwarded unchanged to the superclass and the reset call
     * @return the number returned by the superclass (the value before any reset)
     */
    @Override
    public long incrementAndGetSplitNumber(WindowInstance windowInstance, String splitId) {
        long splitNumber = super.incrementAndGetSplitNumber(windowInstance, splitId);
        if (splitNumber > SPLIT_NUMBER_RESET_THRESHOLD) {
            getWindowMaxValueManager().resetSplitNum(windowInstance, splitId);
        }
        return splitNumber;
    }
}
