package io.kgraph.rest.server.graph;

import io.kgraph.GraphSerialized;
import io.kgraph.library.BreadthFirstSearch;
import io.kgraph.library.ConnectedComponents;
import io.kgraph.library.LabelPropagation;
import io.kgraph.library.LocalClusteringCoefficient;
import io.kgraph.library.MultipleSourceShortestPaths;
import io.kgraph.library.PageRank;
import io.kgraph.library.SingleSourceShortestPaths;
import io.kgraph.pregel.PregelGraphAlgorithm;
import io.kgraph.pregel.ZKUtils;
import io.kgraph.rest.server.KafkaGraphsProperties;
import io.kgraph.tools.importer.GraphImporter;
import io.kgraph.utils.ClientUtils;
import io.kgraph.utils.GraphUtils;
import io.kgraph.utils.KryoSerde;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.nodes.GroupMember;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.reactive.context.ReactiveWebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@EnableConfigurationProperties({KafkaGraphsProperties.class})
@Component
/* loaded from: input_file:BOOT-INF/classes/io/kgraph/rest/server/graph/GraphAlgorithmHandler.class */
public class GraphAlgorithmHandler<EV> implements ApplicationListener<ReactiveWebServerInitializedEvent> {
    // HTTP header carrying the application id. The proxy* methods set it on requests they forward,
    // and the public handlers only fan out to other group members when it is absent.
    private static final String X_KGRAPH_APPID = "X-KGraph-AppId";
    // Configuration properties; this class reads the Kafka bootstrap servers from it.
    private final KafkaGraphsProperties props;
    // Curator client used for group membership and handed to each algorithm instance.
    private final CuratorFramework curator;
    // Port of the local web server; assigned in onApplicationEvent.
    private int port;
    // Group membership handle; created and started in onApplicationEvent.
    private GroupMember group;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GraphAlgorithmHandler.class);
    // Ticks after an initial 100 ms delay and then every 2 s.
    // NOTE(review): not referenced anywhere in the visible part of this class.
    private static final Flux<Long> INTERVAL = Flux.interval(Duration.ofMillis(100), Duration.ofSeconds(2));
    // Algorithms configured on this instance, keyed by application id.
    private final ConcurrentMap<String, PregelGraphAlgorithm<Long, ?, ?, ?>> algorithms = new ConcurrentHashMap();
    // Local host address, resolved once when the handler is constructed.
    private final String host = getHostAddress();

    /**
     * Creates the handler.
     *
     * @param kafkaGraphsProperties configuration properties (source of the Kafka bootstrap servers)
     * @param curatorFramework Curator client used for group membership and by the algorithms
     */
    public GraphAlgorithmHandler(KafkaGraphsProperties kafkaGraphsProperties, CuratorFramework curatorFramework) {
        this.curator = curatorFramework;
        this.props = kafkaGraphsProperties;
    }

    /**
     * Records the port the reactive web server is listening on and then registers this instance,
     * identified by its {@code host:port}, in the group.
     *
     * @param reactiveWebServerInitializedEvent event carrying the initialized web server
     */
    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ReactiveWebServerInitializedEvent reactiveWebServerInitializedEvent) {
        // The port must be stored first: getHostAndPort() below reads it.
        this.port = reactiveWebServerInitializedEvent.getWebServer().getPort();

        String membershipPath = ZKPaths.makePath(ZKUtils.GRAPHS_PATH, ZKUtils.GROUP);
        String memberId = getHostAndPort();
        GroupMember member = new GroupMember(this.curator, membershipPath, memberId);
        this.group = member;
        member.start();
    }

    /**
     * Imports a graph from an uploaded vertices file and edges file.
     *
     * <p>Expected multipart parts: {@code verticesFile} and {@code edgesFile} (file parts) plus the
     * form fields {@code verticesTopic}, {@code edgesTopic}, {@code useDouble},
     * {@code numPartitions} and {@code replicationFactor}.
     *
     * <p>All form fields are validated before anything is written. The two uploads are then
     * transferred to the temp directory and, only once both transfers have completed, the
     * {@link GraphImporter} is run.
     *
     * @param serverRequest the multipart request
     * @return an empty 200 response on success; errors with a {@link ResponseStatusException}
     *     carrying 400 for a missing/invalid part or unparsable number, and 500 (with the
     *     original cause attached) for any other failure
     */
    public Mono<ServerResponse> importGraph(ServerRequest serverRequest) {
        return serverRequest.body(BodyExtractors.toMultipartData()).flatMap(multipart -> {
            try {
                Map<String, ?> parts = multipart.toSingleValueMap();
                FilePart verticesPart = requiredImportPart(parts, "verticesFile", FilePart.class);
                FilePart edgesPart = requiredImportPart(parts, "edgesFile", FilePart.class);
                File verticesFile = importTarget(verticesPart);
                File edgesFile = importTarget(edgesPart);

                String verticesTopic = requiredImportPart(parts, "verticesTopic", FormFieldPart.class).value();
                String edgesTopic = requiredImportPart(parts, "edgesTopic", FormFieldPart.class).value();
                boolean useDouble =
                    Boolean.parseBoolean(requiredImportPart(parts, "useDouble", FormFieldPart.class).value());
                int numPartitions =
                    Integer.parseInt(requiredImportPart(parts, "numPartitions", FormFieldPart.class).value());
                short replicationFactor =
                    Short.parseShort(requiredImportPart(parts, "replicationFactor", FormFieldPart.class).value());
                String bootstrapServers = this.props.getBootstrapServers();

                // transferTo returns a Mono that has to be subscribed; chaining guarantees both
                // files are fully written before the importer reads them.
                return verticesPart.transferTo(verticesFile)
                    .then(edgesPart.transferTo(edgesFile))
                    .then(Mono.fromCallable(() -> {
                        new GraphImporter(bootstrapServers, verticesFile, edgesFile, verticesTopic, edgesTopic,
                            useDouble, numPartitions, replicationFactor).call();
                        return Boolean.TRUE;
                    }))
                    .onErrorMap(
                        e -> !(e instanceof ResponseStatusException),
                        e -> new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Graph import failed", e))
                    .then(ServerResponse.ok().build());
            } catch (NumberFormatException e) {
                throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", e);
            } catch (ResponseStatusException e) {
                // Already carries the intended status (e.g. 400 for a missing part).
                throw e;
            } catch (Exception e) {
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Graph import failed", e);
            }
        });
    }

    /** Returns the named multipart part cast to {@code type}, or fails with 400 if absent or of another kind. */
    private static <P> P requiredImportPart(Map<String, ?> parts, String name, Class<P> type) {
        Object part = parts.get(name);
        if (!type.isInstance(part)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing or invalid part: " + name);
        }
        return type.cast(part);
    }

    /** Resolves the temp-directory destination for an upload, keeping only the base name of the client filename. */
    private static File importTarget(FilePart part) {
        // The filename is client-controlled; dropping any directory components keeps the
        // destination inside the temp directory.
        String baseName = new File(part.filename()).getName();
        if (baseName.isEmpty() || ".".equals(baseName) || "..".equals(baseName)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid file name: " + part.filename());
        }
        return new File(ClientUtils.tempDirectory(), baseName);
    }

    /**
     * Groups the edges of an imported graph by source vertex and repartitions the result into the
     * topics named in the request.
     *
     * <p>A fresh random application id is used for the Kafka Streams job. Unless the request is
     * marked async, this method waits for the job's future to complete before responding.
     *
     * @param serverRequest request whose body is a {@link GroupEdgesBySourceRequest}
     * @return an empty 200 response; errors with a 500 {@link ResponseStatusException} (cause
     *     attached) if preparation fails
     */
    public Mono<ServerResponse> prepareGraph(ServerRequest serverRequest) {
        String appId = generateRandomString(8);
        return serverRequest.bodyToMono(GroupEdgesBySourceRequest.class).doOnNext(request -> {
            try {
                StreamsBuilder builder = new StreamsBuilder();
                String bootstrapServers = this.props.getBootstrapServers();
                CompletableFuture<?> completion;
                // The two branches differ only in the edge-value serde (Double vs Long).
                if (request.isValuesOfTypeDouble()) {
                    completion = GraphUtils.groupEdgesBySourceAndRepartition(
                        builder,
                        streamsConfig(appId, bootstrapServers, Serdes.Double()),
                        request.getInitialVerticesTopic(),
                        request.getInitialEdgesTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Double()),
                        request.getVerticesTopic(),
                        request.getEdgesGroupedBySourceTopic(),
                        request.getNumPartitions(),
                        request.getReplicationFactor());
                } else {
                    completion = GraphUtils.groupEdgesBySourceAndRepartition(
                        builder,
                        streamsConfig(appId, bootstrapServers, Serdes.Long()),
                        request.getInitialVerticesTopic(),
                        request.getInitialEdgesTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()),
                        request.getVerticesTopic(),
                        request.getEdgesGroupedBySourceTopic(),
                        request.getNumPartitions(),
                        request.getReplicationFactor());
                }
                if (!request.isAsync()) {
                    completion.get();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can still observe it.
                Thread.currentThread().interrupt();
                throw new ResponseStatusException(
                    HttpStatus.INTERNAL_SERVER_ERROR, "Interrupted while preparing graph", e);
            } catch (Exception e) {
                throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Failed to prepare graph", e);
            }
        }).then(ServerResponse.ok().build());
    }

    /**
     * Creates and configures an algorithm instance on this node and, for a client-originated
     * request, forwards the same request to the other group members.
     *
     * <p>If the {@code X-KGraph-AppId} header is present the request is treated as already proxied:
     * its value is used as the application id and nothing is forwarded. Otherwise a random
     * 8-character id is generated and the request is fanned out to the current group members.
     *
     * @param serverRequest request whose body is a {@link GraphAlgorithmCreateRequest}
     * @return a 200 JSON response containing the {@link GraphAlgorithmId}
     */
    public Mono<ServerResponse> configure(ServerRequest serverRequest) {
        List<String> appIdHeader = serverRequest.headers().header(X_KGRAPH_APPID);
        boolean proxied = !appIdHeader.isEmpty();
        String appId = proxied ? appIdHeader.get(0) : generateRandomString(8);
        return serverRequest.bodyToMono(GraphAlgorithmCreateRequest.class)
            .doOnNext(request -> {
                PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = getAlgorithm(appId, request);
                algorithm.configure(
                    new StreamsBuilder(),
                    streamsConfig(appId, this.props.getBootstrapServers(), new KryoSerde()));
                this.algorithms.put(appId, algorithm);
            })
            .flatMapMany(request -> {
                Set<String> peers;
                if (proxied) {
                    peers = Collections.emptySet();
                } else {
                    peers = this.group.getCurrentMembers().keySet();
                }
                return proxyConfigure(peers, appId, request);
            })
            .then(ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(new GraphAlgorithmId(appId)), GraphAlgorithmId.class));
    }

    /**
     * Instantiates the Pregel algorithm selected by the request.
     *
     * <p>Algorithm-specific parameters are read from the request's params map:
     * {@code srcVertexId} (bfs, lp, sssp; optional for pagerank), {@code landmarkVertexIds}
     * (mssp) and {@code tolerance} / {@code resetProbability} (pagerank). For bfs, wcc and lp the
     * edge-value serde is Double or Long depending on {@code isValuesOfTypeDouble()}.
     *
     * @param str application id passed to the algorithm
     * @param graphAlgorithmCreateRequest the creation request
     * @return the new, not yet configured algorithm
     * @throws ResponseStatusException 400 for an unknown algorithm, a missing required parameter
     *     or a parameter that is not a valid number
     */
    private PregelGraphAlgorithm<Long, ?, ?, ?> getAlgorithm(String str, GraphAlgorithmCreateRequest graphAlgorithmCreateRequest) {
        final GraphAlgorithmCreateRequest req = graphAlgorithmCreateRequest;
        try {
            switch (req.getAlgorithm()) {
                case bfs: {
                    long source = Long.parseLong(getParam(req.getParams(), "srcVertexId", true));
                    if (req.isValuesOfTypeDouble()) {
                        return new BreadthFirstSearch(getHostAndPort(), str, this.props.getBootstrapServers(),
                            this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                            GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Double()),
                            req.getNumPartitions(), req.getReplicationFactor(), source);
                    }
                    return new BreadthFirstSearch(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()),
                        req.getNumPartitions(), req.getReplicationFactor(), source);
                }
                case wcc: {
                    if (req.isValuesOfTypeDouble()) {
                        return new ConnectedComponents(getHostAndPort(), str, this.props.getBootstrapServers(),
                            this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                            GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Double()),
                            req.getNumPartitions(), req.getReplicationFactor());
                    }
                    return new ConnectedComponents(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()),
                        req.getNumPartitions(), req.getReplicationFactor());
                }
                case lcc: {
                    return new LocalClusteringCoefficient(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Double(), Serdes.Double()),
                        req.getNumPartitions(), req.getReplicationFactor());
                }
                case lp: {
                    long source = Long.parseLong(getParam(req.getParams(), "srcVertexId", true));
                    if (req.isValuesOfTypeDouble()) {
                        return new LabelPropagation(getHostAndPort(), str, this.props.getBootstrapServers(),
                            this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                            GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Double()),
                            req.getNumPartitions(), req.getReplicationFactor(), source);
                    }
                    return new LabelPropagation(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Long(), Serdes.Long()),
                        req.getNumPartitions(), req.getReplicationFactor(), source);
                }
                case mssp: {
                    return new MultipleSourceShortestPaths(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Double()),
                        req.getNumPartitions(), req.getReplicationFactor(),
                        (Set) Arrays.stream(getParam(req.getParams(), "landmarkVertexIds", true)
                                .split(StringArrayPropertyEditor.DEFAULT_SEPARATOR))
                            .map(Long::parseLong)
                            .collect(Collectors.toSet()));
                }
                case pagerank: {
                    double tolerance = Double.parseDouble(getParam(req.getParams(), "tolerance", true));
                    double resetProbability =
                        Double.parseDouble(getParam(req.getParams(), "resetProbability", true));
                    // srcVertexId is optional here; absent means an empty Optional is passed.
                    String sourceParam = getParam(req.getParams(), "srcVertexId", false);
                    return new PageRank(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), new KryoSerde(), Serdes.Double()),
                        req.getNumPartitions(), req.getReplicationFactor(), tolerance, resetProbability,
                        sourceParam == null
                            ? Optional.empty()
                            : Optional.of(Long.valueOf(Long.parseLong(sourceParam))));
                }
                case sssp: {
                    return new SingleSourceShortestPaths(getHostAndPort(), str, this.props.getBootstrapServers(),
                        this.curator, req.getVerticesTopic(), req.getEdgesGroupedBySourceTopic(),
                        GraphSerialized.with(Serdes.Long(), Serdes.Double(), Serdes.Double()),
                        req.getNumPartitions(), req.getReplicationFactor(),
                        Long.parseLong(getParam(req.getParams(), "srcVertexId", true)));
                }
                default:
                    throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid algorithm: " + req.getAlgorithm());
            }
        } catch (NumberFormatException e) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Invalid number", e);
        }
    }

    /**
     * Looks up a named algorithm parameter.
     *
     * <p>A {@code null} map is treated like an empty one, so a request that omits its params
     * yields the same "Missing param" error as one that omits just this key.
     *
     * @param map parameter map from the request; may be {@code null}
     * @param str parameter name
     * @param z whether the parameter is required
     * @return the parameter value, or {@code null} if it is absent and not required
     * @throws ResponseStatusException 400 if the parameter is required but absent
     */
    private String getParam(Map<String, String> map, String str, boolean z) {
        String value = (map == null) ? null : map.get(str);
        if (z && value == null) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing param: " + str);
        }
        return value;
    }

    /**
     * Forwards a configure request to every member in {@code set} except this instance.
     *
     * <p>Each forwarded request is a {@code POST /pregel} carrying the application id in the
     * {@code X-KGraph-AppId} header.
     *
     * @param set {@code host:port} identifiers of the members to contact
     * @param str application id to send in the header
     * @param graphAlgorithmCreateRequest request body to forward
     * @return the ids returned by the contacted members
     */
    private Flux<GraphAlgorithmId> proxyConfigure(Set<String> set, String str, GraphAlgorithmCreateRequest graphAlgorithmCreateRequest) {
        return Flux.fromIterable(set)
            .filter(member -> !member.equals(getHostAndPort()))
            .flatMap(member -> {
                log.debug("proxy configure to {}", member);
                return WebClient.create("http://" + member)
                    .post()
                    .uri("/pregel")
                    .accept(MediaType.APPLICATION_JSON)
                    .header(X_KGRAPH_APPID, str)
                    .body(Mono.just(graphAlgorithmCreateRequest), GraphAlgorithmCreateRequest.class)
                    .retrieve()
                    .bodyToMono(GraphAlgorithmId.class);
            });
    }

    /**
     * Reports the state of the algorithm identified by the {@code id} path variable.
     *
     * @param serverRequest request with an {@code id} path variable
     * @return a 200 JSON response containing a {@link GraphAlgorithmStatus}; errors with a 404
     *     {@link ResponseStatusException} if no algorithm with that id is registered here
     */
    public Mono<ServerResponse> state(ServerRequest serverRequest) {
        String id = serverRequest.pathVariable("id");
        PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.get(id);
        if (algorithm == null) {
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm id: " + id));
        }
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(Mono.just(new GraphAlgorithmStatus(algorithm.state())), GraphAlgorithmStatus.class);
    }

    /**
     * Starts the algorithm identified by the {@code id} path variable on this node and, for a
     * client-originated request (no {@code X-KGraph-AppId} header), on the other group members.
     *
     * <p>The response carries the first status emitted by the merged local and proxied results.
     *
     * @param serverRequest request whose body is a {@link GraphAlgorithmRunRequest}
     * @return a 200 JSON response containing a {@link GraphAlgorithmStatus}; errors with a 404
     *     {@link ResponseStatusException} if no algorithm with that id is registered here
     */
    public Mono<ServerResponse> run(ServerRequest serverRequest) {
        List<String> appIdHeader = serverRequest.headers().header(X_KGRAPH_APPID);
        String id = serverRequest.pathVariable("id");
        return serverRequest.bodyToMono(GraphAlgorithmRunRequest.class)
            .flatMapMany(request -> {
                log.debug("num iterations: {}", request.getNumIterations());
                PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.get(id);
                if (algorithm == null) {
                    return Flux.<GraphAlgorithmStatus>error(
                        new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm id: " + id));
                }
                GraphAlgorithmStatus localStatus =
                    new GraphAlgorithmStatus(algorithm.run(request.getNumIterations()));
                Set<String> peers;
                if (appIdHeader.isEmpty()) {
                    peers = this.group.getCurrentMembers().keySet();
                } else {
                    peers = Collections.emptySet();
                }
                return Mono.just(localStatus).mergeWith(proxyRun(peers, id, request));
            })
            // Keep only the first status; the rest are consumed so all proxied calls complete.
            .reduce((first, ignored) -> first)
            .flatMap(status -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(status), GraphAlgorithmStatus.class));
    }

    /**
     * Forwards a run request to every member in {@code set} except this instance.
     *
     * <p>Each forwarded request is a {@code POST /pregel/{id}} carrying the application id in the
     * {@code X-KGraph-AppId} header.
     *
     * @param set {@code host:port} identifiers of the members to contact
     * @param str algorithm/application id
     * @param graphAlgorithmRunRequest request body to forward
     * @return the statuses returned by the contacted members
     */
    private Flux<GraphAlgorithmStatus> proxyRun(Set<String> set, String str, GraphAlgorithmRunRequest graphAlgorithmRunRequest) {
        return Flux.fromIterable(set)
            .filter(member -> !member.equals(getHostAndPort()))
            .flatMap(member -> {
                log.debug("proxy run to {}", member);
                return WebClient.create("http://" + member)
                    .post()
                    .uri("/pregel/" + str)
                    .accept(MediaType.APPLICATION_JSON)
                    .header(X_KGRAPH_APPID, str)
                    .body(Mono.just(graphAlgorithmRunRequest), GraphAlgorithmRunRequest.class)
                    .retrieve()
                    .bodyToMono(GraphAlgorithmStatus.class);
            });
    }

    /**
     * Streams the results of the algorithm identified by the {@code id} path variable as
     * server-sent events, one {@code "key value"} line per entry.
     *
     * <p>For a client-originated request (no {@code X-KGraph-AppId} header) the local results are
     * merged with the results fetched from the other group members.
     *
     * @param serverRequest request with an {@code id} path variable
     * @return a 200 {@code text/event-stream} response; errors with a 404
     *     {@link ResponseStatusException} if no algorithm with that id is registered here
     */
    public Mono<ServerResponse> result(ServerRequest serverRequest) {
        List<String> appIdHeader = serverRequest.headers().header(X_KGRAPH_APPID);
        String id = serverRequest.pathVariable("id");
        PregelGraphAlgorithm<Long, ?, ?, ?> algorithm = this.algorithms.get(id);
        if (algorithm == null) {
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm id: " + id));
        }
        Set<String> peers;
        if (appIdHeader.isEmpty()) {
            peers = this.group.getCurrentMembers().keySet();
        } else {
            peers = Collections.emptySet();
        }
        Flux<String> localResults = Flux.fromIterable(algorithm.result()).map(entry -> {
            log.trace("result: ({}, {})", entry.key, entry.value);
            return entry.key + " " + entry.value;
        });
        return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(BodyInserters.fromPublisher(proxyResult(peers, id, localResults), String.class));
    }

    /**
     * Merges the given local result stream with the result streams of every member in
     * {@code set} except this instance.
     *
     * <p>Each remote stream is fetched with {@code GET /pregel/{id}/result}, carrying the
     * application id in the {@code X-KGraph-AppId} header.
     *
     * @param set {@code host:port} identifiers of the members to contact
     * @param str algorithm/application id
     * @param flux the local result stream, used as the starting point of the merge
     * @return the local stream merged with all remote streams
     */
    private Flux<String> proxyResult(Set<String> set, String str, Flux<String> flux) {
        return set.stream()
            .filter(member -> !member.equals(getHostAndPort()))
            .map(member -> {
                log.debug("proxy result to {}", member);
                return WebClient.create("http://" + member)
                    .get()
                    .uri("/pregel/" + str + "/result")
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .header(X_KGRAPH_APPID, str)
                    .retrieve()
                    .bodyToFlux(String.class);
            })
            .reduce(flux, Flux::mergeWith);
    }

    /**
     * Removes and closes the algorithm identified by the {@code id} path variable and, for a
     * client-originated request (no {@code X-KGraph-AppId} header), asks the other group members
     * to do the same.
     *
     * @param serverRequest request with an {@code id} path variable
     * @return an empty 204 response once all proxied deletes have completed; errors with a 404
     *     {@link ResponseStatusException} if no algorithm with that id is registered here
     */
    public Mono<ServerResponse> delete(ServerRequest serverRequest) {
        List<String> appIdHeader = serverRequest.headers().header(X_KGRAPH_APPID);
        String id = serverRequest.pathVariable("id");
        PregelGraphAlgorithm<Long, ?, ?, ?> removed = this.algorithms.remove(id);
        if (removed == null) {
            return Mono.error(new ResponseStatusException(HttpStatus.NOT_FOUND, "Unknown algorithm id: " + id));
        }
        removed.close();
        Set<String> peers;
        if (appIdHeader.isEmpty()) {
            peers = this.group.getCurrentMembers().keySet();
        } else {
            peers = Collections.emptySet();
        }
        return proxyDelete(peers, id).then(ServerResponse.noContent().build());
    }

    /**
     * Forwards a delete request to every member in {@code set} except this instance.
     *
     * <p>Each forwarded request is a {@code DELETE /pregel/{id}} carrying the application id in
     * the {@code X-KGraph-AppId} header.
     *
     * @param set {@code host:port} identifiers of the members to contact
     * @param str algorithm/application id
     * @return a flux that completes when all forwarded deletes have completed
     */
    private Flux<Void> proxyDelete(Set<String> set, String str) {
        return Flux.fromIterable(set)
            .filter(member -> !member.equals(getHostAndPort()))
            .flatMap(member -> {
                log.debug("proxy delete to {}", member);
                return WebClient.create("http://" + member)
                    .delete()
                    .uri("/pregel/" + str)
                    .accept(MediaType.APPLICATION_JSON)
                    .header(X_KGRAPH_APPID, str)
                    .retrieve()
                    .bodyToMono(Void.class);
            });
    }

    /**
     * Builds the Kafka Streams configuration used by this handler.
     *
     * <p>Keys use the Long serde; values use the class of the supplied serde. Record caching is
     * set to 0 bytes, two stream threads are used, and state is kept under the client temp
     * directory.
     *
     * @param str application id; also used as the prefix of the client id
     * @param str2 Kafka bootstrap servers
     * @param serde serde whose class becomes the default value serde
     * @return the populated configuration
     */
    public static Properties streamsConfig(String str, String str2, Serde<?> serde) {
        final String applicationId = str;
        final String bootstrapServers = str2;

        Properties config = new Properties();
        // Identity and connectivity.
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        config.put("client.id", applicationId + "-client");
        config.put("bootstrap.servers", bootstrapServers);

        // Default serdes.
        String keySerdeClass = Serdes.Long().getClass().getName();
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClass);
        String valueSerdeClass = serde.getClass().getName();
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClass);

        // Runtime tuning and local state location.
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        String stateDir = ClientUtils.tempDirectory().getAbsolutePath();
        config.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        return config;
    }

    /**
     * Returns this instance's identifier in {@code host:port} form.
     *
     * @return the local host address and web server port joined by a colon
     */
    public String getHostAndPort() {
        StringBuilder hostAndPort = new StringBuilder();
        hostAndPort.append(this.host);
        hostAndPort.append(":");
        hostAndPort.append(this.port);
        return hostAndPort.toString();
    }

    /**
     * Resolves the textual IP address of the local host.
     *
     * @return the local host address
     * @throws RuntimeException wrapping the {@link UnknownHostException} if the local host
     *     cannot be resolved
     */
    public String getHostAddress() {
        final InetAddress localHost;
        try {
            localHost = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        return localHost.getHostAddress();
    }

    /**
     * Returns the web server port recorded in {@code onApplicationEvent}.
     *
     * @return the port, or 0 if the web server has not been initialized yet
     */
    public int getPort() {
        final int currentPort = this.port;
        return currentPort;
    }

    /**
     * Generates a random string of lowercase ASCII letters ({@code a}-{@code z}).
     *
     * @param i length of the string to generate; must not be negative
     * @return a string of exactly {@code i} random lowercase letters
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public String generateRandomString(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("length must not be negative: " + i);
        }
        final int alphabetSize = 'z' - 'a' + 1;
        Random random = new Random();
        StringBuilder sb = new StringBuilder(i);
        for (int n = 0; n < i; n++) {
            // nextInt(bound) yields a uniform value in [0, bound) without float scaling.
            sb.append((char) ('a' + random.nextInt(alphabetSize)));
        }
        return sb.toString();
    }
}
