package org.apache.rocketmq.streams.core.function.supplier;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.Window;
import org.apache.rocketmq.streams.core.window.WindowInfo;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.apache.rocketmq.streams.core.window.fire.AccumulatorSessionWindowFire;
import org.apache.rocketmq.streams.core.window.fire.AccumulatorWindowFire;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.class */
/**
 * Supplies the window processor that folds incoming values into an {@link Accumulator}.
 *
 * <p>{@link #get()} picks the processor from the window type reported by the configured
 * {@link WindowInfo}: sliding and tumbling windows share {@link WindowAccumulatorProcessor},
 * session windows use {@link SessionWindowAccumulatorProcessor}.
 *
 * @param <K>  type of the record key read from the stream context
 * @param <V>  type of the incoming value
 * @param <R>  type produced by the {@link SelectAction} and fed into the accumulator
 * @param <OV> second type argument of the {@link Accumulator} (presumably its output type - confirm)
 */
public class WindowAccumulatorSupplier<K, V, R, OV> implements Supplier<Processor<V>> {
    private static final Logger logger = LoggerFactory.getLogger(WindowAccumulatorSupplier.class);
    private final String name;
    private final WindowInfo windowInfo;
    private final SelectAction<R, V> selectAction;
    private final Accumulator<R, OV> accumulator;

    /**
     * Session-window processor: every record either joins an existing session stored under this
     * operator's name or opens a new one.
     */
    public class SessionWindowAccumulatorProcessor extends AbstractWindowProcessor<V> {
        private final String name;
        private final WindowInfo windowInfo;
        private final SelectAction<R, V> selectAction;
        private final Accumulator<R, OV> accumulator;
        // Both fields below are assigned in preProcess(), not in the constructor.
        private MessageQueue stateTopicMessageQueue;
        private WindowStore<K, Accumulator<R, OV>> windowStore;

        /**
         * @param str          operator name; joined with this class' simple name to form the
         *                     prefix under which session state is stored
         * @param windowInfo   window configuration (the session timeout is read from it)
         * @param selectAction extracts the value to accumulate from each record
         * @param accumulator  prototype accumulator; copied via {@code m1clone()} for each new session
         */
        public SessionWindowAccumulatorProcessor(String str, WindowInfo windowInfo, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
            this.name = String.join(Constant.SPLIT, str, SessionWindowAccumulatorProcessor.class.getSimpleName());
            this.windowInfo = windowInfo;
            this.selectAction = selectAction;
            this.accumulator = accumulator;
        }

        /**
         * Builds the window store on top of the result of {@code waitStateReplay()}, registers the
         * session timeout with the context's default window scanner, derives the state-topic
         * queue from the source topic/broker/queue id, and creates the session fire callback.
         */
        @Override
        public void preProcess(StreamContext<V> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.windowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.idleWindowScaner = streamContext.getDefaultWindowScaner();
            this.idleWindowScaner.initSessionTimeOut(this.windowInfo.getSessionTimeout().toMilliseconds());
            this.stateTopicMessageQueue = new MessageQueue(
                    streamContext.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX,
                    streamContext.getSourceBrokerName(),
                    streamContext.getSourceQueueId().intValue());
            this.accumulatorSessionWindowFire = new AccumulatorSessionWindowFire<>(
                    this.windowStore,
                    streamContext.copy(),
                    this.stateTopicMessageQueue,
                    (time, queue) -> watermark(time, queue));
        }

        /**
         * Handles one record: drops it when its data time is behind the watermark, otherwise lets
         * {@link #fireIfSessionOut} route it to existing sessions and, when that method returns a
         * range, stores a brand-new session state for it.
         *
         * @param data the incoming record value
         * @throws Throwable propagated from the store, the fire callback or the select action
         */
        @Override
        public void process(V data) throws Throwable {
            // The context hands the key back untyped here; the processor is parameterised on K, so
            // narrow it once instead of passing Object into K-typed APIs.
            @SuppressWarnings("unchecked")
            K key = (K) this.context.getKey();
            long dataTime = this.context.getDataTime();
            long watermark = watermark(dataTime - this.allowDelay, this.stateTopicMessageQueue);
            if (dataTime < watermark) {
                WindowAccumulatorSupplier.logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}", data, dataTime, watermark);
                return;
            }

            // Pair layout: key = session begin, value = session end. null means "no new session".
            Pair<Long, Long> newSession = fireIfSessionOut(key, data, dataTime, watermark);
            if (newSession == null) {
                return;
            }

            // NOTE(review): m1clone() looks like a decompiler-renamed clone() - confirm against Accumulator.
            Accumulator<R, OV> sessionAccumulator = this.accumulator.m1clone();
            sessionAccumulator.addValue(this.selectAction.select(data));
            WindowState<K, Accumulator<R, OV>> windowState = new WindowState<>(key, sessionAccumulator, dataTime);
            if (dataTime < windowState.getRecordEarliestTimestamp()) {
                windowState.setRecordEarliestTimestamp(dataTime);
            }

            long sessionBegin = newSession.getKey();
            long sessionEnd = newSession.getValue();
            // WindowKey takes the window end before the window start.
            WindowKey windowKey = new WindowKey(this.name, super.toHexString(key), newSession.getValue(), newSession.getKey());
            WindowAccumulatorSupplier.logger.info("new session window, with key={}, valueTime={}, sessionBegin=[{}], sessionEnd=[{}]", key, Utils.format(dataTime), Utils.format(sessionBegin), Utils.format(sessionEnd));
            this.windowStore.put(this.stateTopicMessageQueue, windowKey, windowState);
            this.idleWindowScaner.putAccumulatorSessionWindowCallback(windowKey, this.accumulatorSessionWindowFire);
        }

        /**
         * Fires sessions whose end is behind the watermark, then merges the record into the
         * remaining sessions it falls into.
         *
         * <p>Pass 1 walks every stored session for this operator, triggers the fire callback for
         * each one that ended before {@code watermark} and removes it from the working list.
         * Pass 2 walks what is left: a session that ended before {@code dataTime} requests a new
         * session; a session whose range contains {@code dataTime} receives the value, and if it
         * is the last one in the list its end is pushed out to {@code dataTime + sessionTimeout}.
         * Every remaining session is written back to the store in pass 2.
         *
         * @param key       record key (only used for logging here)
         * @param data      record value
         * @param dataTime  timestamp of the record
         * @param watermark current watermark
         * @return (begin, end) of the session the caller must create, or {@code null} when the
         *         record was merged into an existing session or discarded
         * @throws Throwable propagated from the store, the fire callback or the select action
         */
        private Pair<Long, Long> fireIfSessionOut(K key, V data, long dataTime, long watermark) throws Throwable {
            List<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> sessions = this.windowStore.searchMatchKeyPrefix(this.name);
            if (sessions.isEmpty()) {
                return new Pair<>(dataTime, sessionEndFor(dataTime));
            }
            WindowAccumulatorSupplier.logger.debug("exist session state num={}", sessions.size());

            // Captured up front: the list shrinks through it.remove() below, so comparing the
            // running count against the live size could match an earlier element or none at all,
            // leaving lastSessionEnd at 0 and producing a session that begins at epoch 0.
            final int totalSessions = sessions.size();
            int visited = 0;
            long lastSessionEnd = 0;
            long maxFiredSessionEnd = Long.MIN_VALUE;

            Iterator<Pair<WindowKey, WindowState<K, Accumulator<R, OV>>>> it = sessions.iterator();
            while (it.hasNext()) {
                Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> existing = it.next();
                WindowAccumulatorSupplier.logger.debug("exist session state{}=[{}]", visited, existing);
                visited++;

                long sessionEnd = existing.getKey().getWindowEnd();
                if (visited == totalSessions) {
                    lastSessionEnd = sessionEnd;
                }
                if (sessionEnd < watermark) {
                    // Every key reported as fired is also dropped from the idle scanner.
                    for (WindowKey fired : this.accumulatorSessionWindowFire.fire(this.name, watermark)) {
                        this.idleWindowScaner.removeWindowKey(fired);
                    }
                    it.remove();
                    maxFiredSessionEnd = Long.max(sessionEnd, maxFiredSessionEnd);
                }
            }

            if (dataTime < maxFiredSessionEnd) {
                WindowAccumulatorSupplier.logger.warn("late data, discard. key=[{}], data=[{}], dataTime < maxFireSessionEnd: [{}] < [{}]", key, data, dataTime, maxFiredSessionEnd);
                return null;
            }

            boolean createNewSession = false;
            // Key of the session that got extended; stays null unless the last session is extended.
            WindowKey replacedKey = null;
            for (int i = 0; i < sessions.size(); i++) {
                Pair<WindowKey, WindowState<K, Accumulator<R, OV>>> session = sessions.get(i);
                WindowKey sessionKey = session.getKey();
                WindowState<K, Accumulator<R, OV>> state = session.getValue();

                if (sessionKey.getWindowEnd() < dataTime) {
                    // Record is past this session: ask the caller to open a new one.
                    createNewSession = true;
                } else if (sessionKey.getWindowStart() <= dataTime) {
                    WindowAccumulatorSupplier.logger.debug("data belong to exist session window.dataTime=[{}], window:[{} - {}]", dataTime, Utils.format(sessionKey.getWindowStart().longValue()), Utils.format(sessionKey.getWindowEnd().longValue()));
                    Accumulator<R, OV> current = state.getValue();
                    current.addValue(this.selectAction.select(data));
                    state.setValue(current);
                    state.setRecordLastTimestamp(dataTime);
                    if (dataTime < state.getRecordEarliestTimestamp()) {
                        state.setRecordEarliestTimestamp(dataTime);
                    }
                    // Only the last session in the list is ever extended.
                    if (i == sessions.size() - 1) {
                        long extendedEnd = sessionEndFor(dataTime);
                        if (sessionKey.getWindowEnd() < extendedEnd) {
                            WindowAccumulatorSupplier.logger.debug("update exist session window, before:[{} - {}], after:[{} - {}]", Utils.format(sessionKey.getWindowStart().longValue()), Utils.format(sessionKey.getWindowEnd().longValue()), Utils.format(sessionKey.getWindowStart().longValue()), Utils.format(extendedEnd));
                            replacedKey = sessionKey;
                            sessionKey = new WindowKey(sessionKey.getOperatorName(), sessionKey.getKey2String(), extendedEnd, sessionKey.getWindowStart());
                        }
                    }
                } else {
                    WindowAccumulatorSupplier.logger.warn("discard data: key=[{}], data=[{}], dataTime=[{}], watermark=[{}]", key, data, dataTime, watermark);
                }

                // State is written back under the (possibly new) key on every iteration.
                this.windowStore.put(this.stateTopicMessageQueue, sessionKey, state);
                this.idleWindowScaner.putAccumulatorSessionWindowCallback(sessionKey, this.accumulatorSessionWindowFire);
                // NOTE(review): replacedKey is null unless the session was extended above; both
                // callees are invoked with null in that case - presumably they tolerate it, confirm.
                this.idleWindowScaner.removeOldAccumulatorSession(replacedKey);
                this.windowStore.deleteByKey(replacedKey);
            }

            if (sessions.isEmpty() || createNewSession) {
                return new Pair<>(lastSessionEnd, sessionEndFor(dataTime));
            }
            return null;
        }

        /** Returns {@code dataTime} plus the configured session timeout in milliseconds. */
        private long sessionEndFor(long dataTime) {
            return dataTime + this.windowInfo.getSessionTimeout().toMilliseconds();
        }
    }

    /**
     * Processor for sliding and tumbling windows: adds each record to the accumulator of every
     * window returned by {@code calculateWindow} and then fires windows against the watermark.
     */
    public class WindowAccumulatorProcessor extends AbstractWindowProcessor<V> {
        private final WindowInfo windowInfo;
        private final String name;
        private final SelectAction<R, V> selectAction;
        private final Accumulator<R, OV> accumulator;
        // Both fields below are assigned in preProcess(), not in the constructor.
        private MessageQueue stateTopicMessageQueue;
        private WindowStore<K, Accumulator<R, OV>> windowStore;
        // Holds a failure from the fire step until the next process() call rethrows it.
        private final AtomicReference<Throwable> errorReference = new AtomicReference<>(null);

        /**
         * @param str          operator name; joined with this class' simple name to form the
         *                     operator part of every {@link WindowKey}
         * @param windowInfo   window configuration passed to {@code calculateWindow}
         * @param selectAction extracts the value to accumulate from each record
         * @param accumulator  prototype accumulator; copied via {@code m1clone()} for each new window
         */
        public WindowAccumulatorProcessor(String str, WindowInfo windowInfo, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
            this.name = String.join(Constant.SPLIT, str, WindowAccumulatorProcessor.class.getSimpleName());
            this.windowInfo = windowInfo;
            this.selectAction = selectAction;
            this.accumulator = accumulator;
        }

        /**
         * Builds the window store on top of the result of {@code waitStateReplay()}, takes the
         * context's default window scanner, derives the state-topic queue from the source
         * topic/broker/queue id, and creates the window fire callback.
         */
        @Override
        public void preProcess(StreamContext<V> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.windowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.idleWindowScaner = streamContext.getDefaultWindowScaner();
            this.stateTopicMessageQueue = new MessageQueue(
                    streamContext.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX,
                    streamContext.getSourceBrokerName(),
                    streamContext.getSourceQueueId().intValue());
            this.accumulatorWindowFire = new AccumulatorWindowFire<>(
                    this.windowStore,
                    streamContext.copy(),
                    this.stateTopicMessageQueue,
                    (time, queue) -> watermark(time, queue));
        }

        /**
         * Handles one record.
         *
         * <p>First rethrows a failure remembered from the previous call's fire step, if any. Then
         * drops the record when its data time is behind the watermark; otherwise adds the selected
         * value to the accumulator of each window the record belongs to, stores the state, and
         * finally fires windows for the current watermark.
         *
         * @param data the incoming record value
         * @throws Throwable a failure deferred from the previous fire step, or anything thrown
         *                   while accumulating and storing
         */
        @Override
        public void process(V data) throws Throwable {
            // Read and clear in one atomic step so a stored failure is rethrown exactly once.
            Throwable deferred = this.errorReference.getAndSet(null);
            if (deferred != null) {
                throw deferred;
            }

            // The context hands the key back untyped here; narrow it once for the K-typed state.
            @SuppressWarnings("unchecked")
            K key = (K) this.context.getKey();
            long dataTime = this.context.getDataTime();
            long watermark = watermark(dataTime - this.allowDelay, this.stateTopicMessageQueue);
            if (dataTime < watermark) {
                WindowAccumulatorSupplier.logger.warn("discard data:[{}], window has been fired. time of data:{}, watermark:{}", data, dataTime, watermark);
                return;
            }

            for (Window window : super.calculateWindow(this.windowInfo, dataTime)) {
                WindowAccumulatorSupplier.logger.debug("timestamp={}. time -> window: {}->{}", dataTime, Utils.format(dataTime), window);
                WindowKey windowKey = new WindowKey(this.name, super.toHexString(key), Long.valueOf(window.getEndTime()), Long.valueOf(window.getStartTime()));
                WindowState<K, Accumulator<R, OV>> stored = this.windowStore.get(windowKey);
                // Reuse the stored accumulator when there is one, otherwise start from a copy of the prototype.
                // NOTE(review): m1clone() looks like a decompiler-renamed clone() - confirm against Accumulator.
                Accumulator<R, OV> windowAccumulator = (stored == null || stored.getValue() == null)
                        ? this.accumulator.m1clone()
                        : stored.getValue();
                windowAccumulator.addValue(this.selectAction.select(data));
                this.windowStore.put(this.stateTopicMessageQueue, windowKey, new WindowState<>(key, windowAccumulator, dataTime));
                this.idleWindowScaner.putAccumulatorWindowCallback(windowKey, this.accumulatorWindowFire);
            }

            try {
                for (WindowKey fired : this.accumulatorWindowFire.fire(this.name, watermark)) {
                    this.idleWindowScaner.removeWindowKey(fired);
                }
            } catch (Throwable fireError) {
                // The failure is not thrown here; it is kept and rethrown by the next process()
                // call. Log it as well so it is not lost if no further record arrives.
                WindowAccumulatorSupplier.logger.error("fire window failed, operator=[{}], watermark=[{}]; rethrown on next record", this.name, watermark, fireError);
                this.errorReference.compareAndSet(null, fireError);
            }
        }
    }

    /**
     * @param str          operator name handed to the created processor
     * @param windowInfo   window configuration; its window type decides which processor is built
     * @param selectAction extracts the value to accumulate from each record
     * @param accumulator  prototype accumulator handed to the created processor
     */
    public WindowAccumulatorSupplier(String str, WindowInfo windowInfo, SelectAction<R, V> selectAction, Accumulator<R, OV> accumulator) {
        this.name = str;
        this.windowInfo = windowInfo;
        this.selectAction = selectAction;
        this.accumulator = accumulator;
    }

    /**
     * Creates a new processor matching the configured window type.
     *
     * @return a {@link WindowAccumulatorProcessor} for sliding/tumbling windows or a
     *         {@link SessionWindowAccumulatorProcessor} for session windows
     * @throws RuntimeException if the window type is none of the handled ones
     */
    @Override
    public Processor<V> get() {
        WindowInfo.WindowType windowType = this.windowInfo.getWindowType();
        switch (windowType) {
            case SLIDING_WINDOW:
            case TUMBLING_WINDOW:
                return new WindowAccumulatorProcessor(this.name, this.windowInfo, this.selectAction, this.accumulator);
            case SESSION_WINDOW:
                return new SessionWindowAccumulatorProcessor(this.name, this.windowInfo, this.selectAction, this.accumulator);
            default:
                throw new RuntimeException("window type is error, WindowType=" + windowType);
        }
    }
}
