package io.kgraph;

import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.StreamUtils;
import io.kgraph.utils.TestUtils;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.Test;

/* loaded from: input_file:io/kgraph/GraphOperationsITCase.class */
public class GraphOperationsITCase extends AbstractIntegrationTest {
    private String expectedResult;

    @Test
    public void testOutDegrees() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable outDegrees = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).outDegrees();
        this.expectedResult = "1,2\n2,1\n3,2\n4,1\n5,1\n";
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        TestUtils.compareResultAsTuples(StreamUtils.listFromTable(this.streams, outDegrees), this.expectedResult);
    }

    @Test
    public void testInDegrees() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable inDegrees = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).inDegrees();
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, inDegrees);
        this.expectedResult = "1,1\n2,1\n3,2\n4,1\n5,2\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }

    @Test
    public void testUndirected() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable edges = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).undirected().edges();
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, edges);
        this.expectedResult = "1,2,12\n2,1,12\n1,3,13\n3,1,13\n2,3,23\n3,2,23\n3,4,34\n4,3,34\n3,5,35\n5,3,35\n4,5,45\n5,4,45\n5,1,51\n1,5,51\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }

    @Test
    public void testSubGraph() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable edges = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).subgraph((l, l2) -> {
            return l2.longValue() > 2;
        }, (edge, l3) -> {
            return l3.longValue() > 34;
        }).edges();
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, edges);
        this.expectedResult = "3,5,35\n4,5,45\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }

    @Test
    public void testFilterVertices() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable edges = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).filterOnVertices((l, l2) -> {
            return l2.longValue() > 2;
        }).edges();
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, edges);
        this.expectedResult = "3,4,34\n3,5,35\n4,5,45\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }

    @Test
    public void testFilterEdges() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable edges = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).filterOnEdges((edge, l) -> {
            return l.longValue() > 34;
        }).edges();
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, edges);
        this.expectedResult = "3,5,35\n4,5,45\n5,1,51\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }
}
