package io.kgraph.library.cf;

import io.kgraph.AbstractIntegrationTest;
import io.kgraph.Edge;
import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.library.cf.Svdpp;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.KryoSerializer;
import io.kgraph.utils.StreamUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeSet;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.jblas.FloatMatrix;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/library/cf/SvdppTest.class */
public class SvdppTest extends AbstractIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(SvdppTest.class);
    GraphAlgorithm<CfLongId, Svdpp.SvdppValue, Float, KTable<CfLongId, Svdpp.SvdppValue>> algorithm;

    /* loaded from: input_file:io/kgraph/library/cf/SvdppTest$InitVertices.class */
    private static final class InitVertices implements ValueMapper<CfLongId, Svdpp.SvdppValue> {
        private InitVertices() {
        }

        public Svdpp.SvdppValue apply(CfLongId cfLongId) {
            return new Svdpp.SvdppValue(0.0f, new FloatMatrix(), new FloatMatrix());
        }
    }

    @Test
    public void testSvdpp() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Edge(new CfLongId((byte) 0, 1L), new CfLongId((byte) 1, 1L)), Float.valueOf(1.0f)));
        arrayList.add(new KeyValue(new Edge(new CfLongId((byte) 0, 1L), new CfLongId((byte) 1, 2L)), Float.valueOf(2.0f)));
        arrayList.add(new KeyValue(new Edge(new CfLongId((byte) 0, 2L), new CfLongId((byte) 1, 1L)), Float.valueOf(3.0f)));
        arrayList.add(new KeyValue(new Edge(new CfLongId((byte) 0, 2L), new CfLongId((byte) 1, 2L)), Float.valueOf(4.0f)));
        KGraph fromEdges = KGraph.fromEdges(StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, FloatSerializer.class, new Properties()), new KryoSerde(), Serdes.Float(), arrayList), new InitVertices(), GraphSerialized.with(new KryoSerde(), new KryoSerde(), Serdes.Float()));
        Map map = (Map) GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-", "prepare-client-", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-", "edgesGroupedBySource-", 2, (short) 1).get();
        HashMap hashMap = new HashMap();
        hashMap.put("lambda.bias", Float.valueOf(0.005f));
        hashMap.put("gamma.bias", Float.valueOf(0.01f));
        hashMap.put("lambda.factor", Float.valueOf(0.005f));
        hashMap.put("gamma.factor", Float.valueOf(0.01f));
        hashMap.put("min.rating", Float.valueOf(0.0f));
        hashMap.put("max.rating", Float.valueOf(5.0f));
        hashMap.put("dim", 2);
        hashMap.put("iterations", 6);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-", "edgesGroupedBySource-", map, fromEdges.serialized(), "solutionSet-", "solutionSetStore-", "workSet-", 2, (short) 1, hashMap, Optional.empty(), new Svdpp());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-", "run-client-", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        NavigableMap mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-");
        log.info("result: {}", mapFromStore);
        Thread.sleep(2000L);
        Assert.assertEquals("{1 0=[0.007494, 0.008374], 2 0=[0.006907, 0.008184], 1 1=[0.007407, 0.002487], 2 1=[0.006642, 0.001807]}", mapFromStore.toString());
    }

    public void testSvdppFromFile() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        GraphUtils.edgesToTopic(GraphUtils.class.getResourceAsStream("/ratings.txt"), str -> {
            return new CfLongId((byte) 0, Long.parseLong(str));
        }, str2 -> {
            return new CfLongId((byte) 1, Long.parseLong(str2));
        }, Float::parseFloat, new FloatSerializer(), ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, FloatSerializer.class, new Properties()), "initEdges-file", 50, (short) 1);
        KGraph fromEdges = KGraph.fromEdges(streamsBuilder.table("initEdges-file", Consumed.with(new KryoSerde(), Serdes.Float()), Materialized.with(new KryoSerde(), Serdes.Float())), new InitVertices(), GraphSerialized.with(new KryoSerde(), new KryoSerde(), Serdes.Float()));
        Map map = (Map) GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-file", "prepare-client-file", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-file", "edgesGroupedBySource-file", 50, (short) 1).get();
        Thread.sleep(10000L);
        HashMap hashMap = new HashMap();
        hashMap.put("lambda.bias", Float.valueOf(0.005f));
        hashMap.put("gamma.bias", Float.valueOf(0.01f));
        hashMap.put("lambda.factor", Float.valueOf(0.005f));
        hashMap.put("gamma.factor", Float.valueOf(0.01f));
        hashMap.put("min.rating", Float.valueOf(0.0f));
        hashMap.put("max.rating", Float.valueOf(5.0f));
        hashMap.put("dim", 2);
        hashMap.put("iterations", 3);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-file", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-file", "edgesGroupedBySource-file", map, fromEdges.serialized(), "solutionSet-file", "solutionSetStore-file", "workSet-file", 50, (short) 1, hashMap, Optional.empty(), new Svdpp());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-file", "run-client-file", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        NavigableMap mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-file");
        TreeSet treeSet = new TreeSet();
        for (Map.Entry entry : mapFromStore.entrySet()) {
            treeSet.add(((CfLongId) entry.getKey()).toString() + " " + ((Svdpp.SvdppValue) entry.getValue()).toString());
        }
        log.info("result: {}", treeSet);
        log.info("first: {}", mapFromStore.firstEntry());
        log.info("last: {}", mapFromStore.lastEntry());
        Thread.sleep(2000L);
        Assert.assertEquals("1 0=[0.006352, 0.007996]", mapFromStore.firstEntry().toString());
        Assert.assertEquals("2071 1=[0.007310, 0.002405]", mapFromStore.lastEntry().toString());
    }

    @After
    public void tearDown() throws Exception {
        this.algorithm.close();
    }
}
