package io.kgraph.streaming.library;

import com.google.common.collect.Lists;
import io.kgraph.AbstractIntegrationTest;
import io.kgraph.Edge;
import io.kgraph.GraphSerialized;
import io.kgraph.streaming.EdgeStream;
import io.kgraph.streaming.summaries.Candidates;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.StreamUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/kgraph/streaming/library/BipartitenessCheckTest.class */
/**
 * Integration tests for {@link BipartitenessCheck}.
 *
 * <p>Each test publishes a fixed edge list through {@link StreamUtils#streamFromCollection},
 * wraps the resulting stream in an {@link EdgeStream}, aggregates it with a
 * {@link BipartitenessCheck}, and compares the string form of the resulting
 * {@link Candidates} summaries with the expected output.
 */
public class BipartitenessCheckTest extends AbstractIntegrationTest {

    /** How long to wait after starting the topology before reading the result table. */
    private static final long RESULT_WAIT_MILLIS = 10000L;

    /** The star-shaped edge list must yield a single "true" summary with a two-colouring. */
    @Test
    public void testBipartite() throws Exception {
        List<String> expected = Lists.newArrayList(
            "(true,{1={1=(1,true), 2=(2,false), 3=(3,false), 4=(4,false), 5=(5,true), 7=(7,true), 9=(9,true)}})");

        Assert.assertEquals(expected, runBipartitenessCheck(getBipartiteEdges()));
    }

    /** The edge list containing the 1-2-3 triangle must yield a single "false" summary. */
    @Test
    public void testNonBipartite() throws Exception {
        List<String> expected = Lists.newArrayList("(false,{})");

        Assert.assertEquals(expected, runBipartitenessCheck(getNonBipartiteEdges()));
    }

    /**
     * Builds and runs the bipartiteness topology over {@code edges} and returns the
     * {@code toString()} of every {@link Candidates} value found in the aggregate table.
     *
     * <p>The streams instance is closed in a {@code finally} block so that a failure while
     * waiting for or reading the results does not leave it running.
     *
     * @param edges the edges to publish as the input stream
     * @return one string per entry of the aggregate table, in the order returned by
     *     {@link StreamUtils#listFromTable}
     * @throws Exception if the topology cannot be set up or the wait is interrupted
     */
    // NOTE(review): the generic parameters of EdgeStream, KryoSerde, GraphSerialized and
    // BipartitenessCheck are not visible from this file, so those types are intentionally left
    // raw here; parameterize them (and drop this suppression) once the signatures are confirmed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private List<String> runBipartitenessCheck(List<KeyValue<Edge<Long>, Void>> edges) throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(
            CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder builder = new StreamsBuilder();

        EdgeStream graph = new EdgeStream(
            StreamUtils.streamFromCollection(builder, producerConfig, new KryoSerde(), new KryoSerde(), edges),
            GraphSerialized.with(new KryoSerde(), new KryoSerde(), new KryoSerde()));
        KTable candidates = graph.aggregate(new BipartitenessCheck(500L));

        startStreams(builder, new KryoSerde(), new KryoSerde());
        try {
            // Give the topology time to consume the input and emit its aggregate.
            Thread.sleep(RESULT_WAIT_MILLIS);

            // Materialize the summaries as strings before the streams instance is closed.
            // Explicit casts keep this independent of listFromTable's generic signature.
            List<String> summaries = new ArrayList<>();
            for (Object entry : StreamUtils.listFromTable(this.streams, candidates)) {
                Candidates value = (Candidates) ((KeyValue) entry).value;
                summaries.add(value.toString());
            }
            return summaries;
        } finally {
            this.streams.close();
        }
    }

    /**
     * Returns a cycle-free edge list: vertex 1 joined to 2, 3 and 4, and vertex 4 joined to
     * 5, 7 and 9. All edge values are {@code null}.
     */
    static List<KeyValue<Edge<Long>, Void>> getBipartiteEdges() {
        List<KeyValue<Edge<Long>, Void>> edges = new ArrayList<>();
        edges.add(edge(1L, 2L));
        edges.add(edge(1L, 3L));
        edges.add(edge(1L, 4L));
        edges.add(edge(4L, 5L));
        edges.add(edge(4L, 7L));
        edges.add(edge(4L, 9L));
        return edges;
    }

    /**
     * Returns an edge list whose first three edges (1-2, 2-3, 3-1) form a triangle, i.e. an
     * odd cycle, so the graph is not bipartite. All edge values are {@code null}.
     */
    static List<KeyValue<Edge<Long>, Void>> getNonBipartiteEdges() {
        List<KeyValue<Edge<Long>, Void>> edges = new ArrayList<>();
        edges.add(edge(1L, 2L));
        edges.add(edge(2L, 3L));
        edges.add(edge(3L, 1L));
        edges.add(edge(4L, 5L));
        edges.add(edge(5L, 7L));
        edges.add(edge(4L, 1L));
        return edges;
    }

    /** Creates a stream record keyed by the edge {@code (u, v)} with a {@code null} value. */
    private static KeyValue<Edge<Long>, Void> edge(long u, long v) {
        return new KeyValue<>(new Edge<>(u, v), null);
    }
}
