package org.apache.rocketmq.streams.core.window.fire;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.BiFunction;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RStreamsException;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/window/fire/AggregateSessionWindowFire.class */
/**
 * Fires aggregated session windows: every window state found in the backing
 * {@link WindowStore} under a given key prefix is forwarded downstream through the
 * stream context and then removed from the store.
 *
 * @param <K>  type of the record key held by the stored window state
 * @param <V>  value type of the {@link StreamContext} handed to the parent class
 * @param <OV> type of the aggregated value held by the stored window state
 */
public class AggregateSessionWindowFire<K, V, OV> extends AbstractWindowFire<K, V> {
    // Bound to this class so log output is attributed to the session-window fire
    // rather than to the sibling AggregateWindowFire.
    private static final Logger logger = LoggerFactory.getLogger(AggregateSessionWindowFire.class);
    private final WindowStore<K, OV> windowStore;

    /**
     * Creates a fire handler that reads from and deletes from {@code windowStore}.
     * The remaining arguments are passed unchanged to the parent constructor.
     *
     * @param windowStore   store that is searched for window states and from which fired windows are deleted
     * @param messageQueue  message queue handed to the parent class
     * @param streamContext stream context handed to the parent class
     * @param biFunction    function handed to the parent class
     */
    public AggregateSessionWindowFire(WindowStore<K, OV> windowStore, MessageQueue messageQueue, StreamContext<V> streamContext, BiFunction<Long, MessageQueue, Long> biFunction) {
        super(streamContext, messageQueue, biFunction);
        this.windowStore = windowStore;
    }

    /**
     * Forwards and deletes every window stored under the given key prefix.
     *
     * <p>For each match the window start/end times are written into the context header,
     * the window state is forwarded downstream, and the entry is deleted from the store.
     *
     * @param str key prefix used to look up windows in the store; also reported as the
     *            name in the error message on failure
     * @param j   not read by this implementation
     * @return the keys of all windows that were forwarded and deleted, in iteration order
     * @throws RStreamsException wrapping any {@link Throwable} raised while searching,
     *                           forwarding or deleting
     */
    @Override // org.apache.rocketmq.streams.core.window.fire.WindowFire
    public List<WindowKey> fire(String str, long j) {
        List<WindowKey> firedKeys = new ArrayList<>();
        try {
            for (Pair<WindowKey, WindowState<K, OV>> entry : this.windowStore.searchMatchKeyPrefix(str)) {
                WindowKey windowKey = entry.getKey();
                WindowState<K, OV> state = entry.getValue();

                long windowEnd = windowKey.getWindowEnd().longValue();
                // Long.MAX_VALUE is treated as "no earliest record timestamp available";
                // in that case the start stored in the window key is used instead.
                long windowStart = state.getRecordEarliestTimestamp() == Long.MAX_VALUE
                        ? windowKey.getWindowStart().longValue()
                        : state.getRecordEarliestTimestamp();

                logger.info("fire session,windowKey={}, search keyPrefix={}, window: [{} - {}]",
                        windowKey, state.getKey().toString(), Utils.format(windowStart), Utils.format(windowEnd));

                // The header object returned by the context is updated in place before forwarding.
                Properties header = this.context.getHeader();
                header.put(Constant.WINDOW_START_TIME, windowStart);
                header.put(Constant.WINDOW_END_TIME, windowEnd);

                this.context.forward(convert(new Data<>(state.getKey(), state.getValue(), Long.valueOf(state.getRecordLastTimestamp()), header)));

                // Delete only after forwarding so a failed forward leaves the window in the store.
                this.windowStore.deleteByKey(windowKey);
                firedKeys.add(windowKey);
            }
            return firedKeys;
        } catch (Throwable th) {
            throw new RStreamsException(String.format("fire session window error, name:%s", str), th);
        }
    }
}
