package io.kgraph.library.clustering;

import io.kgraph.AbstractIntegrationTest;
import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.KryoSerializer;
import io.kgraph.utils.StreamUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/library/clustering/KMeansClusteringTest.class */
public class KMeansClusteringTest extends AbstractIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(KMeansClusteringTest.class);
    GraphAlgorithm<Long, KMeansVertexValue, Long, KTable<Long, KMeansVertexValue>> algorithm;

    @Test
    public void test1() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(1L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(1.0d), Double.valueOf(1.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(2L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(1.5d), Double.valueOf(2.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(3L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(3.0d), Double.valueOf(4.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(4L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(5.0d), Double.valueOf(7.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(5L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(3.5d), Double.valueOf(5.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(6L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(4.5d), Double.valueOf(5.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(7L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(3.5d), Double.valueOf(4.5d)}).collect(Collectors.toList()), 0)));
        KTable tableFromCollection = StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, KryoSerializer.class, new Properties()), "vertices-1", 2, (short) 1, Serdes.Long(), new KryoSerde(), arrayList);
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, LongSerializer.class, new Properties());
        KTable tableFromCollection2 = StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), Collections.emptyList());
        ClientUtils.createTopic("edgesGroupedBySource-1", 2, (short) 1, producerConfig);
        KGraph kGraph = new KGraph(tableFromCollection, tableFromCollection2, GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Long()));
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        List list = (List) Stream.of((Object[]) new Double[]{Double.valueOf(1.0d), Double.valueOf(1.0d)}).collect(Collectors.toList());
        List list2 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(1.5d), Double.valueOf(2.0d)}).collect(Collectors.toList());
        arrayList2.add(list);
        arrayList2.add(list2);
        hashMap.put("test.initial.centers", arrayList2);
        hashMap.put("kmeans.cluster.centers.count", 2);
        hashMap.put("kmeans.points.dimensions", 2);
        hashMap.put("kmeans.points.count", 7);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-1", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-1", "edgesGroupedBySource-1", kGraph.serialized(), "solutionSet-1", "solutionSetStore-1", "workSet-1", 2, (short) 1, hashMap, Optional.empty(), new KMeansClustering());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-1", "run-client-1", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-1");
        log.debug("result: {}", mapFromStore);
        Assert.assertEquals("{1=0, 2=0, 3=1, 4=1, 5=1, 6=1, 7=1}", mapFromStore.toString());
    }

    @Test
    public void test2() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(1L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(2.0d), Double.valueOf(10.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(2L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(2.0d), Double.valueOf(5.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(3L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(8.0d), Double.valueOf(4.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(4L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(5.0d), Double.valueOf(8.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(5L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(7.0d), Double.valueOf(5.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(6L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(6.0d), Double.valueOf(4.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(7L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(1.0d), Double.valueOf(2.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(8L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(4.0d), Double.valueOf(9.0d)}).collect(Collectors.toList()), 0)));
        KTable tableFromCollection = StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, KryoSerializer.class, new Properties()), "vertices-2", 2, (short) 1, Serdes.Long(), new KryoSerde(), arrayList);
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, LongSerializer.class, new Properties());
        KTable tableFromCollection2 = StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), Collections.emptyList());
        ClientUtils.createTopic("edgesGroupedBySource-2", 2, (short) 1, producerConfig);
        KGraph kGraph = new KGraph(tableFromCollection, tableFromCollection2, GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Long()));
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        List list = (List) Stream.of((Object[]) new Double[]{Double.valueOf(2.0d), Double.valueOf(10.0d)}).collect(Collectors.toList());
        List list2 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(2.0d), Double.valueOf(5.0d)}).collect(Collectors.toList());
        List list3 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(8.0d), Double.valueOf(4.0d)}).collect(Collectors.toList());
        arrayList2.add(list);
        arrayList2.add(list2);
        arrayList2.add(list3);
        hashMap.put("test.initial.centers", arrayList2);
        hashMap.put("kmeans.cluster.centers.count", 3);
        hashMap.put("kmeans.points.dimensions", 2);
        hashMap.put("kmeans.points.count", 8);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-2", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-2", "edgesGroupedBySource-2", kGraph.serialized(), "solutionSet-2", "solutionSetStore-2", "workSet-2", 2, (short) 1, hashMap, Optional.empty(), new KMeansClustering());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-2", "run-client-2", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-2");
        log.debug("result: {}", mapFromStore);
        Assert.assertEquals("{1=0, 2=1, 3=2, 4=0, 5=2, 6=2, 7=1, 8=0}", mapFromStore.toString());
    }

    @Test
    public void test3() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(1L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-4.31568d), Double.valueOf(-0.396959d), Double.valueOf(-6.29507d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(2L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-4.56112d), Double.valueOf(-1.74917d), Double.valueOf(-4.57874d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(3L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(4.54508d), Double.valueOf(0.102845d), Double.valueOf(6.35385d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(4L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(4.87746d), Double.valueOf(-0.832591d), Double.valueOf(7.06942d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(5L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-5.91254d), Double.valueOf(-0.278006d), Double.valueOf(-4.25934d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(6L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(6.95139d), Double.valueOf(0.120139d), Double.valueOf(4.89531d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(7L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-6.28538d), Double.valueOf(-0.88527d), Double.valueOf(-4.74988d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(8L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-6.84791d), Double.valueOf(0.887664d), Double.valueOf(-4.91919d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(9L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(7.47117d), Double.valueOf(1.67911d), Double.valueOf(6.02221d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(10L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-4.78011d), Double.valueOf(1.2099d), Double.valueOf(-4.55519d)}).collect(Collectors.toList()), 0)));
        KTable tableFromCollection = StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, KryoSerializer.class, new Properties()), "vertices-3", 2, (short) 1, Serdes.Long(), new KryoSerde(), arrayList);
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, LongSerializer.class, new Properties());
        KTable tableFromCollection2 = StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), Collections.emptyList());
        ClientUtils.createTopic("edgesGroupedBySource-3", 2, (short) 1, producerConfig);
        KGraph kGraph = new KGraph(tableFromCollection, tableFromCollection2, GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Long()));
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        List list = (List) Stream.of((Object[]) new Double[]{Double.valueOf(-4.31568d), Double.valueOf(-0.396959d), Double.valueOf(-6.29507d)}).collect(Collectors.toList());
        List list2 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(-4.56112d), Double.valueOf(-1.74917d), Double.valueOf(-4.57874d)}).collect(Collectors.toList());
        arrayList2.add(list);
        arrayList2.add(list2);
        hashMap.put("test.initial.centers", arrayList2);
        hashMap.put("kmeans.cluster.centers.count", 2);
        hashMap.put("kmeans.points.dimensions", 3);
        hashMap.put("kmeans.points.count", 10);
        hashMap.put("kmeans.print.final.centers", true);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-3", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-3", "edgesGroupedBySource-3", kGraph.serialized(), "solutionSet-3", "solutionSetStore-3", "workSet-3", 2, (short) 1, hashMap, Optional.empty(), new KMeansClustering());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-3", "run-client-3", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-3");
        log.debug("result: {}", mapFromStore);
        Assert.assertEquals("{1=0, 2=0, 3=1, 4=1, 5=0, 6=1, 7=0, 8=0, 9=1, 10=0}", mapFromStore.toString());
    }

    @Test
    public void test4() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(1L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-3.78d), Double.valueOf(-42.01d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(2L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-45.96d), Double.valueOf(30.67d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(3L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(56.37d), Double.valueOf(-46.62d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(4L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(8.78d), Double.valueOf(-37.95d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(5L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-26.95d), Double.valueOf(43.1d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(6L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(37.87d), Double.valueOf(-51.3d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(7L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-2.61d), Double.valueOf(-30.43d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(8L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-23.33d), Double.valueOf(26.23d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(9L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(38.19d), Double.valueOf(-36.27d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(10L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-13.63d), Double.valueOf(-42.26d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(11L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-36.57d), Double.valueOf(32.63d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(12L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(50.65d), Double.valueOf(-52.4d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(13L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-5.76d), Double.valueOf(-51.83d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(14L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-34.43d), Double.valueOf(42.66d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(15L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(40.35d), Double.valueOf(-47.14d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(16L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-23.4d), Double.valueOf(-48.7d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(17L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-29.58d), Double.valueOf(17.77d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(18L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(43.08d), Double.valueOf(-61.96d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(19L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(9.06d), Double.valueOf(-49.26d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(20L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-20.13d), Double.valueOf(44.16d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(21L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(41.62d), Double.valueOf(-45.84d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(22L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(5.23d), Double.valueOf(-41.2d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(23L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-23.0d), Double.valueOf(38.15d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(24L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(44.55d), Double.valueOf(-51.5d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(25L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-15.63d), Double.valueOf(-26.81d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(26L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-24.33d), Double.valueOf(22.63d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(27L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(52.51d), Double.valueOf(-54.75d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(28L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-0.04d), Double.valueOf(-39.69d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(29L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-32.92d), Double.valueOf(43.87d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(30L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(47.99d), Double.valueOf(-36.93d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(31L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-7.34d), Double.valueOf(-57.9d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(32L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-36.17d), Double.valueOf(34.74d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(33L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(51.52d), Double.valueOf(-41.83d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(34L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-21.91d), Double.valueOf(-49.01d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(35L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-46.68d), Double.valueOf(46.04d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(36L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(48.52d), Double.valueOf(-43.67d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(37L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-0.2d), Double.valueOf(-36.62d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(38L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-27.71d), Double.valueOf(35.12d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(39L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(41.29d), Double.valueOf(-42.0d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(40L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-9.17d), Double.valueOf(-43.28d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(41L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-41.16d), Double.valueOf(50.66d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(42L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(49.63d), Double.valueOf(-45.28d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(43L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-8.1d), Double.valueOf(-29.83d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(44L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-49.38d), Double.valueOf(38.57d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(45L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(35.38d), Double.valueOf(-34.9d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(46L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-6.51d), Double.valueOf(-55.58d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(47L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-38.17d), Double.valueOf(40.21d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(48L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(47.47d), Double.valueOf(-45.95d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(49L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-17.66d), Double.valueOf(-51.12d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(50L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-32.6d), Double.valueOf(41.13d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(51L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(40.68d), Double.valueOf(-49.1d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(52L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-10.31d), Double.valueOf(-40.69d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(53L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-22.05d), Double.valueOf(42.91d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(54L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(51.16d), Double.valueOf(-47.58d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(55L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-12.42d), Double.valueOf(-57.29d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(56L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-17.72d), Double.valueOf(39.9d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(57L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(44.57d), Double.valueOf(-41.75d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(58L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(3.14d), Double.valueOf(-35.46d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(59L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-53.73d), Double.valueOf(32.84d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(60L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(53.16d), Double.valueOf(-50.16d)}).collect(Collectors.toList()), 0)));
        KTable tableFromCollection = StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, KryoSerializer.class, new Properties()), "vertices-4", 2, (short) 1, Serdes.Long(), new KryoSerde(), arrayList);
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, LongSerializer.class, new Properties());
        KTable tableFromCollection2 = StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), Collections.emptyList());
        ClientUtils.createTopic("edgesGroupedBySource-4", 2, (short) 1, producerConfig);
        KGraph kGraph = new KGraph(tableFromCollection, tableFromCollection2, GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Long()));
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        List list = (List) Stream.of((Object[]) new Double[]{Double.valueOf(-3.78d), Double.valueOf(-42.01d)}).collect(Collectors.toList());
        List list2 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(-36.57d), Double.valueOf(32.63d)}).collect(Collectors.toList());
        List list3 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(50.65d), Double.valueOf(-52.4d)}).collect(Collectors.toList());
        arrayList2.add(list);
        arrayList2.add(list2);
        arrayList2.add(list3);
        hashMap.put("test.initial.centers", arrayList2);
        hashMap.put("kmeans.cluster.centers.count", 3);
        hashMap.put("kmeans.points.dimensions", 2);
        hashMap.put("kmeans.points.count", 60);
        hashMap.put("kmeans.print.final.centers", true);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-4", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-4", "edgesGroupedBySource-4", kGraph.serialized(), "solutionSet-4", "solutionSetStore-4", "workSet-4", 2, (short) 1, hashMap, Optional.empty(), new KMeansClustering());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-4", "run-client-4", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-4");
        log.debug("result: {}", mapFromStore);
        Assert.assertEquals("{1=0, 2=1, 3=2, 4=0, 5=1, 6=2, 7=0, 8=1, 9=2, 10=0, 11=1, 12=2, 13=0, 14=1, 15=2, 16=0, 17=1, 18=2, 19=0, 20=1, 21=2, 22=0, 23=1, 24=2, 25=0, 26=1, 27=2, 28=0, 29=1, 30=2, 31=0, 32=1, 33=2, 34=0, 35=1, 36=2, 37=0, 38=1, 39=2, 40=0, 41=1, 42=2, 43=0, 44=1, 45=2, 46=0, 47=1, 48=2, 49=0, 50=1, 51=2, 52=0, 53=1, 54=2, 55=0, 56=1, 57=2, 58=0, 59=1, 60=2}", mapFromStore.toString());
    }

    @Test
    public void test5() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(1L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-3.78d), Double.valueOf(-42.01d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(2L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-45.96d), Double.valueOf(30.67d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(3L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(56.37d), Double.valueOf(-46.62d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(4L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(8.78d), Double.valueOf(-37.95d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(5L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-26.95d), Double.valueOf(43.1d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(6L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(37.87d), Double.valueOf(-51.3d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(7L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-2.61d), Double.valueOf(-30.43d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(8L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-23.33d), Double.valueOf(26.23d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(9L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(38.19d), Double.valueOf(-36.27d)}).collect(Collectors.toList()), 0)));
        arrayList.add(new KeyValue(10L, new KMeansVertexValue((List) Stream.of((Object[]) new Double[]{Double.valueOf(-13.63d), Double.valueOf(-42.26d)}).collect(Collectors.toList()), 0)));
        KTable tableFromCollection = StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, KryoSerializer.class, new Properties()), "vertices-5", 2, (short) 1, Serdes.Long(), new KryoSerde(), arrayList);
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, LongSerializer.class, new Properties());
        KTable tableFromCollection2 = StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), Collections.emptyList());
        ClientUtils.createTopic("edgesGroupedBySource-5", 2, (short) 1, producerConfig);
        KGraph kGraph = new KGraph(tableFromCollection, tableFromCollection2, GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Long()));
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        List list = (List) Stream.of((Object[]) new Double[]{Double.valueOf(-3.78d), Double.valueOf(-42.01d)}).collect(Collectors.toList());
        List list2 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(-26.95d), Double.valueOf(43.1d)}).collect(Collectors.toList());
        List list3 = (List) Stream.of((Object[]) new Double[]{Double.valueOf(56.37d), Double.valueOf(-46.62d)}).collect(Collectors.toList());
        arrayList2.add(list);
        arrayList2.add(list2);
        arrayList2.add(list3);
        hashMap.put("test.initial.centers", arrayList2);
        hashMap.put("kmeans.cluster.centers.count", 3);
        hashMap.put("kmeans.points.dimensions", 2);
        hashMap.put("kmeans.points.count", 10);
        hashMap.put("kmeans.print.final.centers", true);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-5", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-5", "edgesGroupedBySource-5", kGraph.serialized(), "solutionSet-5", "solutionSetStore-5", "workSet-5", 2, (short) 1, hashMap, Optional.empty(), new KMeansClustering());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-5", "run-client-5", CLUSTER.bootstrapServers(), kGraph.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Map mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-5");
        log.debug("result: {}", mapFromStore);
        Assert.assertEquals("{1=0, 2=1, 3=2, 4=0, 5=1, 6=2, 7=0, 8=1, 9=2, 10=0}", mapFromStore.toString());
    }

    @After
    public void tearDown() throws Exception {
        this.algorithm.close();
    }
}
