package io.kgraph.library.similarity;

import io.kgraph.AbstractIntegrationTest;
import io.kgraph.Edge;
import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.KryoSerializer;
import io.kgraph.utils.StreamUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/library/similarity/JaccardTest.class */
public class JaccardTest extends AbstractIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(JaccardTest.class);
    GraphAlgorithm<Long, Double, Double, KTable<Long, Double>> algorithm;

    /* loaded from: input_file:io/kgraph/library/similarity/JaccardTest$InitVertices.class */
    private static final class InitVertices implements ValueMapper<Long, Double> {
        private InitVertices() {
        }

        public Double apply(Long l) {
            return Double.valueOf(0.0d);
        }
    }

    @Test
    public void testExactSimilarity() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Edge(1L, 2L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(1L, 3L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(1L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 1L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 5L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(3L, 1L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(3L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 1L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 2L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 3L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 5L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 2L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 6L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(6L, 5L), Double.valueOf(0.0d)));
        KGraph fromEdges = KGraph.fromEdges(StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, DoubleSerializer.class, new Properties()), new KryoSerde(), Serdes.Double(), arrayList), new InitVertices(), GraphSerialized.with(Serdes.Long(), Serdes.Double(), Serdes.Double()));
        GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-similarity", "prepare-client-similarity", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-similarity", "edgesGroupedBySource-similarity", 2, (short) 1).get();
        HashMap hashMap = new HashMap();
        hashMap.put("distance.conversion.enabled", false);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-similarity", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-similarity", "edgesGroupedBySource-similarity", fromEdges.serialized(), "solutionSet-similarity", "solutionSetStore-similarity", "workSet-similarity", 2, (short) 1, hashMap, Optional.empty(), new Jaccard());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-similarity", "run-client-similarity", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "edgesStore-run-similarity");
        log.debug("edges : {}", mapFromStore);
        Assert.assertEquals("{1={2=0.2, 3=0.25, 4=0.4}, 2={1=0.2, 4=0.4, 5=0.2}, 3={1=0.25, 4=0.2}, 4={1=0.4, 2=0.4, 3=0.2, 5=0.16666666666666666}, 5={2=0.2, 4=0.16666666666666666, 6=0.0}, 6={5=0.0}}", mapFromStore.toString());
    }

    @Test
    public void testExactDistance() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Edge(1L, 2L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(1L, 3L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(1L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 1L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 5L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(3L, 1L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(3L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 1L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 2L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 3L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 5L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 2L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 4L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 6L), Double.valueOf(0.0d)));
        arrayList.add(new KeyValue(new Edge(6L, 5L), Double.valueOf(0.0d)));
        KGraph fromEdges = KGraph.fromEdges(StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, DoubleSerializer.class, new Properties()), new KryoSerde(), Serdes.Double(), arrayList), new InitVertices(), GraphSerialized.with(Serdes.Long(), Serdes.Double(), Serdes.Double()));
        GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-distance", "prepare-client-distance", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-distance", "edgesGroupedBySource-distance", 2, (short) 1).get();
        HashMap hashMap = new HashMap();
        hashMap.put("distance.conversion.enabled", true);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-distance", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-distance", "edgesGroupedBySource-distance", fromEdges.serialized(), "solutionSet-distance", "solutionSetStore-distance", "workSet-distance", 2, (short) 1, hashMap, Optional.empty(), new Jaccard());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-distance", "run-client-distance", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Thread.sleep(2000L);
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "edgesStore-run-distance");
        log.debug("edges : {}", mapFromStore);
        Assert.assertEquals("{1={2=4.0, 3=3.0, 4=1.5}, 2={1=4.0, 4=1.5, 5=4.0}, 3={1=3.0, 4=4.0}, 4={1=1.5, 2=1.5, 3=4.0, 5=5.0}, 5={2=4.0, 4=5.0, 6=1.7976931348623157E308}, 6={5=1.7976931348623157E308}}", mapFromStore.toString());
    }

    @After
    public void tearDown() throws Exception {
        this.algorithm.close();
    }
}
