package org.apache.rocketmq.streams.core.window.fire;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.window.StreamType;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner.class */
public class IdleWindowScaner implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(IdleWindowScaner.class.getName());
    private final Integer maxIdleTime;
    private final ScheduledExecutorService executor;
    private long sessionTimeOut = 0;
    private final ConcurrentHashMap<WindowKey, TimeType> lastUpdateTime2WindowKey = new ConcurrentHashMap<>(16);
    private final ConcurrentHashMap<WindowKey, AccumulatorWindowFire<?, ?, ?, ?>> fireWindowCallBack = new ConcurrentHashMap<>(16);
    private final ConcurrentHashMap<WindowKey, AccumulatorSessionWindowFire<?, ?, ?, ?>> fireSessionWindowCallback = new ConcurrentHashMap<>(16);
    private final ConcurrentHashMap<WindowKey, AggregateWindowFire<?, ?, ?>> windowKeyAggregate = new ConcurrentHashMap<>(16);
    private final ConcurrentHashMap<WindowKey, AggregateSessionWindowFire<?, ?, ?>> windowKeyAggregateSession = new ConcurrentHashMap<>(16);
    private final ConcurrentHashMap<WindowKey, JoinWindowFire<?, ?, ?, ?>> fireJoinWindowCallback = new ConcurrentHashMap<>(16);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner$TimeType.class */
    public static class TimeType {
        private Type type;
        private long updateTime;

        public TimeType(Type type, long j) {
            this.type = type;
            this.updateTime = j;
        }

        public Type getType() {
            return this.type;
        }

        public void setType(Type type) {
            this.type = type;
        }

        public long getUpdateTime() {
            return this.updateTime;
        }

        public void setUpdateTime(long j) {
            this.updateTime = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/window/fire/IdleWindowScaner$Type.class */
    public enum Type {
        AccumulatorWindow,
        AccumulatorSessionWindow,
        AggregateWindow,
        AggregateSessionWindow,
        JoinWindow
    }

    public IdleWindowScaner(Integer num, ScheduledExecutorService scheduledExecutorService) {
        this.maxIdleTime = num;
        this.executor = scheduledExecutorService;
        this.executor.scheduleAtFixedRate(() -> {
            try {
                scanAndFireWindow();
            } catch (Throwable th) {
                logger.error("scan and fire the idle window error.", th);
            }
        }, 0L, 1000L, TimeUnit.MILLISECONDS);
    }

    public void initSessionTimeOut(long j) {
        this.sessionTimeOut = j;
    }

    public void putAccumulatorWindowCallback(WindowKey windowKey, AccumulatorWindowFire<?, ?, ?, ?> accumulatorWindowFire) {
        this.fireWindowCallBack.putIfAbsent(windowKey, accumulatorWindowFire);
        this.lastUpdateTime2WindowKey.compute(windowKey, (windowKey2, timeType) -> {
            if (timeType == null) {
                timeType = new TimeType(Type.AccumulatorWindow, System.currentTimeMillis());
            } else {
                timeType.setUpdateTime(System.currentTimeMillis());
            }
            return timeType;
        });
    }

    public void putAccumulatorSessionWindowCallback(WindowKey windowKey, AccumulatorSessionWindowFire<?, ?, ?, ?> accumulatorSessionWindowFire) {
        this.fireSessionWindowCallback.putIfAbsent(windowKey, accumulatorSessionWindowFire);
        this.lastUpdateTime2WindowKey.compute(windowKey, (windowKey2, timeType) -> {
            if (timeType == null) {
                timeType = new TimeType(Type.AccumulatorSessionWindow, System.currentTimeMillis());
            } else {
                timeType.setUpdateTime(System.currentTimeMillis());
            }
            return timeType;
        });
    }

    public void putAggregateWindowCallback(WindowKey windowKey, AggregateWindowFire<?, ?, ?> aggregateWindowFire) {
        this.windowKeyAggregate.putIfAbsent(windowKey, aggregateWindowFire);
        this.lastUpdateTime2WindowKey.compute(windowKey, (windowKey2, timeType) -> {
            if (timeType == null) {
                timeType = new TimeType(Type.AggregateWindow, System.currentTimeMillis());
            } else {
                timeType.setUpdateTime(System.currentTimeMillis());
            }
            return timeType;
        });
    }

    public void putAggregateSessionWindowCallback(WindowKey windowKey, AggregateSessionWindowFire<?, ?, ?> aggregateSessionWindowFire) {
        this.windowKeyAggregateSession.putIfAbsent(windowKey, aggregateSessionWindowFire);
        this.lastUpdateTime2WindowKey.compute(windowKey, (windowKey2, timeType) -> {
            if (timeType == null) {
                timeType = new TimeType(Type.AggregateSessionWindow, System.currentTimeMillis());
            } else {
                timeType.setUpdateTime(System.currentTimeMillis());
            }
            return timeType;
        });
    }

    public void putJoinWindowCallback(WindowKey windowKey, JoinWindowFire<?, ?, ?, ?> joinWindowFire) {
        this.fireJoinWindowCallback.putIfAbsent(windowKey, joinWindowFire);
        this.lastUpdateTime2WindowKey.compute(windowKey, (windowKey2, timeType) -> {
            if (timeType == null) {
                timeType = new TimeType(Type.JoinWindow, System.currentTimeMillis());
            } else {
                timeType.setUpdateTime(System.currentTimeMillis());
            }
            return timeType;
        });
    }

    public void removeOldAccumulatorSession(WindowKey windowKey) {
        TimeType timeType = this.lastUpdateTime2WindowKey.get(windowKey);
        if (timeType != null && timeType.getType() == Type.AccumulatorSessionWindow) {
            this.lastUpdateTime2WindowKey.remove(windowKey);
        }
        this.fireSessionWindowCallback.remove(windowKey);
    }

    public void removeOldAggregateSession(WindowKey windowKey) {
        TimeType timeType = this.lastUpdateTime2WindowKey.get(windowKey);
        if (timeType != null && timeType.getType() == Type.AggregateSessionWindow) {
            this.lastUpdateTime2WindowKey.remove(windowKey);
        }
        this.windowKeyAggregateSession.remove(windowKey);
    }

    public void removeWindowKey(WindowKey windowKey) {
        this.lastUpdateTime2WindowKey.remove(windowKey);
        this.fireWindowCallBack.remove(windowKey);
        this.fireSessionWindowCallback.remove(windowKey);
        this.windowKeyAggregate.remove(windowKey);
        this.windowKeyAggregateSession.remove(windowKey);
        this.fireJoinWindowCallback.remove(windowKey);
    }

    private void scanAndFireWindow() throws Throwable {
        Iterator<Map.Entry<WindowKey, TimeType>> it = this.lastUpdateTime2WindowKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<WindowKey, TimeType> next = it.next();
            WindowKey key = next.getKey();
            TimeType value = next.getValue();
            Type type = value.getType();
            long currentTimeMillis = System.currentTimeMillis() - value.getUpdateTime();
            switch (type) {
                case AggregateSessionWindow:
                case AccumulatorSessionWindow:
                    if (currentTimeMillis >= this.sessionTimeOut) {
                        try {
                            doFire(key, type);
                            it.remove();
                            break;
                        } finally {
                        }
                    } else {
                        continue;
                    }
                case AccumulatorWindow:
                case JoinWindow:
                case AggregateWindow:
                    long longValue = key.getWindowEnd().longValue() - key.getWindowStart().longValue();
                    if (currentTimeMillis > this.maxIdleTime.intValue() && currentTimeMillis > longValue) {
                        try {
                            doFire(key, type);
                            it.remove();
                            break;
                        } finally {
                        }
                    }
                    break;
                default:
                    throw new UnsupportedOperationException("unknown window type: " + type);
            }
        }
    }

    private void doFire(WindowKey windowKey, Type type) throws Throwable {
        long longValue = windowKey.getWindowEnd().longValue() + 1;
        String operatorName = windowKey.getOperatorName();
        switch (type) {
            case AggregateSessionWindow:
                AggregateSessionWindowFire<?, ?, ?> remove = this.windowKeyAggregateSession.remove(windowKey);
                if (remove != null) {
                    remove.fire(operatorName, longValue);
                    remove.commitWatermark(longValue);
                    return;
                }
                return;
            case AccumulatorSessionWindow:
                AccumulatorSessionWindowFire<?, ?, ?, ?> remove2 = this.fireSessionWindowCallback.remove(windowKey);
                if (remove2 != null) {
                    remove2.fire(operatorName, longValue);
                    remove2.commitWatermark(longValue);
                    return;
                }
                return;
            case AccumulatorWindow:
                AccumulatorWindowFire<?, ?, ?, ?> remove3 = this.fireWindowCallBack.remove(windowKey);
                if (remove3 != null) {
                    remove3.fire(operatorName, longValue);
                    remove3.commitWatermark(longValue);
                    return;
                }
                return;
            case JoinWindow:
                JoinWindowFire<?, ?, ?, ?> remove4 = this.fireJoinWindowCallback.remove(windowKey);
                if (remove4 != null) {
                    remove4.fire(operatorName.substring(0, operatorName.lastIndexOf(Constant.SPLIT)), longValue, StreamType.valueOf(operatorName.substring(operatorName.lastIndexOf(Constant.SPLIT) + 1)));
                    remove4.commitWatermark(longValue);
                    return;
                }
                return;
            case AggregateWindow:
                AggregateWindowFire<?, ?, ?> remove5 = this.windowKeyAggregate.remove(windowKey);
                if (remove5 != null) {
                    remove5.fire(operatorName, longValue);
                    remove5.commitWatermark(longValue);
                    return;
                }
                return;
            default:
                return;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.executor.shutdown();
    }
}
