package org.apache.rocketmq.streams.core.function.supplier;

import java.util.function.Supplier;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.streams.core.common.Constant;
import org.apache.rocketmq.streams.core.exception.RecoverStateStoreThrowable;
import org.apache.rocketmq.streams.core.function.ValueJoinAction;
import org.apache.rocketmq.streams.core.metadata.Data;
import org.apache.rocketmq.streams.core.running.AbstractProcessor;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.running.StreamContext;
import org.apache.rocketmq.streams.core.state.StateStore;
import org.apache.rocketmq.streams.core.util.Utils;
import org.apache.rocketmq.streams.core.window.JoinType;
import org.apache.rocketmq.streams.core.window.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier.class */
public class JoinAggregateSupplier<K, V1, V2, OUT> implements Supplier<Processor<? super OUT>> {
    private static final Logger logger = LoggerFactory.getLogger(JoinAggregateSupplier.class.getName());
    private String name;
    private JoinType joinType;
    private final ValueJoinAction<V1, V2, OUT> joinAction;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/rocketmq/streams/core/function/supplier/JoinAggregateSupplier$JoinStreamAggregateProcessor.class */
    public class JoinStreamAggregateProcessor extends AbstractProcessor<Object> {
        private String name;
        private JoinType joinType;
        private final ValueJoinAction<V1, V2, OUT> joinAction;
        private MessageQueue stateTopicMessageQueue;
        private StateStore stateStore;

        public JoinStreamAggregateProcessor(String str, JoinType joinType, ValueJoinAction<V1, V2, OUT> valueJoinAction) {
            this.name = str;
            this.joinType = joinType;
            this.joinAction = valueJoinAction;
        }

        @Override // org.apache.rocketmq.streams.core.running.AbstractProcessor, org.apache.rocketmq.streams.core.running.Processor
        public void preProcess(StreamContext<Object> streamContext) throws RecoverStateStoreThrowable {
            super.preProcess(streamContext);
            this.stateStore = super.waitStateReplay();
            this.stateTopicMessageQueue = new MessageQueue(streamContext.getSourceTopic() + Constant.STATE_TOPIC_SUFFIX, streamContext.getSourceBrokerName(), streamContext.getSourceQueueId().intValue());
        }

        @Override // org.apache.rocketmq.streams.core.running.Processor
        public void process(Object obj) throws Throwable {
            Object key = this.context.getKey();
            StreamType streamType = (StreamType) this.context.getHeader().get(Constant.STREAM_TAG);
            store(key, obj, streamType);
            fire(key, obj, streamType);
        }

        private void store(Object obj, Object obj2, StreamType streamType) throws Throwable {
            String buildKey = Utils.buildKey(this.name, streamType.name());
            switch (streamType) {
                case LEFT_STREAM:
                case RIGHT_STREAM:
                    this.stateStore.put(this.stateTopicMessageQueue, Utils.object2Byte(Utils.buildKey(buildKey, super.toHexString(obj))), super.object2Byte(obj2));
                    return;
                default:
                    return;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void fire(Object obj, Object obj2, StreamType streamType) throws Throwable {
            byte[] bArr;
            switch (streamType) {
                case LEFT_STREAM:
                    byte[] bArr2 = this.stateStore.get(Utils.object2Byte(Utils.buildKey(Utils.buildKey(this.name, StreamType.RIGHT_STREAM.name()), super.toHexString(obj))));
                    if (this.joinType == JoinType.INNER_JOIN) {
                        if (bArr2 == null || bArr2.length == 0) {
                            return;
                        }
                    } else if (this.joinType != JoinType.LEFT_JOIN) {
                        throw new UnsupportedOperationException("unknown joinType = " + this.joinType);
                    }
                    doFire(obj2, super.byte2Object(bArr2));
                    return;
                case RIGHT_STREAM:
                    if (this.joinType != JoinType.INNER_JOIN || (bArr = this.stateStore.get(Utils.object2Byte(Utils.buildKey(Utils.buildKey(this.name, StreamType.LEFT_STREAM.name()), super.toHexString(obj))))) == null || bArr.length == 0) {
                        return;
                    }
                    doFire(super.byte2Object(bArr), obj2);
                    return;
                default:
                    return;
            }
        }

        private void doFire(V1 v1, V2 v2) throws Throwable {
            this.context.forward(super.convert(new Data<>(this.context.getKey(), this.joinAction.apply(v1, v2), Long.valueOf(this.context.getDataTime()), this.context.getHeader())));
        }
    }

    public JoinAggregateSupplier(String str, JoinType joinType, ValueJoinAction<V1, V2, OUT> valueJoinAction) {
        this.name = str;
        this.joinType = joinType;
        this.joinAction = valueJoinAction;
    }

    @Override // java.util.function.Supplier
    public Processor<Object> get() {
        return new JoinStreamAggregateProcessor(this.name, this.joinType, this.joinAction);
    }
}
