package io.kgraph.streaming;

import io.kgraph.Edge;
import io.kgraph.EdgeDirection;
import io.kgraph.EdgeWithValue;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;

/* loaded from: input_file:BOOT-INF/lib/kafka-graphs-core-1.4.0.jar:io/kgraph/streaming/EdgeStream.class */
public class EdgeStream<K, EV> implements KGraphStream<K, Void, EV> {
    /** The underlying stream of edge records: key is the edge, value is the edge value. */
    private final KStream<Edge<K>, EV> edges;
    /** Serde bundle for this graph; passed along to every derived {@code EdgeStream}. */
    private final GraphSerialized<K, Void, EV> serialized;

    /**
     * Lifts a vertex predicate to an edge predicate: an edge passes only when both its source
     * and its target pass the vertex filter.
     *
     * <p>The decompiler-emitted {@code test(Object, Object)} bridge was dropped: it cast the edge
     * value to {@code Edge} and clashed with the interface method. The compiler generates the
     * real bridge automatically.
     *
     * @param <K> vertex key type
     * @param <EV> edge value type (not inspected by the test)
     */
    public static final class ApplyVertexFilterToEdges<K, EV> implements Predicate<Edge<K>, EV> {
        private final Predicate<K, Void> vertexFilter;

        /**
         * @param predicate vertex filter; it is invoked with the vertex key and a {@code null} value
         */
        public ApplyVertexFilterToEdges(Predicate<K, Void> predicate) {
            this.vertexFilter = predicate;
        }

        /**
         * @param edge the edge whose endpoints are checked
         * @param ev the edge value (ignored)
         * @return {@code true} only if both the source and the target pass the vertex filter;
         *         the target is not tested when the source already fails
         */
        @Override
        public boolean test(Edge<K> edge, EV ev) {
            return this.vertexFilter.test(edge.source(), null) && this.vertexFilter.test(edge.target(), null);
        }
    }

    /**
     * Accumulates, per source vertex, the sorted set of targets seen so far and emits that set
     * with every incoming edge.
     *
     * <p>The neighborhoods live in an in-memory map owned by this mapper instance. The emitted
     * value is the live {@link TreeSet} held in that map, not a copy, so it keeps growing as
     * later edges with the same source arrive.
     *
     * <p>The decompiler-emitted {@code apply(Object, Object)} bridge was dropped: it cast the edge
     * value to {@code Edge}. The compiler generates the real bridge automatically.
     *
     * @param <K> vertex key type; stored in a {@link TreeSet} without a comparator
     * @param <EV> edge value type (ignored)
     */
    private static final class BuildNeighborhoods<K, EV> implements KeyValueMapper<Edge<K>, EV, KeyValue<Edge<K>, Set<K>>> {
        private final Map<K, Set<K>> neighborhoods = new HashMap<>();

        /**
         * @param edge the incoming edge
         * @param ev the edge value (ignored)
         * @return the edge paired with the current neighbor set of its source
         */
        @Override
        public KeyValue<Edge<K>, Set<K>> apply(Edge<K> edge, EV ev) {
            Set<K> neighbors = this.neighborhoods.computeIfAbsent(edge.source(), source -> new TreeSet<>());
            neighbors.add(edge.target());
            return new KeyValue<>(edge, neighbors);
        }
    }

    /**
     * Keeps a running total per key and emits the updated total for every incoming record.
     * Totals live in an in-memory map owned by this mapper instance.
     *
     * <p>The interface method is restored under its real name {@code apply}. The decompiler had
     * renamed it to {@code apply2} and added a hand-written bridge that does not override
     * {@link KeyValueMapper#apply}.
     *
     * @param <K> vertex key type
     */
    private static final class DegreeMapFunction<K> implements KeyValueMapper<K, Long, KeyValue<K, Long>> {
        private final Map<K, Long> localDegrees = new HashMap<>();

        /**
         * @param k the vertex key
         * @param l the increment to add to the key's running total; must not be {@code null}
         * @return the key paired with its updated total
         */
        @Override
        public KeyValue<K, Long> apply(K k, Long l) {
            // merge() stores l for a first-seen key and otherwise adds it to the existing total.
            Long total = this.localDegrees.merge(k, l, Long::sum);
            return new KeyValue<>(k, total);
        }
    }

    /**
     * Expands an edge into per-vertex count records: {@code (source, 1)} when out-degrees are
     * collected and {@code (target, 1)} when in-degrees are collected.
     *
     * <p>The decompiler-emitted {@code apply(Object, Object)} bridge was dropped: it cast the edge
     * value to {@code Edge}. The compiler generates the real bridge automatically.
     *
     * @param <K> vertex key type
     * @param <EV> edge value type (ignored)
     */
    private static final class DegreeTypeSeparator<K, EV> implements KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, Long>>> {
        private final boolean collectIn;
        private final boolean collectOut;

        /**
         * @param z whether to emit a record for the edge target (in-degree)
         * @param z2 whether to emit a record for the edge source (out-degree)
         */
        public DegreeTypeSeparator(boolean z, boolean z2) {
            this.collectIn = z;
            this.collectOut = z2;
        }

        /**
         * @param edge the incoming edge
         * @param ev the edge value (ignored)
         * @return zero, one or two records; the source record comes first when both are emitted
         */
        @Override
        public Iterable<KeyValue<K, Long>> apply(Edge<K> edge, EV ev) {
            ArrayList<KeyValue<K, Long>> records = new ArrayList<>(2);
            if (this.collectOut) {
                records.add(new KeyValue<>(edge.source(), 1L));
            }
            if (this.collectIn) {
                records.add(new KeyValue<>(edge.target(), 1L));
            }
            return records;
        }
    }

    /**
     * Forwards an edge only the first time its (source, target) pair is seen by this mapper
     * instance; repeats are dropped.
     *
     * <p>Fix: the previous implementation remembered target vertices only, so an edge with a
     * new source but an already-seen target (e.g. {@code 3->2} after {@code 1->2}) was wrongly
     * discarded as a duplicate. Seen targets are now tracked per source.
     *
     * <p>The decompiler-emitted {@code apply(Object, Object)} bridge was dropped: it cast the edge
     * value to {@code Edge}. The compiler generates the real bridge automatically.
     *
     * @param <K> vertex key type; used as a hash key
     * @param <EV> edge value type (passed through untouched)
     */
    public static final class DistinctEdgeMapper<K, EV> implements KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<Edge<K>, EV>>> {
        /** Targets already forwarded, grouped by source vertex. */
        private final Map<K, Set<K>> targetsBySource = new HashMap<>();

        /**
         * @param edge the incoming edge
         * @param ev the edge value
         * @return a single-element list holding the edge if it is new, otherwise an empty list
         */
        @Override
        public Iterable<KeyValue<Edge<K>, EV>> apply(Edge<K> edge, EV ev) {
            Set<K> seenTargets = this.targetsBySource.computeIfAbsent(edge.source(), source -> new HashSet<>());
            // Set.add returns false when the target was already recorded for this source.
            if (!seenTargets.add(edge.target())) {
                return Collections.emptyList();
            }
            return Collections.singletonList(new KeyValue<>(edge, ev));
        }
    }

    /**
     * Lets each vertex key through once: the first occurrence passes, later ones are rejected.
     * Seen keys are held in an in-memory set owned by this predicate instance.
     *
     * <p>The interface method is restored under its real name {@code test}. The decompiler had
     * renamed it to {@code test2} and added a hand-written bridge that does not override
     * {@link Predicate#test}.
     *
     * @param <K> vertex key type; used as a hash key
     */
    private static final class FilterDistinctVertices<K> implements Predicate<K, Void> {
        private final Set<K> keys = new HashSet<>();

        /**
         * @param k the vertex key
         * @param r5 the (always absent) vertex value; ignored
         * @return {@code true} if this key has not been seen before
         */
        @Override
        public boolean test(K k, Void r5) {
            // Set.add reports whether the key was newly inserted.
            return this.keys.add(k);
        }
    }

    /**
     * Suppresses consecutive repeats of a global aggregate: a value is forwarded (under key
     * {@code 0}) only when it differs from the previously forwarded one.
     *
     * <p>The interface method is restored under its real name {@code apply}. The decompiler had
     * renamed it to {@code apply2} and added a hand-written bridge that cast the value to
     * {@code Short}.
     *
     * @param <VV> aggregate value type; compared with {@code equals}
     */
    public static final class GlobalAggregateMapper<VV> implements KeyValueMapper<Short, VV, Iterable<KeyValue<Short, VV>>> {
        /** Last value that was forwarded; {@code null} until the first record arrives. */
        VV previousValue = null;

        /**
         * @param sh the incoming key (ignored; output is always keyed by {@code 0})
         * @param vv the incoming aggregate value; must not be {@code null} because
         *           {@code equals} is called on it
         * @return a single-element list with the value if it changed, otherwise an empty list
         */
        @Override
        public Iterable<KeyValue<Short, VV>> apply(Short sh, VV vv) {
            if (vv.equals(this.previousValue)) {
                return Collections.emptyList();
            }
            this.previousValue = vv;
            return Collections.singletonList(new KeyValue<>((short) 0, vv));
        }
    }

    /**
     * Key selector that maps an edge record to the edge's source vertex.
     *
     * @param <K> vertex key type
     * @param <EV> edge value type
     */
    public static final class NeighborKeySelector<K, EV> implements KeyValueMapper<Edge<K>, EdgeWithValue<K, EV>, K> {
        /**
         * @param edge the record key
         * @param edgeWithValue the record value (ignored)
         * @return the source vertex of {@code edge}
         */
        @Override // org.apache.kafka.streams.kstream.KeyValueMapper
        public K apply(Edge<K> edge, EdgeWithValue<K, EV> edgeWithValue) {
            final K sourceVertex = edge.source();
            return sourceVertex;
        }
    }

    /**
     * Re-keys each record with the reversed edge, leaving the value untouched.
     *
     * <p>The decompiler-emitted {@code apply(Object, Object)} bridge was dropped: it cast the edge
     * value to {@code Edge}. The compiler generates the real bridge automatically.
     *
     * @param <K> vertex key type
     * @param <EV> edge value type
     */
    public static final class ReverseEdgeMapper<K, EV> implements KeyValueMapper<Edge<K>, EV, KeyValue<Edge<K>, EV>> {
        private ReverseEdgeMapper() {
        }

        /**
         * @param edge the incoming edge
         * @param ev the edge value
         * @return {@code edge.reverse()} paired with the unchanged value
         */
        @Override
        public KeyValue<Edge<K>, EV> apply(Edge<K> edge, EV ev) {
            return new KeyValue<>(edge.reverse(), ev);
        }
    }

    /**
     * Counts the records it has been applied to and emits the running count under key {@code 0}.
     * The counter is a plain field of this mapper instance.
     *
     * <p>The decompiler-emitted {@code apply(Object, Object)} bridge was dropped: it cast the edge
     * value to {@code Edge}. The compiler generates the real bridge automatically.
     *
     * @param <K> vertex key type
     * @param <EV> edge value type (ignored)
     */
    private static final class TotalEdgeCountMapper<K, EV> implements KeyValueMapper<Edge<K>, EV, KeyValue<Short, Long>> {
        private long edgeCount = 0;

        /**
         * @param edge the incoming edge (ignored)
         * @param ev the edge value (ignored)
         * @return key {@code 0} paired with the number of records seen so far, this one included
         */
        @Override
        public KeyValue<Short, Long> apply(Edge<K> edge, EV ev) {
            this.edgeCount++;
            return new KeyValue<>((short) 0, this.edgeCount);
        }
    }

    /**
     * Tracks the distinct vertex keys seen so far and emits their count under key {@code 0}
     * for every incoming record. Keys are held in an in-memory set owned by this instance.
     *
     * <p>The interface method is restored under its real name {@code apply}. The decompiler had
     * renamed it to {@code apply2} and added a hand-written bridge that does not override
     * {@link KeyValueMapper#apply}.
     *
     * @param <K> vertex key type; used as a hash key
     */
    private static final class VertexCountMapper<K> implements KeyValueMapper<K, Long, Iterable<KeyValue<Short, Long>>> {
        private final Set<K> vertices = new HashSet<>();

        /**
         * @param k the vertex key
         * @param l the record value (ignored)
         * @return a single-element list holding the current number of distinct keys
         */
        @Override
        public Iterable<KeyValue<Short, Long>> apply(K k, Long l) {
            this.vertices.add(k);
            return Collections.singletonList(new KeyValue<>((short) 0, (long) this.vertices.size()));
        }
    }

    /**
     * Wraps an existing edge stream.
     *
     * @param kStream stream of edge records (key: edge, value: edge value)
     * @param graphSerialized serdes associated with the graph
     */
    public EdgeStream(KStream<Edge<K>, EV> kStream, GraphSerialized<K, Void, EV> graphSerialized) {
        serialized = graphSerialized;
        edges = kStream;
    }

    /**
     * @return the underlying edge stream this instance wraps
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<Edge<K>, EV> edges() {
        return edges;
    }

    /**
     * Derives a vertex stream from the edges: every edge is expanded through
     * {@code KGraph.EmitSrcAndTarget} (with a value initializer that always yields {@code null})
     * and repeated keys are then filtered out.
     *
     * @return stream of vertex keys with {@code null} values
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<K, Void> vertices() {
        // Raw types are kept here because EmitSrcAndTarget's generic signature is not visible in this file.
        KStream endpoints = this.edges.flatMap(new KGraph.EmitSrcAndTarget(ignored -> null));
        return endpoints.filter(new FilterDistinctVertices());
    }

    /**
     * Applies a mapper to every edge record and wraps the result in a new {@code EdgeStream}.
     *
     * @param keyValueMapper mapper producing the new edge record
     * @param serde serde for the new edge value type
     * @param <NV> new edge value type
     * @return a new stream whose serdes reuse this stream's key and vertex-value serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public <NV> EdgeStream<K, NV> mapEdges(KeyValueMapper<Edge<K>, EV, KeyValue<Edge<K>, NV>> keyValueMapper, Serde<NV> serde) {
        KStream<Edge<K>, NV> mappedEdges = this.edges.map(keyValueMapper);
        GraphSerialized<K, Void, NV> mappedSerialized =
                GraphSerialized.with(this.serialized.keySerde(), this.serialized.vertexValueSerde(), serde);
        return new EdgeStream<>(mappedEdges, mappedSerialized);
    }

    /**
     * Keeps only the edges accepted by the given predicate.
     *
     * @param predicate edge filter
     * @return a new stream over the surviving edges, with the same serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public EdgeStream<K, EV> filterEdges(Predicate<Edge<K>, EV> predicate) {
        KStream<Edge<K>, EV> survivors = this.edges.filter(predicate);
        return new EdgeStream<>(survivors, this.serialized);
    }

    /**
     * Keeps only the edges whose source and target both pass the vertex predicate.
     *
     * @param predicate vertex filter
     * @return a new stream over the surviving edges, with the same serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public EdgeStream<K, EV> filterVertices(Predicate<K, Void> predicate) {
        Predicate<Edge<K>, EV> edgeFilter = new ApplyVertexFilterToEdges<>(predicate);
        KStream<Edge<K>, EV> survivors = this.edges.filter(edgeFilter);
        return new EdgeStream<>(survivors, this.serialized);
    }

    /**
     * @return a stream that carries, for every edge record, the running edge count under key {@code 0}
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<Short, Long> numberOfEdges() {
        KeyValueMapper<Edge<K>, EV, KeyValue<Short, Long>> counter = new TotalEdgeCountMapper<>();
        return this.edges.map(counter);
    }

    /**
     * Counts distinct vertices by emitting both endpoints of every edge into a
     * {@code VertexCountMapper}, with update collection switched on in {@code globalAggregate}.
     *
     * @return a stream of distinct-vertex counts under key {@code 0}
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<Short, Long> numberOfVertices() {
        KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, Long>>> endpointEmitter = new DegreeTypeSeparator<>(true, true);
        KeyValueMapper<K, Long, Iterable<KeyValue<Short, Long>>> distinctCounter = new VertexCountMapper<>();
        return globalAggregate(endpointEmitter, distinctCounter, true);
    }

    /**
     * Passes the edges through a {@code DistinctEdgeMapper}.
     *
     * @return a new stream over the edges that mapper lets through, with the same serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public EdgeStream<K, EV> distinct() {
        KStream<Edge<K>, EV> deduplicated = this.edges.flatMap(new DistinctEdgeMapper<>());
        return new EdgeStream<>(deduplicated, this.serialized);
    }

    /**
     * Passes the edges through {@code KGraph.UndirectEdges}.
     *
     * @return a new stream over the resulting edges, with the same serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public EdgeStream<K, EV> undirected() {
        // Raw type kept: UndirectEdges' generic signature is not visible in this file.
        return new EdgeStream<>(
                this.edges.flatMap(new KGraph.UndirectEdges()),
                this.serialized);
    }

    /**
     * Reverses the direction of every edge.
     *
     * @return a new stream keyed by the reversed edges, with the same values and serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public EdgeStream<K, EV> reverse() {
        KStream<Edge<K>, EV> reversed = this.edges.map(new ReverseEdgeMapper<>());
        return new EdgeStream<>(reversed, this.serialized);
    }

    /**
     * @return a stream of running per-vertex degree totals, counting both edge endpoints
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<K, Long> degrees() {
        KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, Long>>> bothEndpoints = new DegreeTypeSeparator<>(true, true);
        KeyValueMapper<K, Long, KeyValue<K, Long>> runningTotal = new DegreeMapFunction<>();
        return aggregate(bothEndpoints, runningTotal);
    }

    /**
     * @return a stream of running per-vertex in-degree totals (edge targets only)
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<K, Long> inDegrees() {
        KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, Long>>> targetsOnly = new DegreeTypeSeparator<>(true, false);
        KeyValueMapper<K, Long, KeyValue<K, Long>> runningTotal = new DegreeMapFunction<>();
        return aggregate(targetsOnly, runningTotal);
    }

    /**
     * @return a stream of running per-vertex out-degree totals (edge sources only)
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<K, Long> outDegrees() {
        KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, Long>>> sourcesOnly = new DegreeTypeSeparator<>(false, true);
        KeyValueMapper<K, Long, KeyValue<K, Long>> runningTotal = new DegreeMapFunction<>();
        return aggregate(sourcesOnly, runningTotal);
    }

    /**
     * Merges this stream's edges with those of another graph stream.
     *
     * @param kGraphStream the other graph stream
     * @return a new stream over the merged edges, carrying this stream's serdes
     */
    @Override // io.kgraph.streaming.KGraphStream
    public EdgeStream<K, EV> union(KGraphStream<K, Void, EV> kGraphStream) {
        KStream<Edge<K>, EV> merged = this.edges.merge(kGraphStream.edges());
        return new EdgeStream<>(merged, this.serialized);
    }

    /**
     * Two-step per-vertex aggregation: each edge is first expanded into vertex records, then
     * every vertex record is mapped to an output record.
     *
     * @param keyValueMapper expands an edge record into zero or more vertex records
     * @param keyValueMapper2 maps each vertex record to an output record
     * @param <VV> vertex value type
     * @return the stream of output records
     */
    @Override // io.kgraph.streaming.KGraphStream
    public <VV> KStream<K, VV> aggregate(KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, VV>>> keyValueMapper, KeyValueMapper<K, VV, KeyValue<K, VV>> keyValueMapper2) {
        KStream<K, VV> vertexRecords = this.edges.flatMap(keyValueMapper);
        return vertexRecords.map(keyValueMapper2);
    }

    /**
     * Two-step global aggregation: each edge is expanded into vertex records, which are then
     * flat-mapped into records keyed by {@code Short}. When {@code z} is set, the result is
     * additionally passed through a {@code GlobalAggregateMapper}.
     *
     * @param keyValueMapper expands an edge record into zero or more vertex records
     * @param keyValueMapper2 maps each vertex record to zero or more aggregate records
     * @param z whether to append the {@code GlobalAggregateMapper} stage
     * @param <VV> aggregate value type
     * @return the stream of aggregate records
     */
    @Override // io.kgraph.streaming.KGraphStream
    public <VV> KStream<Short, VV> globalAggregate(KeyValueMapper<Edge<K>, EV, Iterable<KeyValue<K, VV>>> keyValueMapper, KeyValueMapper<K, VV, Iterable<KeyValue<Short, VV>>> keyValueMapper2, boolean z) {
        KStream<K, VV> vertexRecords = this.edges.flatMap(keyValueMapper);
        KStream<Short, VV> aggregated = vertexRecords.flatMap(keyValueMapper2);
        if (!z) {
            return aggregated;
        }
        return aggregated.flatMap(new GlobalAggregateMapper<>());
    }

    /**
     * Maps every edge to the neighbor set accumulated so far for its source vertex.
     *
     * @param z when {@code true} the edges are used as they are; when {@code false} the edges of
     *          {@link #undirected()} are used instead
     * @return stream of edges paired with the current neighbor set of the edge source
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KStream<Edge<K>, Set<K>> buildNeighborhood(boolean z) {
        KStream<Edge<K>, EV> input = z ? edges() : undirected().edges();
        return input.map(new BuildNeighborhoods<>());
    }

    /**
     * Slices the stream into windows using the default direction {@link EdgeDirection#OUT}.
     *
     * @param windows window specification
     * @return the windowed graph stream
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KGraphWindowedStream<K, EV> slice(Windows<? extends Window> windows) {
        final EdgeDirection defaultDirection = EdgeDirection.OUT;
        return slice(windows, defaultDirection);
    }

    /**
     * Slices the stream into windows, grouping edges by the source vertex of the edge stream
     * selected by {@code edgeDirection}: reversed edges for {@code IN}, the edges as they are
     * for {@code OUT}, and the edges of {@link #undirected()} for {@code BOTH}.
     *
     * @param windows window specification
     * @param edgeDirection which edge stream to group
     * @return the windowed graph stream
     * @throws IllegalArgumentException if the direction is none of {@code IN}, {@code OUT}, {@code BOTH}
     */
    @Override // io.kgraph.streaming.KGraphStream
    public KGraphWindowedStream<K, EV> slice(Windows<? extends Window> windows, EdgeDirection edgeDirection) throws IllegalArgumentException {
        // Only the choice of input stream differs per direction; the pipeline after it is shared.
        final KStream<Edge<K>, EV> selected;
        switch (edgeDirection) {
            case IN:
                selected = reverse().edges();
                break;
            case OUT:
                selected = edges();
                break;
            case BOTH:
                selected = undirected().edges();
                break;
            default:
                throw new IllegalArgumentException("Illegal edge direction");
        }
        return new KGraphWindowedStream<>(selected.mapValues(EdgeWithValue::new).groupBy(new NeighborKeySelector()).windowedBy(windows));
    }

    /**
     * Runs a summary aggregation over this stream's edges.
     *
     * @param summaryAggregation the aggregation to run
     * @param <S> type parameter of the aggregation
     * @param <T> value type of the resulting table
     * @return whatever {@code summaryAggregation.run} returns for the edge stream
     */
    @Override // io.kgraph.streaming.KGraphStream
    public <S, T> KTable<Windowed<Short>, T> aggregate(SummaryAggregation<K, EV, S, T> summaryAggregation) {
        final KStream<Edge<K>, EV> input = edges();
        return summaryAggregation.run(input);
    }
}
