package io.kgraph;

import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.KryoSerde;
import io.kgraph.utils.StreamUtils;
import io.kgraph.utils.TestUtils;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;
import org.junit.Test;

/* loaded from: input_file:io/kgraph/ReduceOnEdgesMethodsITCase.class */
public class ReduceOnEdgesMethodsITCase extends AbstractIntegrationTest {
    private String expectedResult;

    /* loaded from: input_file:io/kgraph/ReduceOnEdgesMethodsITCase$SelectMinWeightNeighbor.class */
    private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Long> {
        private SelectMinWeightNeighbor() {
        }

        public Long iterateEdges(Long l, Iterable<EdgeWithValue<Long, Long>> iterable) {
            long j = Long.MAX_VALUE;
            long j2 = 0;
            if (iterable != null) {
                for (EdgeWithValue<Long, Long> edgeWithValue : iterable) {
                    if (((Long) edgeWithValue.value()).longValue() < j) {
                        j = ((Long) edgeWithValue.value()).longValue();
                        j2 = ((Long) edgeWithValue.target()).longValue();
                    }
                }
            }
            return Long.valueOf(j2);
        }

        public /* bridge */ /* synthetic */ Object iterateEdges(Object obj, Iterable iterable) {
            return iterateEdges((Long) obj, (Iterable<EdgeWithValue<Long, Long>>) iterable);
        }
    }

    /* loaded from: input_file:io/kgraph/ReduceOnEdgesMethodsITCase$SelectMinWeightNeighborNoValue.class */
    private static final class SelectMinWeightNeighborNoValue implements Reducer<Long> {
        private SelectMinWeightNeighborNoValue() {
        }

        public Long apply(Long l, Long l2) {
            return Long.valueOf(Math.min(l.longValue(), l2.longValue()));
        }
    }

    @Test
    public void testLowestWeightOutNeighbor() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable groupReduceOnEdges = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, groupReduceOnEdges);
        this.expectedResult = "1,2\n2,3\n3,4\n4,5\n5,1\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }

    @Test
    public void testLowestWeightOutNeighborNoValue() throws Exception {
        Properties producerConfig = ClientUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class, new Properties());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable reduceOnEdges = new KGraph(StreamUtils.tableFromCollection(streamsBuilder, producerConfig, Serdes.Long(), Serdes.Long(), TestGraphUtils.getLongLongVertices()), StreamUtils.tableFromCollection(streamsBuilder, producerConfig, new KryoSerde(), Serdes.Long(), TestGraphUtils.getLongLongEdges()), GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long())).reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT);
        startStreams(streamsBuilder, Serdes.Long(), Serdes.Long());
        Thread.sleep(5000L);
        List listFromTable = StreamUtils.listFromTable(this.streams, reduceOnEdges);
        this.expectedResult = "1,12\n2,23\n3,34\n4,45\n5,51\n";
        TestUtils.compareResultAsTuples(listFromTable, this.expectedResult);
    }
}
