package org.apache.rocketmq.streams.core.function.supplier;

import java.util.Iterator;
import java.util.Properties;
import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
import org.apache.rocketmq.streams.core.running.AbstractWindowProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.JoinType;
import org.apache.rocketmq.streams.core.window.StreamType;
import org.apache.rocketmq.streams.core.window.Window;
import org.apache.rocketmq.streams.core.window.WindowInfo;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.apache.rocketmq.streams.core.window.fire.JoinWindowFire;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/JoinWindowAggregateSupplier.class */
/**
 * Supplies the processor that performs a windowed join of two streams.
 *
 * <p>Each call to {@link #get()} creates a new {@link JoinStreamWindowAggregateProcessor}
 * configured with the name, window definition, join type and join action given to this supplier.
 *
 * @param <K>   key type of the joined records
 * @param <V1>  value type held in the left-stream window store
 * @param <V2>  value type held in the right-stream window store
 * @param <OUT> result type produced by the join action
 */
public class JoinWindowAggregateSupplier<K, V1, V2, OUT> implements Supplier<Processor<? super OUT>> {
    private static final Logger logger = LoggerFactory.getLogger(JoinWindowAggregateSupplier.class);

    private final String name;
    private final WindowInfo windowInfo;
    private final ValueJoinAction<V1, V2, OUT> joinAction;
    private final JoinType joinType;

    /**
     * Processor that stores every incoming record into the window store of the stream it came from
     * (left or right) and then asks the {@link JoinWindowFire} to fire for the current watermark.
     */
    public class JoinStreamWindowAggregateProcessor extends AbstractWindowProcessor<Object> {
        private final String name;
        private final WindowInfo windowInfo;
        private final JoinType joinType;
        private final ValueJoinAction<V1, V2, OUT> joinAction;
        private MessageQueue stateTopicMessageQueue;
        private WindowStore<K, V1> leftWindowStore;
        private WindowStore<K, V2> rightWindowStore;

        /**
         * @param name       base name; combined with this class's simple name via {@code Utils.buildKey}
         * @param windowInfo window definition used to compute the windows a record belongs to
         * @param joinType   join type handed to the {@link JoinWindowFire}
         * @param joinAction action handed to the {@link JoinWindowFire} to combine left and right values
         */
        public JoinStreamWindowAggregateProcessor(String name, WindowInfo windowInfo, JoinType joinType, ValueJoinAction<V1, V2, OUT> joinAction) {
            this.name = Utils.buildKey(name, JoinStreamWindowAggregateProcessor.class.getSimpleName());
            this.windowInfo = windowInfo;
            this.joinType = joinType;
            this.joinAction = joinAction;
        }

        /**
         * Creates the left/right window stores, the state-topic message queue and the join-window
         * fire for the given context.
         *
         * @param streamContext context this processor runs in
         * @throws RecoverStateStoreThrowable declared by the overridden method / state replay
         */
        @Override
        public void preProcess(StreamContext<Object> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);

            // NOTE(review): each store is built from its own waitStateReplay() call; whether both calls
            // hand back the same underlying state store is not visible from this class.
            this.leftWindowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.rightWindowStore = new WindowStore<>(super.waitStateReplay(), WindowState::byte2WindowState, WindowState::windowState2Byte);
            this.idleWindowScaner = streamContext.getDefaultWindowScaner();

            String stateTopic = streamContext.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX;
            this.stateTopicMessageQueue = new MessageQueue(stateTopic, streamContext.getSourceBrokerName(), streamContext.getSourceQueueId().intValue());

            this.joinWindowFire = new JoinWindowFire<>(this.joinType, this.stateTopicMessageQueue, streamContext.copy(), this.joinAction,
                    this.leftWindowStore, this.rightWindowStore, (v1, v2) -> watermark(v1, v2));
        }

        /**
         * Handles one record: discards it when its data time is behind the watermark, otherwise
         * stores it for its stream and fires the join for the current watermark. Every window key
         * returned by the fire is removed from the idle-window scanner.
         *
         * @param data the record value
         * @throws IllegalStateException if the header carries no stream tag or the tag has no stream type
         * @throws Throwable             anything raised by the window stores or the join fire
         */
        @Override
        public void process(Object data) throws Throwable {
            Object key = this.context.getKey();
            long dataTime = this.context.getDataTime();
            Properties header = this.context.getHeader();

            long watermark = watermark(dataTime - this.allowDelay, this.stateTopicMessageQueue);
            if (dataTime < watermark) {
                // Arguments follow the placeholder order: data, maxFiredWindowEnd, time of data, watermark.
                logger.warn("discard data:[{}], window has been fired. maxFiredWindowEnd:{}, time of data:{}, watermark:{}",
                        data, watermark, dataTime, watermark);
                return;
            }

            // A missing tag is reported the same way as a tag without a stream type, instead of
            // surfacing as a bare NullPointerException.
            Object streamTag = header.get(Constant.STREAM_TAG);
            StreamType streamType = streamTag == null ? null : ((WindowInfo.JoinStream) streamTag).getStreamType();
            if (streamType == null) {
                throw new IllegalStateException(String.format("StreamType is empty, data:%s", data));
            }

            store(key, data, dataTime, streamType);

            Iterator<WindowKey> firedKeys = this.joinWindowFire.fire(this.name, watermark, streamType).iterator();
            while (firedKeys.hasNext()) {
                this.idleWindowScaner.removeWindowKey(firedKeys.next());
            }
        }

        /**
         * Writes the record into the store matching {@code streamType} for every window returned by
         * {@code calculateWindow}, and registers the join fire as callback for each window key.
         * Stream types other than {@code LEFT_STREAM} and {@code RIGHT_STREAM} are ignored.
         *
         * @param key        record key
         * @param data       record value
         * @param dataTime   data time of the record
         * @param streamType stream the record arrived on
         */
        private void store(Object key, Object data, long dataTime, StreamType streamType) throws Throwable {
            String storeName = Utils.buildKey(this.name, streamType.name());
            for (Window window : super.calculateWindow(this.windowInfo, dataTime)) {
                // Guarded so the timestamp formatting is skipped when debug logging is off.
                if (logger.isDebugEnabled()) {
                    logger.debug("timestamp={}. time -> window: {}->{}", dataTime, Utils.format(dataTime), window);
                }
                WindowKey windowKey = new WindowKey(storeName, super.toHexString(key), Long.valueOf(window.getEndTime()), Long.valueOf(window.getStartTime()));
                switch (streamType) {
                    case LEFT_STREAM:
                        this.leftWindowStore.put(this.stateTopicMessageQueue, windowKey, new WindowState<>(key, data, dataTime));
                        this.idleWindowScaner.putJoinWindowCallback(windowKey, this.joinWindowFire);
                        break;
                    case RIGHT_STREAM:
                        this.rightWindowStore.put(this.stateTopicMessageQueue, windowKey, new WindowState<>(key, data, dataTime));
                        this.idleWindowScaner.putJoinWindowCallback(windowKey, this.joinWindowFire);
                        break;
                }
            }
        }
    }

    /**
     * @param name       base name passed on to every processor created by {@link #get()}
     * @param windowInfo window definition; its join stream also provides the join type
     * @param joinAction action used to combine a left value with a right value
     */
    public JoinWindowAggregateSupplier(String name, WindowInfo windowInfo, ValueJoinAction<V1, V2, OUT> joinAction) {
        this.name = name;
        this.windowInfo = windowInfo;
        this.joinType = windowInfo.getJoinStream().getJoinType();
        this.joinAction = joinAction;
    }

    /**
     * Creates a new join-window processor with this supplier's configuration.
     *
     * @return a fresh {@link JoinStreamWindowAggregateProcessor}
     */
    @Override
    public Processor<Object> get() {
        return new JoinStreamWindowAggregateProcessor(this.name, this.windowInfo, this.joinType, this.joinAction);
    }
}
