package io.kgraph.pregel.aggregators;

import io.kgraph.AbstractIntegrationTest;
import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.TestGraphUtils;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.KryoSerializer;
import io.kgraph.utils.StreamUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/pregel/aggregators/AggregatorTest.class */
public class AggregatorTest extends AbstractIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(AggregatorTest.class);
    GraphAlgorithm<Long, Long, Long, KTable<Long, Long>> algorithm;

    /* loaded from: input_file:io/kgraph/pregel/aggregators/AggregatorTest$InitVertices.class */
    private static final class InitVertices implements ValueMapper<Long, Long> {
        private InitVertices() {
        }

        public Long apply(Long l) {
            return 0L;
        }
    }

    @Test
    public void testVertexCountToValue() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KGraph fromEdges = KGraph.fromEdges(StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties()), new KryoSerde(), Serdes.Long(), TestGraphUtils.getTwoChains()), new InitVertices(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-", "edgesGroupedBySource-", (Map) GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-", "prepare-client-", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-", "edgesGroupedBySource-", 2, (short) 1).get(), fromEdges.serialized(), "solutionSet-", "solutionSetStore-", "workSet-", 2, (short) 1, Collections.emptyMap(), Optional.empty(), new VertexCountToValue());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-", "run-client-", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        NavigableMap mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-");
        log.debug("result: {}", mapFromStore);
        HashMap hashMap = new HashMap();
        hashMap.put(0L, 220021L);
        hashMap.put(1L, 220021L);
        hashMap.put(2L, 220021L);
        hashMap.put(3L, 220021L);
        hashMap.put(4L, 220021L);
        hashMap.put(5L, 220021L);
        hashMap.put(6L, 220021L);
        hashMap.put(7L, 220021L);
        hashMap.put(8L, 220021L);
        hashMap.put(9L, 220021L);
        hashMap.put(10L, 220021L);
        hashMap.put(11L, 220021L);
        hashMap.put(12L, 220021L);
        hashMap.put(13L, 220021L);
        hashMap.put(14L, 220021L);
        hashMap.put(15L, 220021L);
        hashMap.put(16L, 220021L);
        hashMap.put(17L, 220021L);
        hashMap.put(18L, 220021L);
        hashMap.put(19L, 220021L);
        hashMap.put(20L, 220021L);
        Assert.assertEquals(hashMap, mapFromStore);
    }

    @Test
    public void testVertexCountToValueFromFile() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        GraphUtils.edgesToTopic(GraphUtils.class.getResourceAsStream("/ratings.txt"), Long::parseLong, Long::parseLong, str -> {
            return 0L;
        }, new LongSerializer(), ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, FloatSerializer.class, new Properties()), "initEdges-vertexFile", 10, (short) 1);
        KGraph fromEdges = KGraph.fromEdges(streamsBuilder.table("initEdges-vertexFile", Consumed.with(new KryoSerde(), Serdes.Long()), Materialized.with(new KryoSerde(), Serdes.Long())), new InitVertices(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
        Map map = (Map) GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-vertexFile", "prepare-client-vertexFile", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-vertexFile", "edgesGroupedBySource-vertexFile", 10, (short) 1).get();
        Thread.sleep(2000L);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-vertexFile", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-vertexFile", "edgesGroupedBySource-vertexFile", map, fromEdges.serialized(), "solutionSet-vertexFile", "solutionSetStore-vertexFile", "workSet-vertexFile", 10, (short) 1, Collections.emptyMap(), Optional.empty(), new VertexCountToValue());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-vertexFile", "run-client-vertexFile", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        NavigableMap mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-vertexFile");
        log.info("result: {}", mapFromStore);
        Assert.assertEquals(1102071L, ((Long) mapFromStore.values().iterator().next()).longValue());
    }

    @Test
    public void testEdgeCountToValueFromFile() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        GraphUtils.edgesToTopic(GraphUtils.class.getResourceAsStream("/ratings.txt"), Long::parseLong, Long::parseLong, str -> {
            return 0L;
        }, new LongSerializer(), ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, FloatSerializer.class, new Properties()), "initEdges-edgeFile", 10, (short) 1);
        KGraph fromEdges = KGraph.fromEdges(streamsBuilder.table("initEdges-edgeFile", Consumed.with(new KryoSerde(), Serdes.Long()), Materialized.with(new KryoSerde(), Serdes.Long())), new InitVertices(), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()));
        Map map = (Map) GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-edgeFile", "prepare-client-edgeFile", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-edgeFile", "edgesGroupedBySource-edgeFile", 10, (short) 1).get();
        Thread.sleep(2000L);
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-edgeFile", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-edgeFile", "edgesGroupedBySource-edgeFile", map, fromEdges.serialized(), "solutionSet-edgeFile", "solutionSetStore-edgeFile", "workSet-edgeFile", 10, (short) 1, Collections.emptyMap(), Optional.empty(), new EdgeCountToValue());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-edgeFile", "run-client-edgeFile", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        NavigableMap mapFromStore = StreamUtils.mapFromStore(run.streams(), "solutionSetStore-edgeFile");
        log.info("result: {}", mapFromStore);
        Assert.assertEquals(35494L, ((Long) mapFromStore.values().iterator().next()).longValue());
    }

    @After
    public void tearDown() throws Exception {
        this.algorithm.close();
    }
}
