package io.kgraph.streaming;

import io.kgraph.Edge;
import io.kgraph.KGraph;
import io.kgraph.utils.KryoSerde;
import java.time.Duration;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;

/* loaded from: input_file:io/kgraph/streaming/SummaryBulkAggregation.class */
/**
 * Windowed summary aggregation over an edge stream, computed in two stages.
 *
 * <p>{@link #run(KStream)} first folds the edges of each key inside a time window of
 * {@code timeMillis} milliseconds, then re-groups every partial result under one shared key and
 * reduces them with {@code combineFun()} inside a window of the same size.
 *
 * @param <K>  vertex id type carried by {@link Edge}
 * @param <EV> edge value type
 * @param <S>  intermediate summary type produced by folding and reducing
 * @param <T>  result type of the table returned by {@link #run(KStream)}
 */
public class SummaryBulkAggregation<K, EV, S, T> extends SummaryAggregation<K, EV, S, T> {

    /** Single key under which all per-key partial summaries are re-grouped for the final reduce. */
    private static final short GLOBAL_KEY = 0;

    /** Size, in milliseconds, of the time windows used by both aggregation stages. */
    private final long timeMillis;

    /**
     * Adapts an {@link EdgeFoldFunction} to the Kafka Streams {@link Aggregator} contract by
     * unpacking the edge key into its source and target vertices.
     */
    private static final class PartialAgg<K, EV, S> implements Aggregator<Edge<K>, EV, S> {
        private final EdgeFoldFunction<K, EV, S> foldFunction;

        PartialAgg(EdgeFoldFunction<K, EV, S> edgeFoldFunction) {
            this.foldFunction = edgeFoldFunction;
        }

        /**
         * Folds one edge into the running summary.
         *
         * @param edge record key; supplies the source and target vertex ids
         * @param ev   record value (the edge value)
         * @param s    summary accumulated so far
         * @return the value returned by {@code foldEdges} for this edge
         */
        @Override
        public S apply(Edge<K> edge, EV ev, S s) {
            return this.foldFunction.foldEdges(s, edge.source(), edge.target(), ev);
        }

        // No explicit Object-typed bridge overload here: the compiler generates the bridge for
        // Aggregator#apply itself, and a hand-written one has the same erasure as that bridge.
    }

    /**
     * Creates an aggregation with an explicit final value mapper.
     *
     * @param edgeFoldFunction forwarded to {@link SummaryAggregation}; {@link #run(KStream)} obtains
     *                         its per-edge fold via {@code updateFun()} (presumably this function)
     * @param reducer          forwarded to {@link SummaryAggregation}; {@link #run(KStream)} obtains
     *                         its combiner via {@code combineFun()} (presumably this reducer)
     * @param valueMapper      forwarded to {@link SummaryAggregation}; may be {@code null}, see
     *                         {@link #SummaryBulkAggregation(EdgeFoldFunction, Reducer, Object, long, boolean)}
     * @param s                forwarded to {@link SummaryAggregation} (presumably the initial summary)
     * @param timeMillis       window size in milliseconds
     * @param z                forwarded unchanged to {@link SummaryAggregation}; its meaning is
     *                         defined there
     */
    public SummaryBulkAggregation(EdgeFoldFunction<K, EV, S> edgeFoldFunction, Reducer<S> reducer, ValueMapper<S, T> valueMapper, S s, long timeMillis, boolean z) {
        super(edgeFoldFunction, reducer, valueMapper, s, z);
        this.timeMillis = timeMillis;
    }

    /**
     * Creates an aggregation without a final value mapper; equivalent to calling the six-argument
     * constructor with a {@code null} {@code valueMapper}.
     *
     * @param edgeFoldFunction see the six-argument constructor
     * @param reducer          see the six-argument constructor
     * @param s                see the six-argument constructor
     * @param timeMillis       window size in milliseconds
     * @param z                see the six-argument constructor
     */
    public SummaryBulkAggregation(EdgeFoldFunction<K, EV, S> edgeFoldFunction, Reducer<S> reducer, S s, long timeMillis, boolean z) {
        this(edgeFoldFunction, reducer, null, s, timeMillis, z);
    }

    /**
     * Builds the two-stage windowed aggregation topology on the given edge stream.
     *
     * <p>If {@code transform()} is non-null, it is applied to the aggregated values and the result
     * is materialized in a second generated store; otherwise the aggregated table is returned
     * directly.
     *
     * @param kStream stream of edge values keyed by {@link Edge}
     * @return table keyed by the window of the shared global key
     */
    // NOTE(review): raw KryoSerde / PartialAgg / Materialized usages are kept exactly as they were
    // because the generic signatures of KryoSerde and the inherited helpers are not visible from
    // this class; tighten them once those signatures are confirmed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // io.kgraph.streaming.SummaryAggregation
    public KTable<Windowed<Short>, T> run(KStream<Edge<K>, EV> kStream) {
        // Both stages use the same window size.
        final TimeWindows window = TimeWindows.of(Duration.ofMillis(this.timeMillis));

        KTable<Windowed<Short>, T> aggregated = kStream
                // Stage 1: fold edges per key within each window.
                .groupByKey(Grouped.with(new KryoSerde(), new KryoSerde()))
                .windowedBy(window)
                .aggregate(this::initialValue, new PartialAgg(updateFun()))
                .toStream()
                // Stage 2: funnel every partial summary to one key and combine them per window.
                .groupBy((windowedEdge, partial) -> GLOBAL_KEY)
                .windowedBy(window)
                .reduce(combineFun())
                .mapValues(
                        aggregator(kStream),
                        Materialized.as(KGraph.generateStoreName()).withKeySerde(new KryoSerde()).withValueSerde(new KryoSerde()));

        if (transform() == null) {
            return aggregated;
        }
        return aggregated.mapValues(
                transform(),
                Materialized.as(KGraph.generateStoreName()).withKeySerde(new KryoSerde()).withValueSerde(new KryoSerde()));
    }
}
