package org.apache.rocketmq.streams.core.rstream;

import java.util.function.Supplier;
import org.apache.rocketmq.streams.core.function.FilterAction;
import org.apache.rocketmq.streams.core.function.SelectAction;
import org.apache.rocketmq.streams.core.function.ValueMapperAction;
import org.apache.rocketmq.streams.core.function.accumulator.Accumulator;
import org.apache.rocketmq.streams.core.function.supplier.AccumulatorSupplier;
import org.apache.rocketmq.streams.core.function.supplier.AddTagSupplier;
import org.apache.rocketmq.streams.core.function.supplier.AggregateSupplier;
import org.apache.rocketmq.streams.core.function.supplier.FilterSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SinkSupplier;
import org.apache.rocketmq.streams.core.function.supplier.SumAggregate;
import org.apache.rocketmq.streams.core.function.supplier.ValueChangeSupplier;
import org.apache.rocketmq.streams.core.running.Processor;
import org.apache.rocketmq.streams.core.serialization.KeyValueSerializer;
import org.apache.rocketmq.streams.core.topology.virtual.GraphNode;
import org.apache.rocketmq.streams.core.topology.virtual.ProcessorNode;
import org.apache.rocketmq.streams.core.topology.virtual.ShuffleProcessorNode;
import org.apache.rocketmq.streams.core.topology.virtual.SinkGraphNode;
import org.apache.rocketmq.streams.core.util.OperatorNameMaker;
import org.apache.rocketmq.streams.core.window.WindowInfo;

/* loaded from: input_file:org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.class */
/**
 * {@link GroupedStream} implementation that turns each fluent call into a new graph node
 * registered on the owning {@link Pipeline} underneath the current {@code parent} node.
 *
 * <p>Stateful operators (count, min, max, sum, aggregate, window, custom nodes) are wrapped in a
 * {@link ShuffleProcessorNode} when {@code parent.shuffleNode()} is true and in a plain
 * {@link ProcessorNode} otherwise; {@code filter} and {@code map} always use a plain
 * {@link ProcessorNode}.
 *
 * @param <K> type of the grouping key
 * @param <V> type of the values flowing through this stream
 */
public class GroupedStreamImpl<K, V> implements GroupedStream<K, V> {
    private final Pipeline pipeline;
    private final GraphNode parent;

    /**
     * @param pipeline  pipeline that new nodes are registered on
     * @param graphNode node that every operator created by this instance is attached to
     */
    public GroupedStreamImpl(Pipeline pipeline, GraphNode graphNode) {
        this.pipeline = pipeline;
        this.parent = graphNode;
    }

    /**
     * Adds an aggregate operator that starts at {@code 0} and adds one for every value seen.
     *
     * @return the stream produced by the new count operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public GroupedStream<K, Integer> count() {
        return addCountOperator();
    }

    /**
     * Adds the same counting operator as {@link #count()}.
     *
     * <p>NOTE(review): {@code selectAction} is never consulted, so every value is counted
     * regardless of what the selector would return. Confirm against the {@link GroupedStream}
     * contract whether that is intended before relying on the selector.
     *
     * @param selectAction currently unused
     * @return the stream produced by the new count operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public <OUT> GroupedStream<K, Integer> count(SelectAction<OUT, V> selectAction) {
        return addCountOperator();
    }

    /**
     * Adds an operator that keeps the value whose selected number is smallest.
     *
     * <p>The first value is kept as-is; afterwards a new value replaces the kept one only when
     * its selected number is strictly smaller. Numbers are compared through
     * {@link Number#doubleValue()}, so a selector that returns {@code null} causes a
     * {@link NullPointerException} once a comparison is needed.
     *
     * @param selectAction extracts the number to compare from a value
     * @return the stream produced by the new min operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public GroupedStream<K, V> min(SelectAction<? extends Number, V> selectAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.MIN_PREFIX, this.pipeline.getJobId());
        return addExtremeOperator(operatorName, selectAction, true);
    }

    /**
     * Adds an operator that keeps the value whose selected number is largest.
     *
     * <p>The first value is kept as-is; afterwards a new value replaces the kept one only when
     * its selected number is strictly larger. Numbers are compared through
     * {@link Number#doubleValue()}, so a selector that returns {@code null} causes a
     * {@link NullPointerException} once a comparison is needed.
     *
     * @param selectAction extracts the number to compare from a value
     * @return the stream produced by the new max operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public GroupedStream<K, V> max(SelectAction<? extends Number, V> selectAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.MAX_PREFIX, this.pipeline.getJobId());
        return addExtremeOperator(operatorName, selectAction, false);
    }

    /**
     * Adds an aggregate operator whose initial state is {@code null} and whose update step is a
     * {@link SumAggregate} built from {@code selectAction}.
     *
     * @param selectAction handed to {@link SumAggregate}
     * @return the stream produced by the new sum operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public GroupedStream<K, ? extends Number> sum(SelectAction<? extends Number, V> selectAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.SUM_PREFIX, this.pipeline.getJobId());
        Supplier<Processor<V>> supplier =
            new AggregateSupplier<>(operatorName, this.parent.getName(), () -> null, new SumAggregate<>(selectAction));
        return this.pipeline.addGroupedStreamVirtualNode(newStatefulNode(operatorName, supplier), this.parent);
    }

    /**
     * Adds a filter operator backed by a {@link FilterSupplier}; always a plain
     * {@link ProcessorNode}.
     *
     * @param filterAction predicate handed to the {@link FilterSupplier}
     * @return the stream produced by the new filter operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public GroupedStream<K, V> filter(FilterAction<V> filterAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.FILTER_PREFIX, this.pipeline.getJobId());
        GraphNode node = new ProcessorNode<>(operatorName, this.parent.getName(), new FilterSupplier<>(filterAction));
        return this.pipeline.addGroupedStreamVirtualNode(node, this.parent);
    }

    /**
     * Adds a map operator backed by a {@link ValueChangeSupplier}; always a plain
     * {@link ProcessorNode}.
     *
     * @param valueMapperAction mapping handed to the {@link ValueChangeSupplier}
     * @return the stream produced by the new map operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public <OUT> GroupedStream<K, OUT> map(ValueMapperAction<V, OUT> valueMapperAction) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.MAP_PREFIX, this.pipeline.getJobId());
        GraphNode node =
            new ProcessorNode<>(operatorName, this.parent.getName(), new ValueChangeSupplier<>(valueMapperAction));
        return this.pipeline.addGroupedStreamVirtualNode(node, this.parent);
    }

    /**
     * Adds an operator backed by an {@link AccumulatorSupplier} that uses the identity selector
     * (each value is passed on unchanged) together with {@code accumulator}.
     *
     * @param accumulator accumulator handed to the {@link AccumulatorSupplier}
     * @return the stream produced by the new accumulate operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public <OUT> GroupedStream<K, OUT> aggregate(Accumulator<V, OUT> accumulator) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.ACCUMULATE_PREFIX, this.pipeline.getJobId());
        Supplier<Processor<V>> supplier =
            new AccumulatorSupplier<>(operatorName, this.parent.getName(), value -> value, accumulator);
        return this.pipeline.addGroupedStreamVirtualNode(newStatefulNode(operatorName, supplier), this.parent);
    }

    /**
     * Adds a window operator backed by an {@link AddTagSupplier}.
     *
     * <p>The supplier is given {@code windowInfo::getJoinStream} only when the parent is a
     * shuffle node and {@code windowInfo.getJoinStream()} is non-null; in every other case the
     * no-argument {@link AddTagSupplier} is used.
     *
     * @param windowInfo window description, also passed on to the pipeline
     * @return the window stream produced by the new operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public WindowStream<K, V> window(WindowInfo windowInfo) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.WINDOW_PREFIX, this.pipeline.getJobId());
        GraphNode node;
        if (!this.parent.shuffleNode()) {
            node = new ProcessorNode<>(operatorName, this.parent.getName(), new AddTagSupplier<>());
        } else if (windowInfo.getJoinStream() != null) {
            // windowInfo was dereferenced just above, so no separate null probe is needed
            // before binding the method reference.
            node = new ShuffleProcessorNode<>(operatorName, this.parent.getName(),
                new AddTagSupplier<>(windowInfo::getJoinStream));
        } else {
            node = new ShuffleProcessorNode<>(operatorName, this.parent.getName(), new AddTagSupplier<>());
        }
        return this.pipeline.addWindowStreamVirtualNode(node, this.parent, windowInfo);
    }

    /**
     * Adds a caller-supplied operator under the given name.
     *
     * @param str      name of the new node
     * @param supplier processor supplier for the new node
     * @return the stream produced by the new operator
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public GroupedStream<K, V> addGraphNode(String str, Supplier<Processor<V>> supplier) {
        return this.pipeline.addGroupedStreamVirtualNode(newStatefulNode(str, supplier), this.parent);
    }

    /**
     * @return an {@link RStreamImpl} over the same pipeline and parent node; no node is added
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public RStream<V> toRStream() {
        return new RStreamImpl<>(this.pipeline, this.parent);
    }

    /**
     * Registers a sink node on the pipeline.
     *
     * @param str                passed both to the {@link SinkGraphNode} and the {@link SinkSupplier}
     * @param keyValueSerializer serializer handed to the {@link SinkSupplier}
     */
    @Override // org.apache.rocketmq.streams.core.rstream.GroupedStream
    public void sink(String str, KeyValueSerializer<K, V> keyValueSerializer) {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.SINK_PREFIX, this.pipeline.getJobId());
        GraphNode sinkNode =
            new SinkGraphNode<>(operatorName, this.parent.getName(), str, new SinkSupplier<>(str, keyValueSerializer));
        this.pipeline.addVirtualSink(sinkNode, this.parent);
    }

    /** Builds the node for a stateful operator: shuffle-aware when the parent is a shuffle node. */
    private GraphNode newStatefulNode(String name, Supplier<Processor<V>> supplier) {
        if (this.parent.shuffleNode()) {
            return new ShuffleProcessorNode<>(name, this.parent.getName(), supplier);
        }
        return new ProcessorNode<>(name, this.parent.getName(), supplier);
    }

    /** Registers the counting aggregate shared by both {@code count} overloads. */
    private GroupedStream<K, Integer> addCountOperator() {
        String operatorName = OperatorNameMaker.makeName(OperatorNameMaker.COUNT_PREFIX, this.pipeline.getJobId());
        // Explicit lambda parameter types let the diamond infer <K, V, Integer>.
        Supplier<Processor<V>> supplier = new AggregateSupplier<>(operatorName, this.parent.getName(),
            () -> 0,
            (K key, V value, Integer total) -> total + 1);
        return this.pipeline.addGroupedStreamVirtualNode(newStatefulNode(operatorName, supplier), this.parent);
    }

    /** Registers the keep-smallest ({@code keepSmaller}) or keep-largest aggregate behind min/max. */
    private GroupedStream<K, V> addExtremeOperator(String operatorName,
                                                   SelectAction<? extends Number, V> selectAction,
                                                   boolean keepSmaller) {
        Supplier<Processor<V>> supplier = new AggregateSupplier<>(operatorName, this.parent.getName(),
            () -> null,
            (K key, V value, V best) -> {
                // The selector runs on the incoming value even when nothing is held yet,
                // matching the original evaluation order.
                Number candidate = selectAction.select(value);
                if (best == null) {
                    return value;
                }
                double incoming = candidate.doubleValue();
                double held = selectAction.select(best).doubleValue();
                boolean replace = keepSmaller ? incoming < held : incoming > held;
                return replace ? value : best;
            });
        return this.pipeline.addGroupedStreamVirtualNode(newStatefulNode(operatorName, supplier), this.parent);
    }
}
