package io.kgraph;

import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.StreamUtils;
import io.kgraph.utils.TestUtils;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.Test;

/* loaded from: input_file:io/kgraph/ReduceOnNeighborMethodsITCase.class */
public class ReduceOnNeighborMethodsITCase extends AbstractIntegrationTest {
    private String expectedResult;

    /* loaded from: input_file:io/kgraph/ReduceOnNeighborMethodsITCase$SumOutNeighbors.class */
    private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, Long> {
        private SumOutNeighbors() {
        }

        public Long iterateNeighbors(Long l, Map<EdgeWithValue<Long, Long>, Long> map) {
            long j = 0;
            Iterator<Long> it = map.values().iterator();
            while (it.hasNext()) {
                j += it.next().longValue();
            }
            return Long.valueOf(j);
        }

        public /* bridge */ /* synthetic */ Object iterateNeighbors(Object obj, Map map) {
            return iterateNeighbors((Long) obj, (Map<EdgeWithValue<Long, Long>, Long>) map);
        }
    }

    @Test
    public void testSumOfOutNeighbors() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable groupReduceOnNeighbors = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, groupReduceOnNeighbors);
        this.expectedResult = "1,5\n2,3\n3,9\n4,5\n5,1\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }

    @Test
    public void testSumOfOutNeighborsNoValue() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable reduceOnNeighbors = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).reduceOnNeighbors((l, l2) -> {
            return Long.valueOf(l.longValue() + l2.longValue());
        }, EdgeDirection.OUT);
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, reduceOnNeighbors);
        this.expectedResult = "1,5\n2,3\n3,9\n4,5\n5,1\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }
}
