package io.kgraph.library.maxbmatching;

import io.kgraph.AbstractIntegrationTest;
import io.kgraph.Edge;
import io.kgraph.GraphAlgorithm;
import io.kgraph.GraphAlgorithmState;
import io.kgraph.GraphSerialized;
import io.kgraph.KGraph;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.KryoSerializer;
import io.kgraph.utils.StreamUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kgraph/library/maxbmatching/MaxBMatchingTest.class */
public class MaxBMatchingTest extends AbstractIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(MaxBMatchingTest.class);
    GraphAlgorithm<Long, Integer, MBMEdgeValue, KTable<Long, Integer>> algorithm;

    /* loaded from: input_file:io/kgraph/library/maxbmatching/MaxBMatchingTest$InitVertices.class */
    private static final class InitVertices implements ValueMapper<Long, Integer> {
        private InitVertices() {
        }

        public Integer apply(Long l) {
            switch (l.intValue()) {
                case 1:
                case 3:
                case 4:
                    return 1;
                case 2:
                    return 2;
                case 5:
                    return 3;
                default:
                    return 0;
            }
        }
    }

    @Test
    public void testMaxBMatching() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Edge(1L, 2L), new MBMEdgeValue(3.0d)));
        arrayList.add(new KeyValue(new Edge(1L, 3L), new MBMEdgeValue(1.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 1L), new MBMEdgeValue(3.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 4L), new MBMEdgeValue(1.0d)));
        arrayList.add(new KeyValue(new Edge(2L, 5L), new MBMEdgeValue(1.0d)));
        arrayList.add(new KeyValue(new Edge(3L, 1L), new MBMEdgeValue(1.0d)));
        arrayList.add(new KeyValue(new Edge(3L, 5L), new MBMEdgeValue(3.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 2L), new MBMEdgeValue(1.0d)));
        arrayList.add(new KeyValue(new Edge(4L, 5L), new MBMEdgeValue(2.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 2L), new MBMEdgeValue(1.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 3L), new MBMEdgeValue(3.0d)));
        arrayList.add(new KeyValue(new Edge(5L, 4L), new MBMEdgeValue(2.0d)));
        KGraph fromEdges = KGraph.fromEdges(StreamUtils.tableFromCollection(streamsBuilder, ClientUtils.producerConfig(CLUSTER.bootstrapServers(), KryoSerializer.class, KryoSerializer.class, new Properties()), new KryoSerde(), new KryoSerde(), arrayList), new InitVertices(), GraphSerialized.with(Serdes.Long(), Serdes.Integer(), new KryoSerde()));
        this.algorithm = new PregelGraphAlgorithm((String) null, "run-", CLUSTER.bootstrapServers(), CLUSTER.zKConnectString(), "vertices-", "edgesGroupedBySource-", (Map) GraphUtils.groupEdgesBySourceAndRepartition(streamsBuilder, ClientUtils.streamsConfig("prepare-", "prepare-client-", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), fromEdges.vertexValueSerde().getClass()), fromEdges, "vertices-", "edgesGroupedBySource-", 2, (short) 1).get(), fromEdges.serialized(), "solutionSet-", "solutionSetStore-", "workSet-", 2, (short) 1, Collections.emptyMap(), Optional.empty(), new MaxBMatching());
        this.streamsConfiguration = ClientUtils.streamsConfig("run-", "run-client-", CLUSTER.bootstrapServers(), fromEdges.keySerde().getClass(), KryoSerde.class);
        this.algorithm.configure(new StreamsBuilder(), this.streamsConfiguration).streams();
        GraphAlgorithmState run = this.algorithm.run();
        run.result().get();
        Thread.sleep(2000L);
        NavigableMap mapFromStore = StreamUtils.mapFromStore(run.streams(), "edgesStore-run-");
        log.debug("result: {}", mapFromStore);
        Assert.assertEquals("{1={2=3.0\tINCLUDED}, 2={1=3.0\tINCLUDED, 5=1.0\tINCLUDED}, 3={5=3.0\tINCLUDED}, 4={5=2.0\tINCLUDED}, 5={2=1.0\tINCLUDED, 3=3.0\tINCLUDED, 4=2.0\tINCLUDED}}", mapFromStore.toString());
    }

    @After
    public void tearDown() throws Exception {
        this.algorithm.close();
    }
}
