package org.apache.rocketmq.streams.core.window.fire;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.function.BiFunction;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RStreamsException;
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.state.StateStore;
import org.apache.rocketmq.streams.core.util.Pair;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.JoinType;
import org.apache.rocketmq.streams.core.window.StreamType;
import org.apache.rocketmq.streams.core.window.WindowKey;
import org.apache.rocketmq.streams.core.window.WindowState;
import org.apache.rocketmq.streams.core.window.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/window/fire/JoinWindowFire.class */
/**
 * Fires the windows of a two-stream windowed join.
 *
 * <p>Given an operator name and a watermark, {@link #fire} looks up the windows below the
 * watermark in the left and right window stores, joins them according to the configured
 * {@link JoinType}, forwards every join result through the stream context and finally deletes
 * all looked-up windows from both stores.
 *
 * @param <K>   key type of the joined records
 * @param <V1>  value type held in the left window store
 * @param <V2>  value type held in the right window store
 * @param <OUT> type produced by the join action
 */
public class JoinWindowFire<K, V1, V2, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(JoinWindowFire.class);

    private final JoinType joinType;
    private final MessageQueue stateTopicMessageQueue;
    private final StreamContext<Object> context;
    private final ValueJoinAction<V1, V2, OUT> joinAction;
    private final WindowStore<K, V1> leftWindowStore;
    private final WindowStore<K, V2> rightWindowStore;
    private final BiFunction<Long, MessageQueue, Long> commitWatermark;

    /**
     * @param joinType        join flavour applied when windows fire
     * @param messageQueue    queue whose state is persisted when the watermark is committed
     * @param streamContext   context used to read headers/data time and to forward results
     * @param valueJoinAction function combining a left and a right value into the output
     * @param windowStore     store holding the left stream's windows
     * @param windowStore2    store holding the right stream's windows
     * @param biFunction      callback invoked by {@link #commitWatermark(long)} with the new
     *                        watermark and the state queue
     */
    public JoinWindowFire(JoinType joinType, MessageQueue messageQueue, StreamContext<Object> streamContext, ValueJoinAction<V1, V2, OUT> valueJoinAction, WindowStore<K, V1> windowStore, WindowStore<K, V2> windowStore2, BiFunction<Long, MessageQueue, Long> biFunction) {
        this.joinType = joinType;
        this.stateTopicMessageQueue = messageQueue;
        this.context = streamContext;
        this.joinAction = valueJoinAction;
        this.leftWindowStore = windowStore;
        this.rightWindowStore = windowStore2;
        this.commitWatermark = biFunction;
    }

    /**
     * Joins and forwards every window found below the watermark, then removes those windows
     * from both stores.
     *
     * @param str        operator name; combined with the stream type name to build the store
     *                   search prefix
     * @param j          watermark passed to the window stores' search
     * @param streamType stream that triggered the fire; for {@code LEFT_JOIN} only
     *                   {@code LEFT_STREAM} produces output
     * @return the keys of the windows that took part in a forwarded result (empty when both
     *         stores returned nothing)
     * @throws RStreamsException wrapping any failure raised while searching, joining,
     *                           forwarding or deleting
     */
    public List<WindowKey> fire(String str, long j, StreamType streamType) {
        List<WindowKey> fired = new ArrayList<>();
        try {
            String leftPrefix = Utils.buildKey(str, StreamType.LEFT_STREAM.name());
            List<Pair<WindowKey, WindowState<K, V1>>> leftPairs = this.leftWindowStore.searchLessThanWatermark(leftPrefix, j);
            for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                logger.debug("search with key prefix:{} and watermark:{}, find window: {}", leftPrefix, Utils.format(j), leftPair.getKey());
            }

            String rightPrefix = Utils.buildKey(str, StreamType.RIGHT_STREAM.name());
            List<Pair<WindowKey, WindowState<K, V2>>> rightPairs = this.rightWindowStore.searchLessThanWatermark(rightPrefix, j);
            for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                logger.debug("search with key prefix:{} and watermark:{}, find window: {}", rightPrefix, Utils.format(j), rightPair.getKey());
            }

            if (leftPairs.isEmpty() && rightPairs.isEmpty()) {
                logger.debug("left window and right window are all empty, watermark:{}.left window operatorName:{}, right window operatorName:{}", Utils.format(j), leftPrefix, rightPrefix);
                return fired;
            }

            // Process windows in ascending window-end order on both sides.
            leftPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));
            rightPairs.sort(Comparator.comparing(pair -> pair.getKey().getWindowEnd()));

            switch (this.joinType) {
                case INNER_JOIN:
                    fireInnerJoin(leftPairs, rightPairs, fired);
                    break;
                case LEFT_JOIN:
                    switch (streamType) {
                        case LEFT_STREAM:
                            fireLeftJoin(leftPairs, rightPairs, fired);
                            break;
                        default:
                            // Any other triggering stream produces no output for a left join.
                            break;
                    }
                    break;
                default:
                    break;
            }

            // Every looked-up window is removed, whether or not it produced a result.
            if (!leftPairs.isEmpty()) {
                logger.debug("delete left window.");
                for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
                    this.leftWindowStore.deleteByKey(leftPair.getKey());
                }
            }
            if (!rightPairs.isEmpty()) {
                logger.debug("delete right window.");
                for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                    this.rightWindowStore.deleteByKey(rightPair.getKey());
                }
            }
            return fired;
        } catch (Throwable th) {
            throw new RStreamsException(String.format("fire window error, watermark:%s.", Long.valueOf(j)), th);
        }
    }

    /**
     * Inner join: forwards one result for every left/right pair sharing the same key-and-window
     * string and records the left window key for each forwarded result.
     */
    private void fireInnerJoin(List<Pair<WindowKey, WindowState<K, V1>>> leftPairs,
                               List<Pair<WindowKey, WindowState<K, V2>>> rightPairs,
                               List<WindowKey> fired) throws Throwable {
        for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
            WindowKey leftKey = leftPair.getKey();
            String keyAndWindow = leftKey.getKeyAndWindow();
            for (Pair<WindowKey, WindowState<K, V2>> rightPair : rightPairs) {
                if (!keyAndWindow.equals(rightPair.getKey().getKeyAndWindow())) {
                    continue;
                }
                OUT joined = this.joinAction.apply(leftPair.getValue().getValue(), rightPair.getValue().getValue());
                Properties header = windowHeader(leftKey);
                // Keys are compared by value: the two states come from different stores, so
                // equal keys are not expected to be the same object.
                assert Objects.equals(leftPair.getValue().getKey(), rightPair.getValue().getKey())
                        : "left and right window state keys differ for " + keyAndWindow;
                this.context.forward(convert(new Data<>(leftPair.getValue().getKey(), joined, Long.valueOf(this.context.getDataTime()), header)));
                fired.add(leftKey);
            }
        }
    }

    /**
     * Left join: forwards one result per left window, joined with the first right window that
     * has the same key-and-window string, or with {@code null} when there is none.
     */
    private void fireLeftJoin(List<Pair<WindowKey, WindowState<K, V1>>> leftPairs,
                              List<Pair<WindowKey, WindowState<K, V2>>> rightPairs,
                              List<WindowKey> fired) throws Throwable {
        for (Pair<WindowKey, WindowState<K, V1>> leftPair : leftPairs) {
            WindowKey leftKey = leftPair.getKey();
            fired.add(leftKey);

            Pair<WindowKey, WindowState<K, V2>> match = findRightMatch(rightPairs, leftKey.getKeyAndWindow());

            V1 leftValue = leftPair.getValue().getValue();
            V2 rightValue = null;
            if (match != null) {
                rightValue = match.getValue().getValue();
                fired.add(match.getKey());
                assert Objects.equals(leftPair.getValue().getKey(), match.getValue().getKey())
                        : "left and right window state keys differ for " + leftKey.getKeyAndWindow();
            }

            OUT joined = this.joinAction.apply(leftValue, rightValue);
            Properties header = windowHeader(leftKey);
            this.context.forward(convert(new Data<>(leftPair.getValue().getKey(), joined, Long.valueOf(this.context.getDataTime()), header)));
        }
    }

    /**
     * Returns the first right-side pair whose key-and-window string equals {@code keyAndWindow},
     * or {@code null} when no such pair exists. The search always terminates at the end of the
     * list.
     */
    private Pair<WindowKey, WindowState<K, V2>> findRightMatch(List<Pair<WindowKey, WindowState<K, V2>>> rightPairs, String keyAndWindow) {
        for (Pair<WindowKey, WindowState<K, V2>> candidate : rightPairs) {
            if (candidate.getKey().getKeyAndWindow().equals(keyAndWindow)) {
                return candidate;
            }
        }
        return null;
    }

    /** Fetches the context header and stamps it with the window's start and end time. */
    private Properties windowHeader(WindowKey windowKey) {
        Properties header = this.context.getHeader();
        header.put(Constant.WINDOW_START_TIME, windowKey.getWindowStart());
        header.put(Constant.WINDOW_END_TIME, windowKey.getWindowEnd());
        return header;
    }

    /**
     * Copies {@code data} into a new {@code Data} whose value type is {@code Object}, which is
     * the value type this class's {@code StreamContext<Object>} is declared with.
     */
    @SuppressWarnings("unchecked")
    private <T> Data<T, Object> convert(Data<?, ?> data) {
        Data<Object, Object> copy = new Data<>(data.getKey(), data.getValue(), data.getTimestamp(), data.getHeader());
        // Unchecked but safe at runtime: only the static key type is re-labelled.
        return (Data<T, Object>) copy;
    }

    /**
     * Commits {@code j} as the new watermark when it is greater than the value currently stored
     * under the watermark key of the state queue: invokes the commit callback and then persists
     * the state store for that queue. Does nothing otherwise.
     *
     * @param j candidate watermark
     * @throws Throwable propagated from the state store operations
     */
    public void commitWatermark(long j) throws Throwable {
        StateStore stateStore = this.context.getStateStore();
        byte[] watermarkKey = Utils.watermarkKeyBytes(this.stateTopicMessageQueue, Constant.WATERMARK_KEY);
        long storedWatermark = Utils.bytes2Long(stateStore.get(watermarkKey));
        if (j > storedWatermark) {
            this.commitWatermark.apply(Long.valueOf(j), this.stateTopicMessageQueue);
            HashSet<MessageQueue> queues = new HashSet<>();
            queues.add(this.stateTopicMessageQueue);
            stateStore.persist(queues);
        }
    }
}
