package org.apache.rocketmq.streams.core.rstream;

import java.util.function.Supplier;
import org.apache.rocketmq.streams.core.function.FilterAction;
import org.apache.rocketmq.streams.core.function.ForeachAction;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.function.supplier.FilterSupplier;
import org.apache.rocketmq.streams.core.function.supplier.ForeachSupplier;
import org.apache.rocketmq.streams.core.function.supplier.KeySelectSupplier;
import org.apache.rocketmq.streams.core.function.supplier.MultiValueChangeSupplier;
import org.apache.rocketmq.streams.core.function.supplier.PrintSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SinkSupplier;
import org.apache.rocketmq.streams.core.function.supplier.TimestampSelectorSupplier;
import org.apache.rocketmq.streams.core.function.supplier.ValueChangeSupplier;
import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
import org.apache.rocketmq.streams.core.topology.virtual.GraphNode;
import org.apache.rocketmq.streams.core.topology.virtual.ProcessorNode;
import org.apache.rocketmq.streams.core.topology.virtual.SinkGraphNode;
import org.apache.rocketmq.streams.core.util.OperatorNameMaker;
import org.apache.rocketmq.streams.core.window.JoinType;

/* loaded from: input_file:org/apache/rocketmq/streams/core/rstream/RStreamImpl.class */
/**
 * {@link RStream} implementation that records each operator as a virtual graph node on a
 * {@link Pipeline}.
 *
 * <p>Every operator method builds a node whose parent name is taken from this stream's
 * {@code parent} node and hands it to the pipeline together with that parent. Node names are
 * produced by {@link OperatorNameMaker#makeName} from an operator prefix and the pipeline's
 * job id.
 *
 * @param <T> type of the values carried by this stream
 */
public class RStreamImpl<T> implements RStream<T> {
    private final Pipeline pipeline;
    private final GraphNode parent;

    /**
     * Creates a stream view anchored at the given node.
     *
     * @param pipeline  pipeline that receives every node created by this stream
     * @param graphNode node used as the parent of every operator added through this stream
     */
    public RStreamImpl(Pipeline pipeline, GraphNode graphNode) {
        this.pipeline = pipeline;
        parent = graphNode;
    }

    /**
     * Builds an operator name from the given prefix and the pipeline's job id.
     */
    private String nextOperatorName(String prefix) {
        return OperatorNameMaker.makeName(prefix, pipeline.getJobId());
    }

    /**
     * Adds a processor node backed by a {@link TimestampSelectorSupplier} wrapping the given
     * action.
     *
     * @param valueMapperAction action mapping a value to a {@code Long}
     * @return the stream returned by the pipeline for the new node
     */
    @Override
    public RStream<T> selectTimestamp(ValueMapperAction<T, Long> valueMapperAction) {
        // NOTE(review): the node is named with MAP_PREFIX, same as map(); no dedicated
        // timestamp prefix is used here - confirm this is intended.
        String nodeName = nextOperatorName(OperatorNameMaker.MAP_PREFIX);
        ProcessorNode timestampNode = new ProcessorNode(
                nodeName,
                parent.getName(),
                new TimestampSelectorSupplier(valueMapperAction));
        return pipeline.addRStreamVirtualNode(timestampNode, parent);
    }

    /**
     * Adds a processor node backed by a {@link ValueChangeSupplier} wrapping the given action.
     *
     * @param valueMapperAction action mapping a {@code T} to an {@code O}
     * @param <O>               value type of the resulting stream
     * @return the stream returned by the pipeline for the new node
     */
    @Override
    public <O> RStream<O> map(ValueMapperAction<T, O> valueMapperAction) {
        String nodeName = nextOperatorName(OperatorNameMaker.MAP_PREFIX);
        ProcessorNode mapNode = new ProcessorNode(
                nodeName,
                parent.getName(),
                new ValueChangeSupplier(valueMapperAction));
        return pipeline.addRStreamVirtualNode(mapNode, parent);
    }

    /**
     * Adds a processor node backed by a {@link MultiValueChangeSupplier} wrapping the given
     * action.
     *
     * @param valueMapperAction action mapping a {@code T} to an iterable of {@code VR}
     * @param <VR>              value type of the resulting stream
     * @return the stream returned by the pipeline for the new node
     */
    @Override
    public <VR> RStream<VR> flatMap(ValueMapperAction<T, ? extends Iterable<? extends VR>> valueMapperAction) {
        String nodeName = nextOperatorName(OperatorNameMaker.FLAT_MAP_PREFIX);
        ProcessorNode flatMapNode = new ProcessorNode(
                nodeName,
                parent.getName(),
                new MultiValueChangeSupplier(valueMapperAction));
        return pipeline.addRStreamVirtualNode(flatMapNode, parent);
    }

    /**
     * Adds a processor node backed by a {@link FilterSupplier} wrapping the given action.
     *
     * @param filterAction action applied by the filter node
     * @return the stream returned by the pipeline for the new node
     */
    @Override
    public RStream<T> filter(FilterAction<T> filterAction) {
        String nodeName = nextOperatorName(OperatorNameMaker.FILTER_PREFIX);
        ProcessorNode filterNode = new ProcessorNode(
                nodeName,
                parent.getName(),
                new FilterSupplier(filterAction));
        return pipeline.addRStreamVirtualNode(filterNode, parent);
    }

    /**
     * Adds a processor node backed by a {@link KeySelectSupplier} and registers it as a grouped
     * stream node.
     *
     * @param selectAction action handed to the key-select supplier
     * @param <K>          key type of the resulting grouped stream
     * @return the grouped stream returned by the pipeline for the new node
     */
    @Override
    public <K> GroupedStream<K, T> keyBy(SelectAction<K, T> selectAction) {
        String nodeName = nextOperatorName(OperatorNameMaker.GROUPBY_PREFIX);
        // NOTE(review): the meaning of the 'true' flag is defined by ProcessorNode and is not
        // visible here - presumably it marks a repartition/shuffle step; confirm.
        ProcessorNode keyByNode = new ProcessorNode(
                nodeName,
                parent.getName(),
                true,
                (Supplier) new KeySelectSupplier(selectAction));
        return pipeline.addGroupedStreamVirtualNode(keyByNode, parent);
    }

    /**
     * Adds a sink node backed by a {@link PrintSupplier}; the node's third constructor argument
     * is {@code null}.
     */
    @Override
    public void print() {
        String nodeName = nextOperatorName(OperatorNameMaker.PRINT_PREFIX);
        SinkGraphNode printNode = new SinkGraphNode(
                nodeName,
                parent.getName(),
                null,
                new PrintSupplier());
        pipeline.addVirtualSink(printNode, parent);
    }

    /**
     * Adds a processor node backed by a {@link ForeachSupplier} wrapping the given action.
     *
     * @param foreachAction action handed to the for-each supplier
     * @return the stream returned by the pipeline for the new node
     */
    @Override
    public RStream<T> foreach(ForeachAction<T> foreachAction) {
        String nodeName = nextOperatorName(OperatorNameMaker.FOR_EACH_PREFIX);
        ProcessorNode foreachNode = new ProcessorNode(
                nodeName,
                parent.getName(),
                new ForeachSupplier(foreachAction));
        return pipeline.addRStreamVirtualNode(foreachNode, parent);
    }

    /**
     * Pairs this stream with another one using {@link JoinType#INNER_JOIN}.
     *
     * @param rStream the other stream of the join
     * @param <T2>    value type of the other stream
     * @return a new {@link JoinedStream} over this stream and {@code rStream}
     */
    @Override
    public <T2> JoinedStream<T, T2> join(RStream<T2> rStream) {
        JoinType joinType = JoinType.INNER_JOIN;
        return new JoinedStream<>(this, rStream, joinType);
    }

    /**
     * Pairs this stream with another one using {@link JoinType#LEFT_JOIN}.
     *
     * @param rStream the other stream of the join
     * @param <T2>    value type of the other stream
     * @return a new {@link JoinedStream} over this stream and {@code rStream}
     */
    @Override
    public <T2> JoinedStream<T, T2> leftJoin(RStream<T2> rStream) {
        JoinType joinType = JoinType.LEFT_JOIN;
        return new JoinedStream<>(this, rStream, joinType);
    }

    /**
     * @return the pipeline this stream adds its nodes to
     */
    @Override
    public Pipeline getPipeline() {
        return pipeline;
    }

    /**
     * Adds a sink node backed by a {@link SinkSupplier}.
     *
     * @param str                value passed both to the sink node and to the sink supplier;
     *                           presumably the destination topic name - confirm against
     *                           {@link SinkGraphNode}
     * @param keyValueSerializer serializer handed to the sink supplier
     */
    @Override
    public void sink(String str, KeyValueSerializer<Object, T> keyValueSerializer) {
        String nodeName = nextOperatorName(OperatorNameMaker.SINK_PREFIX);
        SinkGraphNode sinkNode = new SinkGraphNode(
                nodeName,
                parent.getName(),
                str,
                new SinkSupplier(str, keyValueSerializer));
        pipeline.addVirtualSink(sinkNode, parent);
    }
}
